Skip to content

Commit

Permalink
Keep OverlapDetector generic but normalize the getters for BedRecord …
Browse files Browse the repository at this point in the history
…and Interval (#36)
  • Loading branch information
nh13 authored Aug 1, 2024
1 parent 2d71949 commit 13b7834
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 113 deletions.
11 changes: 10 additions & 1 deletion pybedlite/bed_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
if TYPE_CHECKING:
from pybedlite.overlap_detector import Interval


"""Maximum BED fields that can be present in a well formed BED file written to specification"""
MAX_BED_FIELDS: int = 12

Expand Down Expand Up @@ -183,6 +182,16 @@ def bed_fields(self) -> List[str]:
),
]

@property
def refname(self) -> str:
"""The reference sequence name of this record (the same value as `chrom`)."""
return self.chrom

Check warning on line 188 in pybedlite/bed_record.py

View check run for this annotation

Codecov / codecov/patch

pybedlite/bed_record.py#L188

Added line #L188 was not covered by tests

@property
def negative(self) -> bool:
"""True if this record's strand is `BedStrand.Negative`, False otherwise."""
return self.strand is BedStrand.Negative

Check warning on line 193 in pybedlite/bed_record.py

View check run for this annotation

Codecov / codecov/patch

pybedlite/bed_record.py#L193

Added line #L193 was not covered by tests

def as_bed_line(self, number_of_output_fields: Optional[int] = None) -> str:
"""
Converts a BED record to a tab delimited BED line equivalent, including up to the number of
Expand Down
145 changes: 55 additions & 90 deletions pybedlite/overlap_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,29 @@
Utility Classes for Querying Overlaps with Genomic Regions
----------------------------------------------------------
The :class:`~pybedlite.overlap_detector.OverlapDetector` class detects and returns overlaps between
a set of genomic regions and another genomic region. The overlap detector may contain any
interval-like Python objects that have the following properties:
* `refname` (str): The reference sequence name
* `start` (int): A 0-based start position
* `end` (int): A 0-based exclusive end position
This is encapsulated in the :class:`~pybedlite.overlap_detector.GenomicSpan` protocol.
Interval-like Python objects may also contain strandedness information, which will be used
for sorting them in :func:`~pybedlite.overlap_detector.OverlapDetector.get_overlaps` via
the following property if it is present; otherwise they are assumed to be positive stranded:
* `negative` (bool): Whether the feature is negative stranded
This is encapsulated in the :class:`~pybedlite.overlap_detector.StrandedGenomicSpan` protocol.
Examples of Detecting Overlaps
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: python
.. code-block:: python
>>> from pybedlite.overlap_detector import Interval, OverlapDetector
>>> detector = OverlapDetector()
Expand Down Expand Up @@ -61,8 +80,15 @@
from pybedlite.bed_source import BedSource


class _Span(Protocol):
"""A span with a start and an end. 0-based open-ended."""
class GenomicSpan(Protocol):
"""
A genomic span on a reference sequence. Implementations must provide the `refname`, `start`,
and `end` properties, which together describe a zero-based, open-ended genomic span.
"""

@property
def refname(self) -> str:
"""A reference sequence name."""

@property
def start(self) -> int:
Expand All @@ -73,38 +99,10 @@ def end(self) -> int:
"""A 0-based open-ended position."""


class _GenomicSpanWithChrom(_Span, Hashable, Protocol):
"""A genomic feature where reference sequence is accessed with `chrom`."""

class StrandedGenomicSpan(GenomicSpan, Protocol):
@property
def chrom(self) -> str:
"""A reference sequence name."""


class _GenomicSpanWithContig(_Span, Hashable, Protocol):
"""A genomic feature where reference sequence is accessed with `contig`."""

@property
def contig(self) -> str:
"""A reference sequence name."""


class _GenomicSpanWithRefName(_Span, Hashable, Protocol):
"""A genomic feature where reference sequence is accessed with `refname`."""

@property
def refname(self) -> str:
"""A reference sequence name."""


GenomicSpan = TypeVar(
"GenomicSpan",
bound=Union[_GenomicSpanWithChrom, _GenomicSpanWithContig, _GenomicSpanWithRefName],
)
"""
A genomic feature where the reference sequence name is accessed with any of the 3 most common
property names ("chrom", "contig", "refname").
"""
def negative(self) -> bool:
"""True if the interval is on the negative strand, False otherwise"""


@attr.s(frozen=True, auto_attribs=True)
Expand Down Expand Up @@ -177,31 +175,25 @@ def from_bedrecord(cls: Type["Interval"], record: BedRecord) -> "Interval":
)


_GenericGenomicSpan = TypeVar(
"_GenericGenomicSpan",
bound=Union[_GenomicSpanWithChrom, _GenomicSpanWithContig, _GenomicSpanWithRefName],
)
GenericGenomicSpan = TypeVar("GenericGenomicSpan", bound=Union[GenomicSpan, StrandedGenomicSpan])
"""
A generic genomic feature where the reference sequence name is accessed with any of the 3 most
common property names ("chrom", "contig", "refname"). This type variable is used for describing the
A generic genomic feature. This type variable is used for describing the
generic type contained within the :class:`~pybedlite.overlap_detector.OverlapDetector`.
"""


class OverlapDetector(Generic[_GenericGenomicSpan], Iterable[_GenericGenomicSpan]):
class OverlapDetector(Generic[GenericGenomicSpan], Iterable[GenericGenomicSpan]):
"""Detects and returns overlaps between a set of genomic regions and another genomic region.
The overlap detector may contain any interval-like Python objects that have the following
properties:
* `chrom` or `contig` or `refname`: The reference sequence name
* `start`: A 0-based start position
* `end`: A 0-based exclusive end position
Interval-like Python objects may also contain strandedness information which will be used
for sorting them in :func:`~pybedlite.overlap_detector.OverlapDetector.get_overlaps` using
either of the following properties if they are present:
* `negative (bool)`: Whether or not the feature is negative stranded or not
* `strand (BedStrand)`: The BED strand of the feature
* `strand (str)`: The strand of the feature (`"-"` for negative)
Expand All @@ -212,50 +204,19 @@ class OverlapDetector(Generic[_GenericGenomicSpan], Iterable[_GenericGenomicSpan
This detector is the most efficient when all intervals are added ahead of time.
"""

def __init__(self, intervals: Optional[Iterable[_GenericGenomicSpan]] = None) -> None:
def __init__(self, intervals: Optional[Iterable[GenericGenomicSpan]] = None) -> None:
# A mapping from the contig/chromosome name to the associated interval tree
self._refname_to_tree: Dict[str, cr.cgranges] = {} # type: ignore
self._refname_to_indexed: Dict[str, bool] = {}
self._refname_to_intervals: Dict[str, List[_GenericGenomicSpan]] = {}
self._refname_to_intervals: Dict[str, List[GenericGenomicSpan]] = {}
if intervals is not None:
self.add_all(intervals)

def __iter__(self) -> Iterator[_GenericGenomicSpan]:
def __iter__(self) -> Iterator[GenericGenomicSpan]:
"""Iterates over the intervals in the overlap detector."""
return itertools.chain(*self._refname_to_intervals.values())

@staticmethod
def _reference_sequence_name(interval: GenomicSpan) -> str:
"""Return the reference name of a given interval."""
if isinstance(interval, Interval) or hasattr(interval, "refname"):
return interval.refname
elif isinstance(interval, BedRecord) or hasattr(interval, "chrom"):
return interval.chrom
elif hasattr(interval, "contig"):
return interval.contig
else:
raise ValueError(
f"Genomic span is missing a reference sequence name property: {interval}"
)

@staticmethod
def _is_negative(interval: GenomicSpan) -> bool:
"""Determine if this is a negative stranded interval or not."""
return (
(hasattr(interval, "negative") and interval.negative)
or (
hasattr(interval, "strand")
and isinstance(interval.strand, BedStrand)
and interval.strand is BedStrand.Negative
)
or (
hasattr(interval, "strand")
and isinstance(interval.strand, str)
and interval.strand == "-"
)
)

def add(self, interval: _GenericGenomicSpan) -> None:
def add(self, interval: GenericGenomicSpan) -> None:
"""Adds an interval to this detector.
Args:
Expand All @@ -264,7 +225,7 @@ def add(self, interval: _GenericGenomicSpan) -> None:
if not isinstance(interval, Hashable):
raise ValueError(f"Interval feature is not hashable but should be: {interval}")

refname = self._reference_sequence_name(interval)
refname = interval.refname
if refname not in self._refname_to_tree:
self._refname_to_tree[refname] = cr.cgranges() # type: ignore
self._refname_to_indexed[refname] = False
Expand All @@ -283,7 +244,7 @@ def add(self, interval: _GenericGenomicSpan) -> None:
# indexing
self._refname_to_indexed[refname] = False

def add_all(self, intervals: Iterable[_GenericGenomicSpan]) -> None:
def add_all(self, intervals: Iterable[GenericGenomicSpan]) -> None:
"""Adds one or more intervals to this detector.
Args:
Expand All @@ -302,7 +263,7 @@ def overlaps_any(self, interval: GenomicSpan) -> bool:
True if and only if the given interval overlaps with any interval in this
detector.
"""
refname = self._reference_sequence_name(interval)
refname = interval.refname
tree = self._refname_to_tree.get(refname)
if tree is None:
return False
Expand All @@ -316,7 +277,7 @@ def overlaps_any(self, interval: GenomicSpan) -> bool:
else:
return True

def get_overlaps(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]:
def get_overlaps(self, interval: GenomicSpan) -> List[GenericGenomicSpan]:
"""Returns any intervals in this detector that overlap the given interval.
Args:
Expand All @@ -326,16 +287,16 @@ def get_overlaps(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]:
The list of intervals in this detector that overlap the given interval, or the empty
list if no overlaps exist. The intervals will be returned in ascending genomic order.
"""
refname = self._reference_sequence_name(interval)
refname = interval.refname
tree = self._refname_to_tree.get(refname)
if tree is None:
return []
else:
if not self._refname_to_indexed[refname]:
tree.index()
ref_intervals: List[_GenericGenomicSpan] = self._refname_to_intervals[refname]
ref_intervals: List[GenericGenomicSpan] = self._refname_to_intervals[refname]
# NB: only return unique instances of intervals
intervals: Set[_GenericGenomicSpan] = {
intervals: Set[GenericGenomicSpan] = {
ref_intervals[index]
for _, _, index in tree.overlap(refname, interval.start, interval.end)
}
Expand All @@ -344,14 +305,18 @@ def get_overlaps(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]:
key=lambda intv: (
intv.start,
intv.end,
self._is_negative(intv),
self._reference_sequence_name(intv),
self._negative(intv),
intv.refname,
),
)

def get_enclosing_intervals(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]:
@staticmethod
def _negative(interval: GenomicSpan) -> bool:
"""Return the interval's `negative` attribute, or False if it has no such attribute.

Intervals that carry no strand information are therefore treated as positive stranded.
"""
return getattr(interval, "negative", False)

def get_enclosing_intervals(self, interval: GenomicSpan) -> List[GenericGenomicSpan]:
"""Returns the set of intervals in this detector that wholly enclose the query interval.
i.e. query.start >= target.start and query.end <= target.end.
i.e. `query.start >= target.start` and `query.end <= target.end`.
Args:
interval: the query interval
Expand All @@ -362,7 +327,7 @@ def get_enclosing_intervals(self, interval: GenomicSpan) -> List[_GenericGenomic
results = self.get_overlaps(interval)
return [i for i in results if interval.start >= i.start and interval.end <= i.end]

def get_enclosed(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]:
def get_enclosed(self, interval: GenomicSpan) -> List[GenericGenomicSpan]:
"""Returns the set of intervals in this detector that are enclosed by the query
interval. I.e. target.start >= query.start and target.end <= query.end.
Expand Down
Loading

0 comments on commit 13b7834

Please sign in to comment.