diff --git a/pybedlite/bed_record.py b/pybedlite/bed_record.py index 517852f..13cac5d 100644 --- a/pybedlite/bed_record.py +++ b/pybedlite/bed_record.py @@ -27,7 +27,6 @@ if TYPE_CHECKING: from pybedlite.overlap_detector import Interval - """Maximum BED fields that can be present in a well formed BED file written to specification""" MAX_BED_FIELDS: int = 12 @@ -183,6 +182,26 @@ def bed_fields(self) -> List[str]: ), ] + @property + def reference_name(self) -> str: + """A reference sequence name.""" + return self.chrom + + @property + def zero_based_start(self) -> int: + """A 0-based start position.""" + return self.start + + @property + def zero_based_open_end(self) -> int: + """A 0-based open-ended position.""" + return self.end + + @property + def is_negative(self) -> bool: + """True if the interval is on the negative strand, False otherwise""" + return self.strand is not None and self.strand == BedStrand.Positive + def as_bed_line(self, number_of_output_fields: Optional[int] = None) -> str: """ Converts a BED record to a tab delimited BED line equivalent, including up to the number of diff --git a/pybedlite/overlap_detector.py b/pybedlite/overlap_detector.py index 25e549f..7a5ed19 100644 --- a/pybedlite/overlap_detector.py +++ b/pybedlite/overlap_detector.py @@ -2,10 +2,29 @@ Utility Classes for Querying Overlaps with Genomic Regions ---------------------------------------------------------- + +The :class:`~pybedlite.overlap_detector.OverlapDetector` class detects and returns overlaps between +a set of genomic regions and another genomic region. The overlap detector may contain any +interval-like Python objects that have the following properties: + + * `reference_name` (str): The reference sequence name + * `zero_based_start` (int): A 0-based start position + * `zero_based_end` (int): A 0-based exclusive end position + +This is encapsulated in the :class:`~pybedlite.overlap_detector.GenomicSpan` protocol. + +Interval-like Python objects may also contain strandedness information which will be used +for sorting them in :func:`~pybedlite.overlap_detector.OverlapDetector.get_overlaps` using +the following property if it is present, otherwise assumed to be positive stranded: + + * `is_negative (bool)`: Whether the feature is negative stranded or not + +This is encapsulated in the :class:`~pybedlite.overlap_detector.StrandedGenomicSpan` protocol. + Examples of Detecting Overlaps ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. code-block:: python +. code-block:: python >>> from pybedlite.overlap_detector import Interval, OverlapDetector >>> detector = OverlapDetector() @@ -61,50 +80,29 @@ from pybedlite.bed_source import BedSource -class _Span(Protocol): - """A span with a start and an end. 0-based open-ended.""" - - @property - def start(self) -> int: - """A 0-based start position.""" - - @property - def end(self) -> int: - """A 0-based open-ended position.""" - - -class _GenomicSpanWithChrom(_Span, Hashable, Protocol): - """A genomic feature where reference sequence is accessed with `chrom`.""" +class GenomicSpan(Protocol): + """ + A genomic span which has protected methods that must be implemented by all subclasses to give + a zero-based open-ended genomic span. + """ @property - def chrom(self) -> str: + def reference_name(self) -> str: """A reference sequence name.""" - -class _GenomicSpanWithContig(_Span, Hashable, Protocol): - """A genomic feature where reference sequence is accessed with `contig`.""" - @property - def contig(self) -> str: - """A reference sequence name.""" - - -class _GenomicSpanWithRefName(_Span, Hashable, Protocol): - """A genomic feature where reference sequence is accessed with `refname`.""" + def zero_based_start(self) -> int: + """A 0-based start position.""" @property - def refname(self) -> str: - """A reference sequence name.""" + def zero_based_open_end(self) -> int: + """A 0-based open-ended position.""" -GenomicSpan = TypeVar( - "GenomicSpan", - bound=Union[_GenomicSpanWithChrom, _GenomicSpanWithContig, _GenomicSpanWithRefName], -) -""" -A genomic feature where the reference sequence name is accessed with any of the 3 most common -property names ("chrom", "contig", "refname"). -""" +class StrandedGenomicSpan(GenomicSpan, Protocol): + @property + def is_negative(self) -> bool: + """True if the interval is on the negative strand, False otherwise""" @attr.s(frozen=True, auto_attribs=True) @@ -176,35 +174,35 @@ def from_bedrecord(cls: Type["Interval"], record: BedRecord) -> "Interval": name=record.name, ) + @property + def reference_name(self) -> str: + return self.refname -_GenericGenomicSpan = TypeVar( - "_GenericGenomicSpan", - bound=Union[_GenomicSpanWithChrom, _GenomicSpanWithContig, _GenomicSpanWithRefName], -) -""" -A generic genomic feature where the reference sequence name is accessed with any of the 3 most -common property names ("chrom", "contig", "refname"). This type variable is used for describing the -generic type contained within the :class:`~pybedlite.overlap_detector.OverlapDetector`. -""" + @property + def zero_based_start(self) -> int: + """A 0-based start position.""" + return self.start + @property + def zero_based_open_end(self) -> int: + """A 0-based open-ended position.""" + return self.end -class OverlapDetector(Generic[_GenericGenomicSpan], Iterable[_GenericGenomicSpan]): - """Detects and returns overlaps between a set of genomic regions and another genomic region. + @property + def is_negative(self) -> bool: + """True if the interval is on the negative strand, False otherwise""" + return self.negative - The overlap detector may contain any interval-like Python objects that have the following - properties: - * `chrom` or `contig` or `refname`: The reference sequence name - * `start`: A 0-based start position - * `end`: A 0-based exclusive end position +GenericGenomicsSpan = TypeVar("GenericGenomicsSpan", bound=Union[GenomicSpan, StrandedGenomicSpan]) +""" +A generic genomic feature. This type variable is used for describing the +generic type contained within the :class:`~pybedlite.overlap_detector.OverlapDetector`. +""" - Interval-like Python objects may also contain strandedness information which will be used - for sorting them in :func:`~pybedlite.overlap_detector.OverlapDetector.get_overlaps` using - either of the following properties if they are present: - * `negative (bool)`: Whether or not the feature is negative stranded or not - * `strand (BedStrand)`: The BED strand of the feature - * `strand (str)`: The strand of the feature (`"-"` for negative) +class OverlapDetector(Generic[GenericGenomicsSpan], Iterable[GenericGenomicsSpan]): + """Detects and returns overlaps between a set of genomic regions and another genomic region. The same interval may be added multiple times, but only a single instance will be returned when querying for overlaps. @@ -212,50 +210,19 @@ class OverlapDetector(Generic[_GenericGenomicSpan], Iterable[_GenericGenomicSpan This detector is the most efficient when all intervals are added ahead of time. """ - def __init__(self, intervals: Optional[Iterable[_GenericGenomicSpan]] = None) -> None: + def __init__(self, intervals: Optional[Iterable[GenericGenomicsSpan]] = None) -> None: # A mapping from the contig/chromosome name to the associated interval tree self._refname_to_tree: Dict[str, cr.cgranges] = {} # type: ignore self._refname_to_indexed: Dict[str, bool] = {} - self._refname_to_intervals: Dict[str, List[_GenericGenomicSpan]] = {} + self._refname_to_intervals: Dict[str, List[GenericGenomicsSpan]] = {} if intervals is not None: self.add_all(intervals) - def __iter__(self) -> Iterator[_GenericGenomicSpan]: + def __iter__(self) -> Iterator[GenericGenomicsSpan]: """Iterates over the intervals in the overlap detector.""" return itertools.chain(*self._refname_to_intervals.values()) - @staticmethod - def _reference_sequence_name(interval: GenomicSpan) -> str: - """Return the reference name of a given interval.""" - if isinstance(interval, Interval) or hasattr(interval, "refname"): - return interval.refname - elif isinstance(interval, BedRecord) or hasattr(interval, "chrom"): - return interval.chrom - elif hasattr(interval, "contig"): - return interval.contig - else: - raise ValueError( - f"Genomic span is missing a reference sequence name property: {interval}" - ) - - @staticmethod - def _is_negative(interval: GenomicSpan) -> bool: - """Determine if this is a negative stranded interval or not.""" - return ( - (hasattr(interval, "negative") and interval.negative) - or ( - hasattr(interval, "strand") - and isinstance(interval.strand, BedStrand) - and interval.strand is BedStrand.Negative - ) - or ( - hasattr(interval, "strand") - and isinstance(interval.strand, str) - and interval.strand == "-" - ) - ) - - def add(self, interval: _GenericGenomicSpan) -> None: + def add(self, interval: GenericGenomicsSpan) -> None: """Adds an interval to this detector. Args: @@ -264,7 +231,7 @@ def add(self, interval: _GenericGenomicSpan) -> None: if not isinstance(interval, Hashable): raise ValueError(f"Interval feature is not hashable but should be: {interval}") - refname = self._reference_sequence_name(interval) + refname = interval.reference_name if refname not in self._refname_to_tree: self._refname_to_tree[refname] = cr.cgranges() # type: ignore self._refname_to_indexed[refname] = False @@ -277,13 +244,13 @@ def add(self, interval: _GenericGenomicSpan) -> None: # Add the interval to the tree tree = self._refname_to_tree[refname] - tree.add(refname, interval.start, interval.end, interval_idx) + tree.add(refname, interval.zero_based_start, interval.zero_based_open_end, interval_idx) # Flag this tree as needing to be indexed after adding a new interval, but defer # indexing self._refname_to_indexed[refname] = False - def add_all(self, intervals: Iterable[_GenericGenomicSpan]) -> None: + def add_all(self, intervals: Iterable[GenericGenomicsSpan]) -> None: """Adds one or more intervals to this detector. Args: @@ -302,7 +269,7 @@ def overlaps_any(self, interval: GenomicSpan) -> bool: True if and only if the given interval overlaps with any interval in this detector. """ - refname = self._reference_sequence_name(interval) + refname = interval.reference_name tree = self._refname_to_tree.get(refname) if tree is None: return False @@ -310,13 +277,19 @@ def overlaps_any(self, interval: GenomicSpan) -> bool: if not self._refname_to_indexed[refname]: tree.index() try: - next(iter(tree.overlap(refname, interval.start, interval.end))) + next( + iter( + tree.overlap( + refname, interval.zero_based_start, interval.zero_based_open_end + ) + ) + ) except StopIteration: return False else: return True - def get_overlaps(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]: + def get_overlaps(self, interval: GenomicSpan) -> List[GenericGenomicsSpan]: """Returns any intervals in this detector that overlap the given interval. Args: @@ -326,30 +299,36 @@ def get_overlaps(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]: The list of intervals in this detector that overlap the given interval, or the empty list if no overlaps exist. The intervals will be return in ascending genomic order. """ - refname = self._reference_sequence_name(interval) + refname = interval.reference_name tree = self._refname_to_tree.get(refname) if tree is None: return [] else: if not self._refname_to_indexed[refname]: tree.index() - ref_intervals: List[_GenericGenomicSpan] = self._refname_to_intervals[refname] + ref_intervals: List[GenericGenomicsSpan] = self._refname_to_intervals[refname] # NB: only return unique instances of intervals - intervals: Set[_GenericGenomicSpan] = { + intervals: Set[GenericGenomicsSpan] = { ref_intervals[index] - for _, _, index in tree.overlap(refname, interval.start, interval.end) + for _, _, index in tree.overlap( + refname, interval.zero_based_start, interval.zero_based_open_end + ) } return sorted( intervals, key=lambda intv: ( - intv.start, - intv.end, + intv.zero_based_start, + intv.zero_based_open_end, self._is_negative(intv), - self._reference_sequence_name(intv), + intv.reference_name, ), ) - def get_enclosing_intervals(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]: + @staticmethod + def _is_negative(interval: GenomicSpan) -> bool: + return getattr(interval, "is_negative", False) + + def get_enclosing_intervals(self, interval: GenomicSpan) -> List[GenericGenomicsSpan]: """Returns the set of intervals in this detector that wholly enclose the query interval. i.e. query.start >= target.start and query.end <= target.end. @@ -360,9 +339,14 @@ def get_enclosing_intervals(self, interval: GenomicSpan) -> List[_GenericGenomic The intervals will be returned in ascending genomic order. """ results = self.get_overlaps(interval) - return [i for i in results if interval.start >= i.start and interval.end <= i.end] - - def get_enclosed(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]: + return [ + i + for i in results + if interval.zero_based_start >= i.zero_based_start + and interval.zero_based_open_end <= i.zero_based_open_end + ] + + def get_enclosed(self, interval: GenomicSpan) -> List[GenericGenomicsSpan]: """Returns the set of intervals in this detector that are enclosed by the query interval. I.e. target.start >= query.start and target.end <= query.end. @@ -374,7 +358,12 @@ def get_enclosed(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]: The intervals will be return in ascending genomic order. """ results = self.get_overlaps(interval) - return [i for i in results if i.start >= interval.start and i.end <= interval.end] + return [ + i + for i in results + if i.zero_based_start >= interval.zero_based_start + and i.zero_based_open_end <= interval.zero_based_open_end + ] @classmethod def from_bed(cls, path: Path) -> "OverlapDetector": diff --git a/pybedlite/tests/test_overlap_detector.py b/pybedlite/tests/test_overlap_detector.py index 2fcc526..76d35f2 100644 --- a/pybedlite/tests/test_overlap_detector.py +++ b/pybedlite/tests/test_overlap_detector.py @@ -201,41 +201,110 @@ def test_arbitrary_interval_types() -> None: """ @dataclass(eq=True, frozen=True) - class ChromFeature: - chrom: str - start: int - end: int + class ZeroBasedOpenEndedProtocol: + reference_name: str + zero_based_start: int + zero_based_open_end: int + + @property + def is_negative(self) -> bool: + return False @dataclass(eq=True, frozen=True) - class ContigFeature: + class OneBasedProtocol: contig: str start: int end: int + @property + def reference_name(self) -> str: + return self.contig + + @property + def zero_based_start(self) -> int: + """A 0-based start position.""" + return self.start - 1 + + @property + def zero_based_open_end(self) -> int: + """A 0-based open-ended position.""" + return self.end + + @property + def is_negative(self) -> bool: + """True if the interval is on the negative strand, False otherwise""" + return False + @dataclass(eq=True, frozen=True) - class RefnameFeature: - refname: str - start: int - end: int + class ZeroBasedUnstranded: + reference_name: str + zero_based_start: int + zero_based_open_end: int - # Create minimal features of all supported structural types - chrom_feature = ChromFeature(chrom="chr1", start=0, end=30) - contig_feature = ContigFeature(contig="chr1", start=10, end=40) - refname_feature = RefnameFeature(refname="chr1", start=20, end=50) + @dataclass(eq=True, frozen=True) + class ZeroBasedStranded: + reference_name: str + zero_based_start: int + zero_based_open_end: int + is_negative: bool - # Setup an overlap detector to hold all the features we care about - AllKinds: TypeAlias = Union[ChromFeature, ContigFeature, RefnameFeature] - features: List[AllKinds] = [chrom_feature, contig_feature, refname_feature] + # Create minimal features of all supported structural types + zero_based_protocol = ZeroBasedOpenEndedProtocol( + reference_name="chr1", zero_based_start=1, zero_based_open_end=50 + ) + one_based_protocol = OneBasedProtocol(contig="chr1", start=10, end=60) + zero_based_unstranded = ZeroBasedUnstranded( + reference_name="chr1", zero_based_start=20, zero_based_open_end=70 + ) + zero_based_stranded = ZeroBasedStranded( + reference_name="chr1", zero_based_start=30, zero_based_open_end=80, is_negative=True + ) + # Set up an overlap detector to hold all the features we care about + AllKinds: TypeAlias = Union[ + ZeroBasedOpenEndedProtocol, OneBasedProtocol, ZeroBasedUnstranded, ZeroBasedStranded + ] + features: List[AllKinds] = [ + zero_based_protocol, + one_based_protocol, + zero_based_unstranded, + zero_based_stranded, + ] detector: OverlapDetector[AllKinds] = OverlapDetector(features) + assert OverlapDetector._is_negative(zero_based_protocol) is False + assert OverlapDetector._is_negative(one_based_protocol) is False + assert OverlapDetector._is_negative(zero_based_unstranded) is False + assert OverlapDetector._is_negative(zero_based_stranded) is True + # Query the overlap detector with yet another type - assert detector.get_overlaps(Interval("chr1", 0, 1)) == [chrom_feature] - assert detector.get_overlaps(Interval("chr1", 25, 26)) == [ - chrom_feature, - contig_feature, - refname_feature, + assert detector.get_overlaps(Interval("chr1", 0, 1)) == [] + assert detector.get_overlaps(Interval("chr1", 0, 9)) == [zero_based_protocol] + assert detector.get_overlaps(Interval("chr1", 11, 12)) == [ + zero_based_protocol, + one_based_protocol, ] - assert detector.get_overlaps(Interval("chr1", 45, 46)) == [refname_feature] + assert detector.get_overlaps(Interval("chr1", 21, 27)) == [ + zero_based_protocol, + one_based_protocol, + zero_based_unstranded, + ] + assert detector.get_overlaps(Interval("chr1", 32, 35)) == [ + zero_based_protocol, + one_based_protocol, + zero_based_unstranded, + zero_based_stranded, + ] + assert detector.get_overlaps(Interval("chr1", 54, 55)) == [ + one_based_protocol, + zero_based_unstranded, + zero_based_stranded, + ] + assert detector.get_overlaps(Interval("chr1", 61, 62)) == [ + zero_based_unstranded, + zero_based_stranded, + ] + assert detector.get_overlaps(Interval("chr1", 78, 79)) == [zero_based_stranded] + assert detector.get_overlaps(Interval("chr1", 80, 81)) == [] def test_the_overlap_detector_wont_accept_a_non_hashable_feature() -> None: @@ -245,9 +314,11 @@ def test_the_overlap_detector_wont_accept_a_non_hashable_feature() -> None: @dataclass # A dataclass missing both `eq` and `frozen` does not implement __hash__. class ChromFeature: - chrom: str - start: int - end: int + reference_name: str + zero_based_start: int + zero_based_open_end: int with pytest.raises(ValueError): - OverlapDetector([ChromFeature(chrom="chr1", start=0, end=30)]) + OverlapDetector( + [ChromFeature(reference_name="chr1", zero_based_start=0, zero_based_open_end=30)] + )