Source code for finaletoolkit.io.reference

"""
Unified access to 2bit and FASTA reference genomes.

:class:`ReferenceWrapper` hides the py2bit/pysam differences behind a single
:meth:`ReferenceWrapper.sequence` method and is optionally thread-safe.
"""
from __future__ import annotations

import os
import threading
from pathlib import Path
from typing import Dict

import py2bit
import pysam

from ..exceptions import ContigNotFoundError, OutOfBoundsError
from ..utils.logging import get_logger

logger = get_logger(__name__)

__all__ = ["ReferenceWrapper"]

_TWOBIT_SUFFIXES = (".2bit", ".tb2")
_FASTA_SUFFIXES = (
    ".fa",
    ".fasta",
    ".fna",
    ".fa.gz",
    ".fasta.gz",
    ".fna.gz",
)


[docs] class ReferenceWrapper: """Wrap a 2bit- or FASTA-formatted reference genome for sequence queries. The wrapper encapsulates the py2bit/pysam handle logic behind a common interface and may be used as a context manager. Parameters ---------- reference_path : str or Path Path to a ``.2bit``/``.tb2`` or FASTA (``.fa``/``.fasta``/``.fna``, optionally ``.gz``) reference file. use_lock : bool, optional If ``True`` (default), guard sequence queries with a :class:`threading.Lock` so a single instance may be shared across threads. Set ``False`` for per-thread/per-process instances to avoid lock overhead. Raises ------ FileNotFoundError If ``reference_path`` does not exist. """ def __init__(self, reference_path: str | Path, use_lock: bool = True) -> None: self.reference_path = str(reference_path) self.use_lock = use_lock self._lock = threading.Lock() if use_lock else None self._handle = None self._is_2bit = False self._is_fasta = False self._chroms: Dict[str, int] | None = None if not os.path.exists(self.reference_path): error_msg = f"Reference file not found: {self.reference_path}" logger.error(error_msg) raise FileNotFoundError(error_msg) if self.reference_path.endswith(_TWOBIT_SUFFIXES): self._is_2bit = True self._open_2bit() elif self.reference_path.endswith(_FASTA_SUFFIXES): self._is_fasta = True self._open_fasta() else: logger.warning( "Unknown reference file extension for %s. " "Attempting to open as FASTA.", self.reference_path, ) self._is_fasta = True self._open_fasta() # -- handle management -------------------------------------------------- def _open_2bit(self) -> None: try: self._handle = py2bit.open(self.reference_path) self._chroms = self._handle.chroms() except Exception as e: logger.error("Failed to open 2-bit file %s: %s", self.reference_path, e) raise def _open_fasta(self) -> None: fai_path = self.reference_path + ".fai" if not os.path.exists(fai_path): logger.info( "FASTA index not found for %s. Creating index with " "pysam.faidx()...", self.reference_path, ) pysam.faidx(self.reference_path) try: self._handle = pysam.FastaFile(self.reference_path) self._chroms = dict(zip(self._handle.references, self._handle.lengths)) except Exception as e: logger.error("Failed to open FASTA file %s: %s", self.reference_path, e) raise # -- public interface --------------------------------------------------- @property def chroms(self) -> Dict[str, int]: """Mapping of chromosome name to length.""" return self._chroms
[docs] def sequence( self, contig: str, start: int | None = None, stop: int | None = None, fail_on_excess_range: bool = True, ) -> str: """Return the upper-cased reference sequence for a region. Parameters ---------- contig : str Chromosome or contig name. start : int, optional 0-based start position (defaults to 0). stop : int, optional 0-based, exclusive end position (defaults to the contig length). fail_on_excess_range : bool, optional If ``True`` (default), raise when the range exceeds the contig bounds; otherwise truncate the range to the contig. Returns ------- str The upper-cased sequence (empty string if the truncated range is empty). Raises ------ ContigNotFoundError If ``contig`` is absent from the reference. OutOfBoundsError If the range is out of bounds and ``fail_on_excess_range`` is True. """ if contig not in self._chroms: raise ContigNotFoundError(f"Contig {contig} not found in reference.") chrom_len = self._chroms[contig] if start is None: start = 0 if stop is None: stop = chrom_len if start < 0 or stop > chrom_len or start > stop: if fail_on_excess_range: raise OutOfBoundsError( f"Requested range {contig}:{start}-{stop} is out of bounds " f"(0-{chrom_len})." ) start = max(0, start) stop = min(chrom_len, stop) if start > stop: return "" seq = self._fetch(contig, start, stop) return seq.upper() if seq else ""
def _fetch(self, contig: str, start: int, stop: int) -> str: """Backend-agnostic fetch, guarded by the lock when enabled.""" if self._lock is not None: with self._lock: return self._fetch_unlocked(contig, start, stop) return self._fetch_unlocked(contig, start, stop) def _fetch_unlocked(self, contig: str, start: int, stop: int) -> str: # Both backends use 0-based, half-open coordinates. if self._is_2bit: return self._handle.sequence(contig, start, stop) return self._handle.fetch(contig, start, stop)
[docs] def close(self) -> None: """Safely close the underlying file handle.""" if self._lock is not None: with self._lock: self._close_handle() else: self._close_handle()
def _close_handle(self) -> None: if self._handle is not None: self._handle.close() self._handle = None # -- context manager / slicing ----------------------------------------- def __enter__(self) -> "ReferenceWrapper": return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.close() def __del__(self) -> None: try: self.close() except Exception: pass def __getitem__(self, contig: str) -> "_ContigSlicer": """Enable ``ref['chr1'][100:200]`` slicing syntax.""" if contig not in self._chroms: raise ContigNotFoundError(f"Contig {contig} not found in reference.") return _ContigSlicer(self, contig)
class _ContigSlicer: """Helper enabling ``ref[contig][start:stop]`` slicing of a reference.""" def __init__(self, wrapper: ReferenceWrapper, contig: str) -> None: self.wrapper = wrapper self.contig = contig def __getitem__(self, key: slice | int) -> str: if isinstance(key, slice): return self.wrapper.sequence(self.contig, key.start, key.stop) if isinstance(key, int): return self.wrapper.sequence(self.contig, key, key + 1) raise TypeError("Slicer indices must be integers or slices.") def __len__(self) -> int: return self.wrapper.chroms[self.contig]