Skip to content

Commit

Permalink
alternate
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Jul 17, 2024
1 parent 2d71949 commit 878aa6f
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 138 deletions.
21 changes: 20 additions & 1 deletion pybedlite/bed_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
if TYPE_CHECKING:
from pybedlite.overlap_detector import Interval


"""Maximum BED fields that can be present in a well formed BED file written to specification"""
MAX_BED_FIELDS: int = 12

Expand Down Expand Up @@ -183,6 +182,26 @@ def bed_fields(self) -> List[str]:
),
]

@property
def reference_name(self) -> str:
"""A reference sequence name."""
return self.chrom

Check warning on line 188 in pybedlite/bed_record.py

View check run for this annotation

Codecov / codecov/patch

pybedlite/bed_record.py#L188

Added line #L188 was not covered by tests

@property
def zero_based_start(self) -> int:
"""A 0-based start position."""
return self.start

Check warning on line 193 in pybedlite/bed_record.py

View check run for this annotation

Codecov / codecov/patch

pybedlite/bed_record.py#L193

Added line #L193 was not covered by tests

@property
def zero_based_open_end(self) -> int:
"""A 0-based open-ended position."""
return self.end

Check warning on line 198 in pybedlite/bed_record.py

View check run for this annotation

Codecov / codecov/patch

pybedlite/bed_record.py#L198

Added line #L198 was not covered by tests

@property
def is_negative(self) -> bool:
"""True if the interval is on the negative strand, False otherwise"""
return self.strand is not None and self.strand == BedStrand.Positive

Check warning on line 203 in pybedlite/bed_record.py

View check run for this annotation

Codecov / codecov/patch

pybedlite/bed_record.py#L203

Added line #L203 was not covered by tests

def as_bed_line(self, number_of_output_fields: Optional[int] = None) -> str:
"""
Converts a BED record to a tab delimited BED line equivalent, including up to the number of
Expand Down
211 changes: 100 additions & 111 deletions pybedlite/overlap_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,29 @@
Utility Classes for Querying Overlaps with Genomic Regions
----------------------------------------------------------
The :class:`~pybedlite.overlap_detector.OverlapDetector` class detects and returns overlaps between
a set of genomic regions and another genomic region. The overlap detector may contain any
interval-like Python objects that have the following properties:
* `reference_name` (str): The reference sequence name
* `zero_based_start` (int): A 0-based start position
* `zero_based_end` (int): A 0-based exclusive end position
This is encapsulated in the :class:`~pybedlite.overlap_detector.GenomicSpan` protocol.
Interval-like Python objects may also contain strandedness information which will be used
for sorting them in :func:`~pybedlite.overlap_detector.OverlapDetector.get_overlaps` using
the following property if it is present, otherwise assumed to be positive stranded:
* `is_negative (bool)`: Whether the feature is negative stranded or not
This is encapsulated in the :class:`~pybedlite.overlap_detector.StrandedGenomicSpan` protocol.
Examples of Detecting Overlaps
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: python
. code-block:: python
>>> from pybedlite.overlap_detector import Interval, OverlapDetector
>>> detector = OverlapDetector()
Expand Down Expand Up @@ -61,50 +80,29 @@
from pybedlite.bed_source import BedSource


class _Span(Protocol):
"""A span with a start and an end. 0-based open-ended."""

@property
def start(self) -> int:
"""A 0-based start position."""

@property
def end(self) -> int:
"""A 0-based open-ended position."""


class _GenomicSpanWithChrom(_Span, Hashable, Protocol):
"""A genomic feature where reference sequence is accessed with `chrom`."""
class GenomicSpan(Protocol):
"""
A genomic span which has protected methods that must be implemented by all subclasses to give
a zero-based open-ended genomic span.
"""

@property
def chrom(self) -> str:
def reference_name(self) -> str:
"""A reference sequence name."""


class _GenomicSpanWithContig(_Span, Hashable, Protocol):
"""A genomic feature where reference sequence is accessed with `contig`."""

@property
def contig(self) -> str:
"""A reference sequence name."""


class _GenomicSpanWithRefName(_Span, Hashable, Protocol):
"""A genomic feature where reference sequence is accessed with `refname`."""
def zero_based_start(self) -> int:
"""A 0-based start position."""

@property
def refname(self) -> str:
"""A reference sequence name."""
def zero_based_open_end(self) -> int:
"""A 0-based open-ended position."""


GenomicSpan = TypeVar(
"GenomicSpan",
bound=Union[_GenomicSpanWithChrom, _GenomicSpanWithContig, _GenomicSpanWithRefName],
)
"""
A genomic feature where the reference sequence name is accessed with any of the 3 most common
property names ("chrom", "contig", "refname").
"""
class StrandedGenomicSpan(GenomicSpan, Protocol):
@property
def is_negative(self) -> bool:
"""True if the interval is on the negative strand, False otherwise"""


@attr.s(frozen=True, auto_attribs=True)
Expand Down Expand Up @@ -176,86 +174,55 @@ def from_bedrecord(cls: Type["Interval"], record: BedRecord) -> "Interval":
name=record.name,
)

@property
def reference_name(self) -> str:
return self.refname

_GenericGenomicSpan = TypeVar(
"_GenericGenomicSpan",
bound=Union[_GenomicSpanWithChrom, _GenomicSpanWithContig, _GenomicSpanWithRefName],
)
"""
A generic genomic feature where the reference sequence name is accessed with any of the 3 most
common property names ("chrom", "contig", "refname"). This type variable is used for describing the
generic type contained within the :class:`~pybedlite.overlap_detector.OverlapDetector`.
"""
@property
def zero_based_start(self) -> int:
"""A 0-based start position."""
return self.start

@property
def zero_based_open_end(self) -> int:
"""A 0-based open-ended position."""
return self.end

class OverlapDetector(Generic[_GenericGenomicSpan], Iterable[_GenericGenomicSpan]):
"""Detects and returns overlaps between a set of genomic regions and another genomic region.
@property
def is_negative(self) -> bool:
"""True if the interval is on the negative strand, False otherwise"""
return self.negative

The overlap detector may contain any interval-like Python objects that have the following
properties:

* `chrom` or `contig` or `refname`: The reference sequence name
* `start`: A 0-based start position
* `end`: A 0-based exclusive end position
GenericGenomicsSpan = TypeVar("GenericGenomicsSpan", bound=Union[GenomicSpan, StrandedGenomicSpan])
"""
A generic genomic feature. This type variable is used for describing the
generic type contained within the :class:`~pybedlite.overlap_detector.OverlapDetector`.
"""

Interval-like Python objects may also contain strandedness information which will be used
for sorting them in :func:`~pybedlite.overlap_detector.OverlapDetector.get_overlaps` using
either of the following properties if they are present:

* `negative (bool)`: Whether or not the feature is negative stranded or not
* `strand (BedStrand)`: The BED strand of the feature
* `strand (str)`: The strand of the feature (`"-"` for negative)
class OverlapDetector(Generic[GenericGenomicsSpan], Iterable[GenericGenomicsSpan]):
"""Detects and returns overlaps between a set of genomic regions and another genomic region.
The same interval may be added multiple times, but only a single instance will be returned
when querying for overlaps.
This detector is the most efficient when all intervals are added ahead of time.
"""

def __init__(self, intervals: Optional[Iterable[_GenericGenomicSpan]] = None) -> None:
def __init__(self, intervals: Optional[Iterable[GenericGenomicsSpan]] = None) -> None:
# A mapping from the contig/chromosome name to the associated interval tree
self._refname_to_tree: Dict[str, cr.cgranges] = {} # type: ignore
self._refname_to_indexed: Dict[str, bool] = {}
self._refname_to_intervals: Dict[str, List[_GenericGenomicSpan]] = {}
self._refname_to_intervals: Dict[str, List[GenericGenomicsSpan]] = {}
if intervals is not None:
self.add_all(intervals)

def __iter__(self) -> Iterator[_GenericGenomicSpan]:
def __iter__(self) -> Iterator[GenericGenomicsSpan]:
"""Iterates over the intervals in the overlap detector."""
return itertools.chain(*self._refname_to_intervals.values())

@staticmethod
def _reference_sequence_name(interval: GenomicSpan) -> str:
"""Return the reference name of a given interval."""
if isinstance(interval, Interval) or hasattr(interval, "refname"):
return interval.refname
elif isinstance(interval, BedRecord) or hasattr(interval, "chrom"):
return interval.chrom
elif hasattr(interval, "contig"):
return interval.contig
else:
raise ValueError(
f"Genomic span is missing a reference sequence name property: {interval}"
)

@staticmethod
def _is_negative(interval: GenomicSpan) -> bool:
"""Determine if this is a negative stranded interval or not."""
return (
(hasattr(interval, "negative") and interval.negative)
or (
hasattr(interval, "strand")
and isinstance(interval.strand, BedStrand)
and interval.strand is BedStrand.Negative
)
or (
hasattr(interval, "strand")
and isinstance(interval.strand, str)
and interval.strand == "-"
)
)

def add(self, interval: _GenericGenomicSpan) -> None:
def add(self, interval: GenericGenomicsSpan) -> None:
"""Adds an interval to this detector.
Args:
Expand All @@ -264,7 +231,7 @@ def add(self, interval: _GenericGenomicSpan) -> None:
if not isinstance(interval, Hashable):
raise ValueError(f"Interval feature is not hashable but should be: {interval}")

refname = self._reference_sequence_name(interval)
refname = interval.reference_name
if refname not in self._refname_to_tree:
self._refname_to_tree[refname] = cr.cgranges() # type: ignore
self._refname_to_indexed[refname] = False
Expand All @@ -277,13 +244,13 @@ def add(self, interval: _GenericGenomicSpan) -> None:

# Add the interval to the tree
tree = self._refname_to_tree[refname]
tree.add(refname, interval.start, interval.end, interval_idx)
tree.add(refname, interval.zero_based_start, interval.zero_based_open_end, interval_idx)

# Flag this tree as needing to be indexed after adding a new interval, but defer
# indexing
self._refname_to_indexed[refname] = False

def add_all(self, intervals: Iterable[_GenericGenomicSpan]) -> None:
def add_all(self, intervals: Iterable[GenericGenomicsSpan]) -> None:
"""Adds one or more intervals to this detector.
Args:
Expand All @@ -302,21 +269,27 @@ def overlaps_any(self, interval: GenomicSpan) -> bool:
True if and only if the given interval overlaps with any interval in this
detector.
"""
refname = self._reference_sequence_name(interval)
refname = interval.reference_name
tree = self._refname_to_tree.get(refname)
if tree is None:
return False
else:
if not self._refname_to_indexed[refname]:
tree.index()
try:
next(iter(tree.overlap(refname, interval.start, interval.end)))
next(
iter(
tree.overlap(
refname, interval.zero_based_start, interval.zero_based_open_end
)
)
)
except StopIteration:
return False
else:
return True

def get_overlaps(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]:
def get_overlaps(self, interval: GenomicSpan) -> List[GenericGenomicsSpan]:
"""Returns any intervals in this detector that overlap the given interval.
Args:
Expand All @@ -326,30 +299,36 @@ def get_overlaps(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]:
The list of intervals in this detector that overlap the given interval, or the empty
list if no overlaps exist. The intervals will be return in ascending genomic order.
"""
refname = self._reference_sequence_name(interval)
refname = interval.reference_name
tree = self._refname_to_tree.get(refname)
if tree is None:
return []
else:
if not self._refname_to_indexed[refname]:
tree.index()
ref_intervals: List[_GenericGenomicSpan] = self._refname_to_intervals[refname]
ref_intervals: List[GenericGenomicsSpan] = self._refname_to_intervals[refname]
# NB: only return unique instances of intervals
intervals: Set[_GenericGenomicSpan] = {
intervals: Set[GenericGenomicsSpan] = {
ref_intervals[index]
for _, _, index in tree.overlap(refname, interval.start, interval.end)
for _, _, index in tree.overlap(
refname, interval.zero_based_start, interval.zero_based_open_end
)
}
return sorted(
intervals,
key=lambda intv: (
intv.start,
intv.end,
intv.zero_based_start,
intv.zero_based_open_end,
self._is_negative(intv),
self._reference_sequence_name(intv),
intv.reference_name,
),
)

def get_enclosing_intervals(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]:
@staticmethod
def _is_negative(interval: GenomicSpan) -> bool:
return getattr(interval, "is_negative", False)

def get_enclosing_intervals(self, interval: GenomicSpan) -> List[GenericGenomicsSpan]:
"""Returns the set of intervals in this detector that wholly enclose the query interval.
i.e. query.start >= target.start and query.end <= target.end.
Expand All @@ -360,9 +339,14 @@ def get_enclosing_intervals(self, interval: GenomicSpan) -> List[_GenericGenomic
The intervals will be returned in ascending genomic order.
"""
results = self.get_overlaps(interval)
return [i for i in results if interval.start >= i.start and interval.end <= i.end]

def get_enclosed(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]:
return [
i
for i in results
if interval.zero_based_start >= i.zero_based_start
and interval.zero_based_open_end <= i.zero_based_open_end
]

def get_enclosed(self, interval: GenomicSpan) -> List[GenericGenomicsSpan]:
"""Returns the set of intervals in this detector that are enclosed by the query
interval. I.e. target.start >= query.start and target.end <= query.end.
Expand All @@ -374,7 +358,12 @@ def get_enclosed(self, interval: GenomicSpan) -> List[_GenericGenomicSpan]:
The intervals will be return in ascending genomic order.
"""
results = self.get_overlaps(interval)
return [i for i in results if i.start >= interval.start and i.end <= interval.end]
return [
i
for i in results
if i.zero_based_start >= interval.zero_based_start
and i.zero_based_open_end <= interval.zero_based_open_end
]

@classmethod
def from_bed(cls, path: Path) -> "OverlapDetector":
Expand Down
Loading

0 comments on commit 878aa6f

Please sign in to comment.