skimindex.processing

skimindex.processing — registry of data processing operation types.

Each processing type is a Python function decorated with @processing_type. The decorator registers it so that config validation can verify [processing.X].type values and build() can construct a ready-to-call pipeline step directly from a TOML config block.

Key concepts:

  • OutputKind — what a type produces: STREAM (stdout pipe), DIRECTORY, or FILE.
  • ProcessingType — metadata + builder for one registered type.
  • @processing_type — registers a builder function by its __name__.
  • build(name) — unified factory: detects atomic vs composite automatically.
  • make_pipeline(name) — builds a composite pipeline from a steps list.
Example
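from skimindex.processing import processing_type, OutputKind, build

# Registering a new type:
@processing_type(output_kind=OutputKind.STREAM)
def my_filter(params: dict):
    """Filter sequences by a custom criterion."""
    threshold = params.get("threshold", 10)  # read once, when the step is built

    def run(input_data):
        # Apply the filter to input_data using threshold and return the
        # resulting Data (a chainable stream, since output_kind is STREAM).
        ...

    return run

# Building and running from config.
# input_data is the Data produced by the previous step or the dataset source.
step = build("count_kmers_decontam")
result = step(input_data, dry_run=False)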

OutputKind

Bases: Enum

ProcessingType dataclass

ProcessingType(
    name: str,
    description: str,
    output_kind: OutputKind,
    needs_tmpdir: bool = False,
    is_indexer: bool = False,
    output_filename: str | None = None,
    _builder: Callable[[dict[str, Any]], Callable] = None,
)

Metadata and builder for a registered processing operation type.

Attributes:

  • name (str): Type name (used as type = "…" in TOML).
  • description (str): Human-readable description (from the builder's __doc__).
  • output_kind (OutputKind): What the type produces (STREAM, DIRECTORY, or FILE).
  • needs_tmpdir (bool): True if the builder requires a temporary working directory.
  • is_indexer (bool): True if outputs go to the indexes tree instead of processed_data.
  • output_filename (str | None): Filename used when a STREAM/FILE output is persisted to disk (e.g. "filtered.fasta.gz"). None means the type cannot be persisted; declaring output for it is a validation error.

pipable property

pipable: bool

True if the output is a STDOUT stream (chainable in a plumbum pipe).

is_runnable

is_runnable(params: dict[str, Any]) -> bool

True if this type has an effective output directory with these params.

A type is runnable when 'output' is present in params. STREAM/FILE types additionally require output_filename to be set.

build

build(params: dict[str, Any]) -> Callable

Return a callable configured with params, ready to execute.

Parameters:

  • params (dict[str, Any]): The [processing.X] config dict for this step. Required.

Returns:

  • Callable: a callable whose signature depends on output_kind. STREAM types return run(input_data) -> Data; DIRECTORY types return run(input_data, output_dir, dry_run) -> Data.

processing_type

processing_type(
    output_kind: OutputKind,
    needs_tmpdir: bool = False,
    output_filename: str | None = None,
    is_indexer: bool = False,
)

Decorator that registers a builder function as a ProcessingType.

The function's __name__ becomes the type name and its __doc__ becomes the description. The function itself is the builder: it receives a params dict and returns a callable ready to execute.

Parameters:

  • output_kind (OutputKind): What the type produces (STREAM / DIRECTORY / FILE). Required.
  • needs_tmpdir (bool): True if the type needs a temporary working directory. Default: False.
  • output_filename (str | None): Filename used when a STREAM/FILE output is persisted. None means the type cannot be persisted (its output must stay temporary). Default: None.
  • is_indexer (bool): True if this type writes to the indexes tree rather than the processed_data tree. Default: False.

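Example:

@processing_type(output_kind=OutputKind.STREAM)
def split(params: dict):
    """Split sequences into overlapping fragments."""
    size = params.get("size", 200)  # fragment size, read when the step is built

    def run(input_cmd):
        # Extend the incoming plumbum pipeline with the splitting script
        # (obiscript and SPLITSEQS_LUA are not shown here).
        return input_cmd | obiscript(SPLITSEQS_LUA)

    return run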

get_processing_type

get_processing_type(name: str) -> ProcessingType

Return a registered ProcessingType by name, or raise KeyError.

registered_types

registered_types() -> frozenset[str]

Return the set of all registered processing type names.
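The decorator and the two lookup functions form a name-keyed registry. The sketch below is a minimal model under stated assumptions: RegisteredType and the toy chunk builder are hypothetical, output_kind is a plain string, and the decorator returns the builder unchanged, which may differ from the real implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class RegisteredType:
    """Hypothetical, minimal stand-in for ProcessingType."""
    name: str
    description: str
    output_kind: str
    builder: Callable[[dict[str, Any]], Callable]


_REGISTRY: dict[str, RegisteredType] = {}


def processing_type(output_kind: str):
    """Register the decorated builder under its __name__, docstring as description."""
    def decorator(fn):
        _REGISTRY[fn.__name__] = RegisteredType(
            name=fn.__name__,
            description=(fn.__doc__ or "").strip(),
            output_kind=output_kind,
            builder=fn,
        )
        return fn  # this sketch leaves the builder itself unchanged
    return decorator


def get_processing_type(name: str) -> RegisteredType:
    return _REGISTRY[name]  # raises KeyError for unknown names


def registered_types() -> frozenset[str]:
    return frozenset(_REGISTRY)


@processing_type(output_kind="STREAM")
def chunk(params: dict):
    """Cut sequences into non-overlapping pieces (toy example)."""
    size = params.get("size", 200)

    def run(seqs: list[str]) -> list[str]:
        return [s[i:i + size] for s in seqs for i in range(0, len(s), size)]

    return run


run = get_processing_type("chunk").builder({"size": 3})
print(sorted(registered_types()))  # ['chunk']
print(run(["ACGTACG"]))            # ['ACG', 'TAC', 'G']
```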

make_pipeline

make_pipeline(processing_name: str) -> Callable

Build a runnable callable from a composite [processing.X] TOML block.

A composite block has a steps list and a top-level output reference. Each step is either an inline dict ({type = "split", size = 200, …}) or a string reference to another named [processing.*] block.

Persistence rules (managed by the runner, not the atomic bricks):

  • STREAM step with directory, non-terminal → tee to file, pipe continues.
  • STREAM step with directory, terminal → redirect stdout to file.
  • DIRECTORY/FILE step → output_dir passed to brick (step dir or tmpdir).
  • No directory → STREAM chains without executing; DIRECTORY uses tmpdir.
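The persistence rules above reduce to a small decision table. The function persistence_action and its string results are hypothetical labels for the runner's behavior; a FILE step without a directory is treated like DIRECTORY (tmpdir), following the "step dir or tmpdir" rule.

```python
def persistence_action(output_kind: str, has_directory: bool, is_terminal: bool) -> str:
    """Hypothetical encoding of make_pipeline's persistence rules."""
    if output_kind == "STREAM":
        if not has_directory:
            return "chain"     # keep building the pipe; nothing runs yet
        if is_terminal:
            return "redirect"  # send stdout to the step's file
        return "tee"           # write a copy to file, pipe continues
    # DIRECTORY/FILE bricks always get an output_dir: the step dir or a tmpdir.
    return "step_dir" if has_directory else "tmpdir"


for case in [("STREAM", True, False), ("STREAM", True, True),
             ("STREAM", False, False), ("DIRECTORY", False, True)]:
    print(case, persistence_action(*case))
```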

Stamp management: needs_run / stamp operate on the composite output only. Intermediate steps with a directory are persisted but not stamped.

Parameters:

  • processing_name (str): Key of the [processing.X] block in the TOML config. Required.

Returns:

  • Callable: run(input_data: Data, dry_run: bool = False) -> Data

Raises:

  • ValueError: If the block has no steps, no output, or references an unknown processing name.

build

build(processing_name: str) -> Callable

Build a runnable callable from any [processing.X] TOML block.

Dispatches automatically:

  • steps key present → make_pipeline() (composite pipeline).
  • type key present → atomic build (type must be registered and runnable).

The returned callable wraps needs_run / stamp so callers do not need to manage stamps themselves.
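A minimal sketch of that wrapping, assuming needs_run and stamp can be modeled as zero-argument callables bound to the block's output. The name with_stamp is hypothetical, and returning None for an up-to-date step is an assumption; this page does not say what a skipped step returns.

```python
from typing import Any, Callable


def with_stamp(run: Callable[..., Any],
               needs_run: Callable[[], bool],
               stamp: Callable[[], None]) -> Callable[..., Any]:
    """Hypothetical wrapper: skip up-to-date work, stamp after real runs."""
    def wrapped(input_data: Any, dry_run: bool = False) -> Any:
        if not needs_run():
            return None  # assumption: an up-to-date step yields nothing new
        result = run(input_data, dry_run=dry_run)
        if not dry_run:
            stamp()      # only a real run marks the output as done
        return result
    return wrapped


stamps: list[str] = []
step = with_stamp(lambda data, dry_run=False: data * 2,
                  needs_run=lambda: True,
                  stamp=lambda: stamps.append("done"))
print(step(21), stamps)  # 42 ['done']
```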

Parameters:

  • processing_name (str): Key of the [processing.X] block in the TOML config. Required.

Returns:

  • Callable: run(input_data: Data, dry_run: bool = False) -> Data

Raises:

  • ValueError: If the block is not found, is not runnable (no output), or is missing both type and steps.

Built-in processing types

skimindex.processing.buildindex

skimindex.processing.buildindex — atomic 'buildindex' processing type.

Builds a kmindex sub-index from FASTA fragments using kmindex build. Output kind: DIRECTORY (is_indexer=True).

Called once per dataset. Scans the dataset output directory recursively for all parts/ subdirectories (one sample per assembly or division) and all kmercount/ subdirectories (to compute the max F1 across samples).
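The recursive scan can be sketched with pathlib's rglob, which matches a name at any depth (including directly under the root). The directory layout and accession-like sample names below are invented for the demo; the real output tree may nest differently.

```python
import tempfile
from pathlib import Path


def scan_dataset(dataset_dir: Path) -> tuple[list[Path], list[Path]]:
    """Return (parts_dirs, kmercount_dirs) found anywhere under dataset_dir."""
    parts = sorted(p for p in dataset_dir.rglob("parts") if p.is_dir())
    counts = sorted(p for p in dataset_dir.rglob("kmercount") if p.is_dir())
    return parts, counts


# Demo on a throwaway tree with a hypothetical layout.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for sample in ("GCF_000001", "GCF_000002"):
        (root / sample / "parts").mkdir(parents=True)
    (root / "GCF_000001" / "kmercount").mkdir()
    parts, counts = scan_dataset(root)
    n_parts, n_counts = len(parts), len(counts)

print(n_parts, n_counts)  # 2 1
```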

Bloom filter sizing uses a single-hash model:

fpr = (n / (n + m)) ^ (z+1)  →  m = ceil(n * (fpr^(-1/(z+1)) - 1))

where:

  • n = max number of distinct k-mers across all samples (max F1 from ntcard)
  • m = Bloom filter size in cells (nb_cell parameter to kmindex)
  • z = kmindex --zvalue parameter (queries use z+1 k-mers for a positive hit)
  • fpr = target false positive rate
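The sizing formula and its inverse translate directly into code. The defaults (fpr = 1e-3, z = 3) mirror the buildindex parameters documented below; n = 1,000,000 is an illustrative value, not a measured F1.

```python
import math


def bloom_size(n: int, fpr: float = 1e-3, z: int = 3) -> int:
    """m = ceil(n * (fpr^(-1/(z+1)) - 1)) under the single-hash model."""
    if n <= 0 or not 0.0 < fpr < 1.0 or z < 0:
        raise ValueError("need n > 0, 0 < fpr < 1, z >= 0")
    return math.ceil(n * (fpr ** (-1.0 / (z + 1)) - 1.0))


def model_fpr(n: int, m: int, z: int = 3) -> float:
    """Inverse direction: fpr = (n / (n + m)) ^ (z + 1)."""
    return (n / (n + m)) ** (z + 1)


m = bloom_size(1_000_000)                # fpr = 1e-3, z = 3
print(m)                                 # 4623414
print(model_fpr(1_000_000, m) <= 1e-3)   # True
```

Rounding m up with ceil keeps the modeled false positive rate at or below the target.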

buildindex

buildindex(
    params: dict,
) -> Callable[[Data, Path, bool], Data]

Build a kmindex sub-index for an entire dataset.

Called once per dataset. Scans the dataset output directory for all parts/ subdirectories (one sample per assembly/division), computes the Bloom filter size from the maximum F1 across all samples, and calls kmindex build to register one sub-index in the global meta-index.

Parameters (TOML):

  • output: Artifact ref for the FOF directory and stamp target (e.g. "kmindex@decontamination").
  • index: Artifact ref for the kmindex global meta-index (e.g. "@idx:decontamination").
  • kmer_size: k-mer length (default 29).
  • zvalue: k-mers required for a positive query (default 3).
  • fpr: Target false positive rate (default 1e-3).
  • bloom_size: Explicit Bloom filter size; computed from F1 if omitted.
  • threads: Number of threads (default 1).
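Resolving these parameters can be sketched as merging the documented defaults and computing bloom_size from the largest per-sample F1 only when it is not given explicitly. The function resolve_buildindex_params and the F1 values are hypothetical.

```python
import math
from typing import Any

DEFAULTS: dict[str, Any] = {"kmer_size": 29, "zvalue": 3, "fpr": 1e-3, "threads": 1}


def resolve_buildindex_params(params: dict[str, Any],
                              f1_per_sample: list[int]) -> dict[str, Any]:
    """Hypothetical: merge defaults and fill bloom_size when it is omitted."""
    resolved = {**DEFAULTS, **params}
    if resolved.get("bloom_size") is None:
        n = max(f1_per_sample)  # the largest sample sizes the shared filter
        z, fpr = resolved["zvalue"], resolved["fpr"]
        resolved["bloom_size"] = math.ceil(n * (fpr ** (-1.0 / (z + 1)) - 1.0))
    return resolved


p = resolve_buildindex_params(
    {"output": "kmindex@decontamination", "index": "@idx:decontamination"},
    f1_per_sample=[250_000, 1_000_000, 600_000],
)
print(p["kmer_size"], p["bloom_size"])  # 29 4623414
```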

skimindex.processing.kmercount

skimindex.processing.kmercount — atomic 'kmercount' processing type.

Counts k-mers in a directory of FASTA fragments using ntcard. Output kind: DIRECTORY.

ntcard is run as

ntcard -t <threads> -k <kmer_size> -p <output_dir>/<prefix> <input_dir>/*.fasta.gz

The output prefix is derived from the last component of data.subdir (typically the accession or division name), falling back to "kmers". ntcard writes one file per k value: <prefix>_k<k>.hist.

kmercount

kmercount(
    params: dict,
) -> Callable[[Data, Path, bool], Data]

Count k-mers in FASTA fragments using ntcard.