skimindex.processing
skimindex.processing — registry of data processing operation types.
Each processing type is a Python function decorated with @processing_type.
The decorator registers it so that config validation can verify
[processing.X].type values and build() can construct a ready-to-call
pipeline step directly from a TOML config block.
Key concepts:
- OutputKind: what a type produces, one of STREAM (stdout pipe), DIRECTORY, or FILE.
- ProcessingType: metadata + builder for one registered type.
- @processing_type: registers a builder function by its __name__.
- build(name): unified factory that detects atomic vs composite automatically.
- make_pipeline(name): builds a composite pipeline from a steps list.
Example
    from skimindex.processing import processing_type, OutputKind, build

    # Registering a new type:
    @processing_type(output_kind=OutputKind.STREAM)
    def my_filter(params: dict):
        """Filter sequences by a custom criterion."""
        threshold = params.get("threshold", 10)
        def run(input_data):
            ...
        return run

    # Building and running from config:
    step = build("count_kmers_decontam")
    result = step(input_data, dry_run=False)
OutputKind
Bases: Enum
ProcessingType
dataclass
ProcessingType(
    name: str,
    description: str,
    output_kind: OutputKind,
    needs_tmpdir: bool = False,
    is_indexer: bool = False,
    output_filename: str | None = None,
    _builder: Callable[[dict[str, Any]], Callable] = None,
)
Metadata and builder for a registered processing operation type.
Attributes:
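| Name | Type | Description |
|---|---|---|
| name | str | Type name (used as …). |
| description | str | Human-readable description (from the builder function's __doc__). |
| output_kind | OutputKind | What the type produces (STREAM / DIRECTORY / FILE). |
| needs_tmpdir | bool | True if the type needs a temporary working directory. |
| is_indexer | bool | True if this type writes to the indexes tree rather than the processed_data tree. |
| output_filename | str \| None | Filename used when a STREAM/FILE output is persisted. None = type cannot be persisted (must stay temporary). |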
pipable
property
pipable: bool
True if the output is a STDOUT stream (chainable in a plumbum pipe).
is_runnable
is_runnable(params: dict[str, Any]) -> bool
True if this type has an effective output directory with these params.
A type is runnable when 'output' is present in params. STREAM/FILE types additionally require output_filename to be set.
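The runnability rule above can be restated as a small standalone function. This is only a sketch of the documented rule, not the library's code: `is_runnable_sketch` and its string-valued `output_kind` argument are hypothetical stand-ins for the method and the OutputKind enum.

```python
from __future__ import annotations

from typing import Any


def is_runnable_sketch(
    output_kind: str,
    output_filename: str | None,
    params: dict[str, Any],
) -> bool:
    """Hypothetical restatement of the rule described above."""
    # Without an 'output' entry there is no effective output directory.
    if "output" not in params:
        return False
    # STREAM and FILE types also need a filename to persist to.
    if output_kind in ("STREAM", "FILE"):
        return output_filename is not None
    # DIRECTORY types only need the 'output' reference.
    return True
```

Under this reading, a STREAM type registered without output_filename is never runnable on its own, whatever its params contain.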
build
build(params: dict[str, Any]) -> Callable
Return a callable configured with params, ready to execute.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| params | dict[str, Any] | The … | required |

Returns:
| Type | Description |
|---|---|
| Callable | A callable whose signature depends on … |
processing_type
processing_type(
    output_kind: OutputKind,
    needs_tmpdir: bool = False,
    output_filename: str | None = None,
    is_indexer: bool = False,
)
Decorator that registers a builder function as a ProcessingType.
The function's __name__ becomes the type name; its __doc__ becomes the description. The function itself is the builder: it receives a params dict and returns a callable ready to execute.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| output_kind | OutputKind | What the type produces (STREAM / DIRECTORY / FILE). | required |
| needs_tmpdir | bool | True if the type needs a temporary working directory. | False |
| output_filename | str \| None | Filename used when a STREAM/FILE output is persisted. None = type cannot be persisted (must stay temporary). | None |
| is_indexer | bool | True if this type writes to the indexes tree rather than the processed_data tree. | False |
Example::

    @processing_type(output_kind=OutputKind.STREAM)
    def split(params: dict):
        """Split sequences into overlapping fragments."""
        size = params.get("size", 200)
        def run(input_cmd):
            return input_cmd | obiscript(SPLITSEQS_LUA)
        return run
get_processing_type
get_processing_type(name: str) -> ProcessingType
Return a registered ProcessingType by name, or raise KeyError.
registered_types
registered_types() -> frozenset[str]
Return the set of all registered processing type names.
make_pipeline
make_pipeline(processing_name: str) -> Callable
Build a runnable callable from a composite [processing.X] TOML block.
A composite block has a steps list and a top-level output reference.
Each step is either an inline dict ({type = "split", size = 200, …}) or
a string reference to another named [processing.*] block.
Persistence rules (managed by the runner, not the atomic bricks):
- STREAM step with directory, non-terminal → tee to file, pipe continues.
- STREAM step with directory, terminal → redirect stdout to file.
- DIRECTORY/FILE step → output_dir passed to brick (step dir or tmpdir).
- No directory → STREAM chains without executing; DIRECTORY uses tmpdir.
Stamp management: needs_run / stamp operate on the composite output
only. Intermediate steps with a directory are persisted but not stamped.
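The persistence rules can be read as a small decision table. The sketch below encodes them in a hypothetical function, `persistence_action`, that returns a description of what the runner would do; it is an illustration of the rules as listed, not the runner's code. Treating a FILE step without a directory like a DIRECTORY step (tmpdir) is an assumption based on the third rule.

```python
def persistence_action(kind: str, has_directory: bool, is_terminal: bool) -> str:
    """Hypothetical decision table for the persistence rules listed above."""
    if kind == "STREAM":
        if not has_directory:
            # Nothing to persist: the step is only chained into the pipe.
            return "chain without executing"
        if is_terminal:
            return "redirect stdout to file"
        return "tee to file, pipe continues"
    if kind in ("DIRECTORY", "FILE"):
        # Assumption: FILE without a directory falls back to tmpdir as well.
        if has_directory:
            return "pass step dir as output_dir"
        return "pass tmpdir as output_dir"
    raise ValueError(f"unknown output kind: {kind!r}")


for case in [("STREAM", True, False), ("STREAM", True, True), ("DIRECTORY", False, True)]:
    print(case, "->", persistence_action(*case))
```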
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| processing_name | str | Key of the … | required |

Returns:
| Type | Description |
|---|---|
| Callable | |

Raises:
| Type | Description |
|---|---|
| ValueError | If the block has no … |
build
build(processing_name: str) -> Callable
Build a runnable callable from any [processing.X] TOML block.
Dispatches automatically:
- steps key present → make_pipeline() (composite pipeline).
- type key present → atomic build (type must be registered and runnable).
The returned callable wraps needs_run / stamp so callers do not
need to manage stamps themselves.
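The dispatch rule can be sketched on plain dictionaries, as they might look after the TOML config is parsed. `dispatch_kind` is a hypothetical helper; checking `steps` before `type` and raising ValueError when neither key is present are assumptions made for this sketch, and the block contents are illustrative only.

```python
from typing import Any


def dispatch_kind(block: dict[str, Any]) -> str:
    """Classify a config block following the dispatch rules above."""
    if "steps" in block:
        return "composite"  # would be handed to make_pipeline()
    if "type" in block:
        return "atomic"     # would be built from the registered type
    # Assumption: a block with neither key is rejected.
    raise ValueError("block has neither 'steps' nor 'type'")


# Illustrative blocks: steps may mix inline dicts and string references.
composite_block = {"steps": [{"type": "split", "size": 200}, "count_kmers_decontam"]}
atomic_block = {"type": "split", "size": 200}

print(dispatch_kind(composite_block))
print(dispatch_kind(atomic_block))
```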
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| processing_name | str | Key of the … | required |

Returns:
| Type | Description |
|---|---|
| Callable | |

Raises:
| Type | Description |
|---|---|
| ValueError | If the block is not found, is not runnable (no … |
Built-in processing types
skimindex.processing.buildindex
skimindex.processing.buildindex — atomic 'buildindex' processing type.
Builds a kmindex sub-index from FASTA fragments using kmindex build.
Output kind: DIRECTORY (is_indexer=True).
Called once per dataset. Scans the dataset output directory recursively for
all parts/ subdirectories (one sample per assembly or division) and all
kmercount/ subdirectories (to compute the max F1 across samples).
Bloom filter sizing uses a single-hash model:
fpr = (n / (n + m)) ^ (z+1) → m = ceil(n * (fpr^(-1/(z+1)) - 1))
where:
- n = max number of distinct k-mers across all samples (max F1 from ntcard)
- m = Bloom filter size in cells (nb_cell parameter to kmindex)
- z = kmindex --zvalue parameter (queries use z+1 k-mers for a positive hit)
- fpr = target false positive rate
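A worked instance of the sizing formula may help. Raising both sides of fpr = (n / (n + m)) ^ (z+1) to the power 1/(z+1) and solving for m gives the closed form quoted above. The helper below, with the hypothetical name `bloom_cells`, only restates that algebra; it is not the module's implementation.

```python
import math


def bloom_cells(n: int, fpr: float, z: int) -> int:
    """Solve fpr = (n / (n + m)) ** (z + 1) for m, rounded up."""
    if n <= 0 or not (0.0 < fpr < 1.0) or z < 0:
        raise ValueError("need n > 0, 0 < fpr < 1 and z >= 0")
    return math.ceil(n * (fpr ** (-1.0 / (z + 1)) - 1.0))


# Example inputs (illustrative): one million distinct k-mers,
# z = 3 and a 1e-3 target false positive rate.
n = 1_000_000
m = bloom_cells(n=n, fpr=1e-3, z=3)

# Plug m back into the model to check the achieved rate.
achieved = (n / (n + m)) ** (3 + 1)
print(m, achieved)
```

Because m is rounded up, the rate obtained by plugging it back into the model is at or just below the target.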
buildindex
buildindex(
    params: dict,
) -> Callable[[Data, Path, bool], Data]
Build a kmindex sub-index for an entire dataset.
Called once per dataset. Scans the dataset output directory for all
parts/ subdirectories (one sample per assembly/division), computes
the Bloom filter size from the maximum F1 across all samples, and calls
kmindex build to register one sub-index in the global meta-index.
Parameters (TOML):
- output: Artifact ref for the FOF directory and stamp target (e.g. "kmindex@decontamination").
- index: Artifact ref for the kmindex global meta-index (e.g. "@idx:decontamination").
- kmer_size: k-mer length (default 29).
- zvalue: k-mers required for a positive query (default 3).
- fpr: Target false positive rate (default 1e-3).
- bloom_size: Explicit Bloom filter size; computed from F1 if omitted.
- threads: Number of threads (default 1).
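As an illustration of how these parameters and their defaults combine, the sketch below overlays a params dict (as a builder would receive it after the TOML block is parsed) on the documented defaults. `DEFAULTS` and `resolve_params` are hypothetical names, and the `threads = 8` override is an arbitrary example value.

```python
from typing import Any

# Documented defaults for the optional keys. 'bloom_size' has no default
# because it is computed from F1 when omitted.
DEFAULTS: dict[str, Any] = {"kmer_size": 29, "zvalue": 3, "fpr": 1e-3, "threads": 1}


def resolve_params(params: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: overlay user params on the documented defaults."""
    return {**DEFAULTS, **params}


params = resolve_params(
    {
        "output": "kmindex@decontamination",
        "index": "@idx:decontamination",
        "threads": 8,  # arbitrary override for illustration
    }
)
print(params["kmer_size"], params["threads"], params.get("bloom_size"))
```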
skimindex.processing.kmercount
skimindex.processing.kmercount — atomic 'kmercount' processing type.
Counts k-mers in a directory of FASTA fragments using ntcard. Output kind: DIRECTORY.
ntcard is run as
ntcard -t
The output prefix is derived from the last component of data.subdir
(typically the accession or division name), falling back to "kmers".
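The prefix rule can be sketched with pathlib. This assumes that data.subdir behaves like a POSIX-style relative path string that may be empty or missing; `output_prefix` and the sample paths are hypothetical and only illustrate the "last component, else kmers" fallback.

```python
from __future__ import annotations

from pathlib import PurePosixPath


def output_prefix(subdir: str | None) -> str:
    """Last path component of subdir, or 'kmers' when there is none."""
    name = PurePosixPath(subdir).name if subdir else ""
    return name or "kmers"


print(output_prefix("Human/GCF_000001405.40"))
print(output_prefix(""))
```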
ntcard writes one file per k value:
kmercount
kmercount(
    params: dict,
) -> Callable[[Data, Path, bool], Data]
Count k-mers in FASTA fragments using ntcard.