skimindex Processing Model
Overview
Processing sections ([processing.X]) describe how raw or downloaded data are
transformed into pipeline outputs. Each section is either atomic (a single
operation) or composite (an ordered sequence of steps).
The type and steps keys are mutually exclusive — a processing section
must have exactly one of them.
Atomic processing
An atomic section wraps a single registered operation. The type value maps
to a Python function decorated with @processing_type in the skimindex.processing
module, so the set of valid types is determined by the implementation.
| Key | Type | Required | Description |
|---|---|---|---|
| type | string | yes | Registered processing function name |
| directory | string | no | Output subdirectory. If absent, output is temporary (see Persistence) |
| (others) | any | no | Operation-specific parameters passed to the function |
[processing.split_decontam]
type = "split"
directory = "split" # results saved to this subdirectory
size = 200 # fragment size in bp
overlap = 28 # overlap = kmer_size - 1
[processing.remove_n]
type = "remove_n_only" # no directory → temporary
[processing.distribute_batches]
type = "distribute"
batches = 20 # no directory → temporary
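The mapping from a type value to a Python function can be pictured with a small registry. This is only a sketch: register_type, REGISTRY, RESERVED_KEYS and run_atomic are hypothetical names standing in for the real decorator and dispatch code in skimindex.processing, and the set of reserved keys is an assumption.

```python
from typing import Any, Callable, Dict

# Hypothetical registry mapping a type name to its implementing function.
REGISTRY: Dict[str, Callable[..., Any]] = {}

# Keys assumed to be interpreted by the runner rather than passed to the operation.
RESERVED_KEYS = {"type", "directory", "input", "role"}


def register_type(name: str) -> Callable:
    """Stand-in decorator: record the function under the given type name."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        if name in REGISTRY:
            raise ValueError(f"processing type {name!r} is already registered")
        REGISTRY[name] = fn
        return fn
    return decorator


@register_type("split")
def split(data: str, size: int, overlap: int) -> str:
    # Toy body: a real implementation would fragment sequences.
    return f"split({data}, size={size}, overlap={overlap})"


def run_atomic(section: dict, data: str) -> str:
    """Look up the registered function and forward the operation-specific keys."""
    try:
        fn = REGISTRY[section["type"]]
    except KeyError:
        raise ValueError(f"unknown processing type: {section['type']!r}") from None
    params = {k: v for k, v in section.items() if k not in RESERVED_KEYS}
    return fn(data, **params)


# Dict form of [processing.split_decontam] as a TOML parser would return it.
section = {"type": "split", "directory": "split", "size": 200, "overlap": 28}
result = run_atomic(section, "genome.fasta")
```

Keeping the reserved keys in one place means any other key in the section reaches the function unchanged, which matches the "(others)" row of the table.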
Composite processing
A composite section chains multiple steps in order. Each step is either a named reference (string) or an inline atomic (TOML inline table).
| Key | Type | Required | Description |
|---|---|---|---|
| steps | array | yes | Ordered list of steps (strings or inline tables) |
| directory | string | yes* | Output subdirectory. *Required when this section is referenced by run |
Each element of steps is one of:
| Form | Example | Parameters |
|---|---|---|
| Named reference | "split_decontam" | Defined in its own [processing.X] section |
| Inline atomic | {type = "remove_n_only"} | Always temporary, always atomic |
[processing.prepare_decontam]
directory = "prepared" # required: this section is run by a role
steps = [
"split_decontam", # named reference — saved if it has directory
{type = "remove_n_only"}, # inline — always temporary
{type = "distribute", batches = 20}, # inline — always temporary
]
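One way to read the two step forms is as a small resolution pass over the steps array. The sketch below works on plain dictionaries shaped like the parsed TOML. The function name resolve_steps and its return shape are illustrative assumptions, not the project's API.

```python
from typing import Dict, List, Tuple


def resolve_steps(composite: dict, sections: Dict[str, dict]) -> List[Tuple[str, dict, bool]]:
    """Return (label, config, is_inline) for each element of a composite's steps."""
    resolved = []
    for step in composite["steps"]:
        if isinstance(step, str):
            # Named reference: the config lives in its own [processing.X] section.
            if step not in sections:
                raise KeyError(f"steps references unknown section {step!r}")
            resolved.append((step, sections[step], False))
        elif isinstance(step, dict):
            # Inline table: must be atomic, so it needs type and must not nest steps.
            if "type" not in step or "steps" in step:
                raise ValueError(f"inline step must be atomic: {step!r}")
            resolved.append((step["type"], step, True))
        else:
            raise TypeError(f"step must be a string or an inline table, got {step!r}")
    return resolved


sections = {
    "split_decontam": {"type": "split", "directory": "split", "size": 200, "overlap": 28},
    "prepare_decontam": {
        "directory": "prepared",
        "steps": [
            "split_decontam",
            {"type": "remove_n_only"},
            {"type": "distribute", "batches": 20},
        ],
    },
}
plan = resolve_steps(sections["prepare_decontam"], sections)
labels = [label for label, _config, _inline in plan]
inline_flags = [inline for _label, _config, inline in plan]
```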
Persistence
Whether a step's output is saved to disk depends on whether it declares a
directory key. The runner (not the atomic bricks) enforces these rules.
Who persists what
| Context | directory present? | Result |
|---|---|---|
| Top-level [processing.X] referenced by run | must be present | saved |
| Named reference inside steps | yes | saved to its directory |
| Named reference inside steps | no | temporary (cleaned up after pipeline) |
| Inline table {type=...} inside steps | n/a | always temporary |
Only the composite output directory carries a stamp. Intermediate steps
saved to their own directory are persisted but not stamped — they will not
trigger a short-circuit on re-run.
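The rows of the table that concern steps reduce to a two-branch decision. A minimal sketch, assuming steps are given as strings or dictionaries as in the parsed TOML. persisted_directory is a hypothetical helper name, and the remove_n section is defined here only for illustration.

```python
from typing import Dict, Optional, Union


def persisted_directory(step: Union[str, dict], sections: Dict[str, dict]) -> Optional[str]:
    """Return the directory a step is saved to, or None when its output is temporary."""
    if isinstance(step, dict):
        return None  # inline table: always temporary
    return sections[step].get("directory")  # named reference: saved only if it declares one


sections = {
    "split_decontam": {"type": "split", "directory": "split"},
    "remove_n": {"type": "remove_n_only"},
}
steps = ["split_decontam", "remove_n", {"type": "distribute", "batches": 20}]
outcome = [persisted_directory(step, sections) for step in steps]
```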
How the runner persists each OutputKind
STREAM output with directory, non-terminal step
The runner injects tee into the pipe so the byte stream is simultaneously
written to disk and forwarded to the next step:
input_cmd | tee(out_file) | next_step
The output file is step_dir / output_filename (declared by the type).
STREAM output with directory, terminal step (last in composite)
The runner redirects stdout to the output file and executes the pipeline:
(input_cmd > str(out_file))()
Execution happens here; the next Data object is files_data([out_file]).
DIRECTORY or FILE output with directory
The runner passes step_dir as the output directory to the atomic brick:
fn(data, step_dir, dry_run=dry_run)
The brick creates the directory, writes its files, and returns a Data object.
Without directory (temporary)
- STREAM: the step is chained without executing (no intermediate file).
- DIRECTORY / FILE: the runner creates a temporary directory via
tempfile.mkdtemp(), passes it to the brick, and removes it in a try/finally block after the pipeline completes.
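The temporary-directory handling for DIRECTORY / FILE outputs can be sketched as follows. This is an illustration under simplifying assumptions: bricks are plain functions taking an input file and a step directory, STREAM chaining is not modelled, and run_steps, upper_case and add_suffix are hypothetical names.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Callable, List, Optional, Tuple

Brick = Callable[[Path, Path], Path]  # (input file, step directory) -> output file


def run_steps(steps: List[Tuple[Brick, Optional[Path]]], source: Path) -> Path:
    """Run bricks in order; steps without a directory write to a temporary one."""
    temp_dirs: List[Path] = []
    data = source
    try:
        for brick, directory in steps:
            if directory is None:
                step_dir = Path(tempfile.mkdtemp(prefix="skimindex-"))
                temp_dirs.append(step_dir)
            else:
                step_dir = directory
            data = brick(data, step_dir)
        return data
    finally:
        # Runs on success and on failure, once every step has finished.
        for temp_dir in temp_dirs:
            shutil.rmtree(temp_dir, ignore_errors=True)


def upper_case(src: Path, step_dir: Path) -> Path:
    """Toy brick: write an upper-cased copy of src into step_dir."""
    step_dir.mkdir(parents=True, exist_ok=True)
    out = step_dir / "upper.txt"
    out.write_text(src.read_text().upper())
    return out


def add_suffix(src: Path, step_dir: Path) -> Path:
    """Toy brick: copy src into step_dir with a marker appended."""
    step_dir.mkdir(parents=True, exist_ok=True)
    out = step_dir / "final.txt"
    out.write_text(src.read_text() + "!")
    return out
```

Cleanup is deferred to the end of the whole run because a later step may still need to read the temporary output of an earlier one.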
Stamp management
needs_run(composite_output_dir, *sources) → skip if already stamped
remove_if_not_stamped(composite_output_dir) → clean partial results
… all steps execute …
stamp(composite_output_dir) → mark success
Stamp files live under the [stamp] directory configured in skimindex.toml.
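The sequence above can be sketched with marker files. The real needs_run, remove_if_not_stamped and stamp functions are not shown in this document, so everything below is an assumption: the extra stamp_root parameter, the one-marker-per-output-directory layout, and the modification-time comparison against sources are illustrative choices only.

```python
import shutil
from pathlib import Path
from typing import Callable


def stamp_path(output_dir: Path, stamp_root: Path) -> Path:
    # Assumed layout: one marker file per output directory under stamp_root.
    return stamp_root / f"{output_dir.name}.stamp"


def needs_run(output_dir: Path, stamp_root: Path, *sources: Path) -> bool:
    """True when there is no stamp, or when a source is newer than the stamp."""
    marker = stamp_path(output_dir, stamp_root)
    if not marker.exists():
        return True
    stamped_at = marker.stat().st_mtime
    return any(src.stat().st_mtime > stamped_at for src in sources)


def remove_if_not_stamped(output_dir: Path, stamp_root: Path) -> None:
    """Delete partial results left behind by an interrupted run."""
    if output_dir.exists() and not stamp_path(output_dir, stamp_root).exists():
        shutil.rmtree(output_dir)


def stamp(output_dir: Path, stamp_root: Path) -> None:
    marker = stamp_path(output_dir, stamp_root)
    marker.parent.mkdir(parents=True, exist_ok=True)
    marker.touch()


def run_stamped(output_dir: Path, stamp_root: Path,
                execute: Callable[[Path], None], *sources: Path) -> bool:
    """Return True if the steps were executed, False if the run was skipped."""
    if not needs_run(output_dir, stamp_root, *sources):
        return False
    remove_if_not_stamped(output_dir, stamp_root)
    execute(output_dir)            # all steps execute here
    stamp(output_dir, stamp_root)  # reached only if execute did not raise
    return True
```

Because the stamp is written last, an exception in any step leaves the output directory unstamped, and the next run removes it before starting again.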
Linking processing to roles and data
A role or data section triggers a processing pipeline via the run key:
[role.decontamination]
directory = "decontamination"
run = "prepare_decontam" # references [processing.prepare_decontam]
A data section can override the role's run:
[data.fungi]
source = "genbank"
role = "decontamination"
run = "prepare_gb_decontam" # overrides [role.decontamination].run
Rule: the processing section referenced by run must have a directory key.
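The override and the rule can be combined into one lookup. A minimal sketch over a dictionary shaped like the parsed configuration: resolve_run is a hypothetical helper, and the human dataset and the body of prepare_gb_decontam are invented here for illustration.

```python
def resolve_run(data_name: str, config: dict) -> str:
    """Return the processing section a dataset runs: its own run, else its role's."""
    data = config["data"][data_name]
    role = config["role"][data["role"]]
    run = data.get("run", role.get("run"))
    if run is None:
        raise ValueError(f"[data.{data_name}] has no run, directly or through its role")
    section = config["processing"].get(run)
    if section is None:
        raise KeyError(f"run references unknown section [processing.{run}]")
    if "directory" not in section:
        raise ValueError(f"[processing.{run}] is referenced by run but has no directory")
    return run


config = {
    "processing": {
        "prepare_decontam": {"directory": "prepared", "steps": ["split_decontam"]},
        # Hypothetical definition; the document only names this section.
        "prepare_gb_decontam": {"directory": "prepared_gb", "steps": ["split_decontam"]},
        "split_decontam": {"type": "split", "directory": "split"},
    },
    "role": {"decontamination": {"directory": "decontamination", "run": "prepare_decontam"}},
    "data": {
        "fungi": {"source": "genbank", "role": "decontamination", "run": "prepare_gb_decontam"},
        "human": {"source": "genbank", "role": "decontamination"},  # hypothetical dataset
    },
}
fungi_run = resolve_run("fungi", config)
human_run = resolve_run("human", config)
```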
Full example
# Atomic steps
[processing.split_decontam]
type = "split"
directory = "split"
size = 200
overlap = 28
[processing.count_kmers]
type = "kmercount"
directory = "kmercount"
kmer_size = 29
# Composite pipeline — run by [role.decontamination]
[processing.prepare_decontam]
directory = "prepared"
steps = [
"split_decontam", # saved to split/
{type = "remove_n_only"}, # temporary
"count_kmers", # saved to kmercount/
]
# Role referencing the pipeline
[role.decontamination]
directory = "decontamination"
run = "prepare_decontam"
Data model
Data flowing between processing steps is represented as a Data object.
The plumbum execution layer is an implementation detail — pipeline orchestration
only sees Data.
Three kinds, aligned with OutputKind:
| Kind | Description | Carries |
|---|---|---|
| STREAM | A deferred pipeline not yet executed | a plumbum command/pipe |
| FILES | One or more files on disk | list[Path] |
| DIRECTORY | A directory of files | a single Path |
Every processing type has the uniform interface Data → Data.
The pipeline executor chains steps by passing the Data output of one step as
the Data input of the next.
Initial data (downloaded sources) is wrapped into Data before entering the
pipeline:
files_data(Path("genome.gbff.gz"), format="gbff.gz") # single genome file
files_data(list(dir.glob("*.fasta.gz")), format="fasta.gz") # batch of files
directory_data(Path("/genbank/fasta/bct")) # directory of FASTA files
stream_data(cmd, format="fasta") # pre-filtered stream
The to_stream_command(data) adapter is the only place where Data touches
plumbum — it converts any Data kind into a plumbum source command via
obiconvert. Atomic type implementations call it internally; pipeline
orchestration never sees plumbum objects.
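A simplified picture of the Data object and its constructors is given below. The field names (kind, payload, format), the Kind enum and the toy count_files step are assumptions made for this sketch. Only the constructor names and the three kinds come from the text above.

```python
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Iterable, Optional, Union


class Kind(Enum):
    STREAM = "stream"        # deferred command, not yet executed
    FILES = "files"          # one or more files on disk
    DIRECTORY = "directory"  # a directory of files


@dataclass(frozen=True)
class Data:
    kind: Kind
    payload: Any                  # command object, list of paths, or a single path
    format: Optional[str] = None  # e.g. "fasta.gz"


def files_data(paths: Union[Path, Iterable[Path]], format: Optional[str] = None) -> Data:
    """Wrap one path or several paths as FILES data."""
    if isinstance(paths, (str, Path)):
        paths = [paths]
    return Data(Kind.FILES, [Path(p) for p in paths], format)


def directory_data(path: Path, format: Optional[str] = None) -> Data:
    return Data(Kind.DIRECTORY, Path(path), format)


def stream_data(command: Any, format: Optional[str] = None) -> Data:
    return Data(Kind.STREAM, command, format)


def count_files(data: Data) -> Data:
    """Toy Data -> Data step: only FILES input is handled in this sketch."""
    if data.kind is not Kind.FILES:
        raise NotImplementedError(f"{data.kind} is not handled here")
    return stream_data(f"count {len(data.payload)} files", format="text")


genome = files_data(Path("genome.gbff.gz"), format="gbff.gz")
summary = count_files(genome)
```

In this sketch the stream payload is a plain string. In the design described above it would be a plumbum command, which only to_stream_command is meant to produce.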
Output resolution
Effective output directory
The effective output directory of a processing section is determined as follows:
- Atomic with directory: its own directory.
- Atomic without directory: no persistent output (temporary / piped).
- Composite with directory: its own directory (ignores last step's directory).
- Composite without directory: the effective output directory of its last step (which itself must have one, otherwise the composite has no persistent output).
The output type (STREAM / DIRECTORY / FILE) of a composite is always the output type
of its last step, regardless of whether the composite has its own directory.
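The four cases for the effective output directory form a short recursive function. A minimal sketch on dictionaries shaped like the parsed TOML. effective_output_dir is a hypothetical name, and the example section names other than split_decontam and count_kmers are invented.

```python
from typing import Dict, Optional


def effective_output_dir(section: dict, sections: Dict[str, dict]) -> Optional[str]:
    """Return the effective output directory, or None if there is no persistent output."""
    if "directory" in section:
        return section["directory"]  # atomic or composite with its own directory
    steps = section.get("steps")
    if not steps:
        return None                  # atomic without directory
    last = steps[-1]
    if isinstance(last, dict):
        return None                  # inline last step is always temporary
    return effective_output_dir(sections[last], sections)  # defer to the last step


sections = {
    "split_decontam": {"type": "split", "directory": "split"},
    "count_kmers": {"type": "kmercount", "directory": "kmercount"},
    "remove_n": {"type": "remove_n_only"},
    "with_dir": {"directory": "prepared", "steps": ["split_decontam", "count_kmers"]},
    "inherits": {"steps": ["split_decontam", "count_kmers"]},
    "no_output": {"steps": ["split_decontam", {"type": "remove_n_only"}]},
}
resolved = {name: effective_output_dir(body, sections) for name, body in sections.items()}
```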
Output filename for STREAM and FILE types
When a STREAM or FILE output must be persisted to disk (because a directory is
declared), the filename is determined by the processing type implementation, not
by the TOML config. Each registered processing type declares its output_filename in
code (e.g. "filtered.fasta.gz"), because the type knows its own output format.
A type with no declared output_filename cannot be persisted and must remain
temporary; declaring a directory for such a type is a validation error.
Runnability
A processing section is runnable as a top-level step only if it has an effective output directory. Sections without a persistent output can only appear as intermediate steps inside a composite.
Dataset binding and input chaining
role — which datasets a processing operates on
A processing section can declare the role of datasets it operates on via the
optional role key. When a pipeline is executed without an explicit dataset
list, the runner uses role to discover the relevant datasets automatically.
[processing.prepare_decontam]
role = "decontamination" # operates on all datasets with role="decontamination"
directory = "parts"
steps = [...]
This makes the processing section self-describing: it knows its own dataset scope without any hardcoding in the calling code.
input — where input data comes from
A processing section can declare where its input data comes from via the
optional input key:
| input present? | Source of input data |
|---|---|
| absent | raw dataset files (dataset.to_data()) |
| input = "X" | output directory of [processing.X] for the same dataset |
[processing.prepare_decontam]
role = "decontamination"
directory = "parts"
steps = [
{type = "split", size = 200, overlap = 28},
{type = "remove_n_only"},
{type = "distribute", batches = 20},
]
# no input → reads raw downloaded files
[processing.count_kmers_decontam]
type = "kmercount"
input = "prepare_decontam" # reads from parts/ produced by prepare_decontam
directory = "kmercount"
kmer_size = 29
These two sections implicitly define a chain:
dataset.to_data()
→ prepare_decontam (role="decontamination") → parts/
→ count_kmers_decontam → kmercount/
Each section can be run independently: decontam prepare starts from raw files,
decontam count starts from parts/ without re-running prepare_decontam.
Input resolution rules
The rule is recursive and uniform:
Standalone execution (referenced by run or called directly):
- no input → dataset.to_data() (raw source files)
- input = "X" → directory_data(output_dir_of_X / for_this_dataset)
As a step inside a composite:
- the step's own input key is silently ignored
- first step → receives the composite's resolved input (same rules as standalone)
- subsequent steps → receive the output of the previous step
The composite resolves its own input first, then propagates the result through
its steps in order. A step's input is only meaningful when that section is
executed as a top-level pipeline.
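These rules can be traced with two small functions. The sketch returns descriptive strings instead of Data objects, and it assumes that the per-dataset output of a section lives at its directory joined with the dataset name. That layout, and the names describe_input and trace_composite, are illustrative assumptions.

```python
from pathlib import PurePosixPath
from typing import Dict, List, Tuple


def describe_input(section: dict, sections: Dict[str, dict], dataset: str) -> str:
    """Standalone rule: raw files when input is absent, else the output of section X."""
    source = section.get("input")
    if source is None:
        return f"raw files of {dataset}"
    # Assumed layout: <directory of X>/<dataset name>
    return str(PurePosixPath(sections[source]["directory"]) / dataset)


def trace_composite(name: str, sections: Dict[str, dict], dataset: str) -> List[Tuple[str, str]]:
    """List (step, input) pairs; a step's own input key is never consulted."""
    composite = sections[name]
    current = describe_input(composite, sections, dataset)  # resolved once, up front
    trace = []
    for step in composite["steps"]:
        label = step if isinstance(step, str) else step["type"]
        trace.append((label, current))
        current = f"output of {label}"
    return trace


sections = {
    "prepare_decontam": {
        "role": "decontamination",
        "directory": "parts",
        "steps": [
            {"type": "split", "size": 200, "overlap": 28},
            {"type": "distribute", "batches": 20},
        ],
    },
    "count_kmers_decontam": {
        "type": "kmercount",
        "input": "prepare_decontam",
        "directory": "kmercount",
        "kmer_size": 29,
    },
}
standalone = describe_input(sections["count_kmers_decontam"], sections, "fungi")
chain = trace_composite("prepare_decontam", sections, "fungi")
```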
Validation rules
- A [processing.X] section must have exactly one of type or steps.
- A section with steps is composite; all elements must be strings or inline tables with type.
- Inline tables inside steps must not have steps (inline steps are always atomic).
- Any processing section referenced by a run key must have a runnable effective output directory.
- type values must match a registered @processing_type function in skimindex.processing.
- A STREAM/FILE atomic with directory must have a declared output_filename in its type.
- Composite sections (steps) are not registered in the @processing_type registry. They are structural constructs executed by the pipeline runner, not named operations.
- If input is present, it must reference an existing [processing.X] section that has a directory.
- If role is present, it must be a valid role name declared in a [role.X] section.
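Several of these rules can be checked directly on the parsed configuration. The sketch below covers only the exactly-one-of rule and the input, role and run references. The checks that need the type registry, output_filename, or the effective output directory are left out. The validate function and its message wording are illustrative assumptions.

```python
from typing import List


def validate(config: dict) -> List[str]:
    """Return a list of human-readable problems; an empty list means no problem was found."""
    errors: List[str] = []
    processing = config.get("processing", {})
    roles = config.get("role", {})

    for name, section in processing.items():
        # Both present or both absent violates the exactly-one-of rule.
        if ("type" in section) == ("steps" in section):
            errors.append(f"[processing.{name}]: exactly one of type or steps is required")

        source = section.get("input")
        if source is not None:
            target = processing.get(source)
            if target is None:
                errors.append(f"[processing.{name}]: input references unknown section {source!r}")
            elif "directory" not in target:
                errors.append(f"[processing.{name}]: input section {source!r} has no directory")

        role = section.get("role")
        if role is not None and role not in roles:
            errors.append(f"[processing.{name}]: unknown role {role!r}")

    # run keys may appear in role sections and in data sections.
    for kind in ("role", "data"):
        for name, section in config.get(kind, {}).items():
            run = section.get("run")
            if run is not None and run not in processing:
                errors.append(f"[{kind}.{name}]: run references unknown section {run!r}")
    return errors


good = {
    "processing": {
        "prepare_decontam": {"role": "decontamination", "directory": "parts",
                             "steps": [{"type": "split"}]},
        "count_kmers_decontam": {"type": "kmercount", "input": "prepare_decontam",
                                 "directory": "kmercount"},
    },
    "role": {"decontamination": {"directory": "decontamination", "run": "prepare_decontam"}},
}
problems = validate(good)
```

Collecting all messages rather than raising on the first one lets a user fix a configuration file in a single pass.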