Skip to content

Commit

Permalink
Alternative PR to add duplicate marking to GroupReadsByUmi (#940)
Browse files Browse the repository at this point in the history
Add duplicate marking to GroupReadsByUmi.  Also includes:
- Ability to pass through secondary and supplementary reads
- Defaulting passing through secondary and supplementary reads based on whether duplicate marking is activated or not

Co-authored-by: Joe Vieira <[email protected]>
Co-authored-by: Nils Homer <[email protected]>
  • Loading branch information
3 people authored Oct 26, 2023
1 parent 2363908 commit 6fa1fde
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 32 deletions.
1 change: 1 addition & 0 deletions .github/workflows/unittests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ jobs:
- name: Code Coverage
uses: codecov/[email protected]
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: unittests # optional
fail_ci_if_error: true # optional (default = false)
verbose: true # optional (default = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iter
private var collectedStats: Boolean = false

protected val iter: Iterator[SamRecord] = {
val filteredIterator = sourceIterator.filterNot { r =>
r.secondary || r.supplementary || r.unmapped || (r.paired && r.mateUnmapped)
}

// Wrap our input iterator in a progress logging iterator if we have a progress logger
val progressIterator = progress match {
case Some(p) => sourceIterator.tapEach { r => p.record(r) }
case None => sourceIterator
case Some(p) => filteredIterator.tapEach { r => p.record(r) }
case None => filteredIterator
}

// Then turn it into a grouping iterator
Expand Down
104 changes: 75 additions & 29 deletions src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@
*/
package com.fulcrumgenomics.umi

import java.util.concurrent.atomic.AtomicLong
import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.api.SamOrder.TemplateCoordinate
import com.fulcrumgenomics.bam.{Bams, Template}
import com.fulcrumgenomics.bam.api.{SamOrder, SamRecord, SamSource, SamWriter}
import com.fulcrumgenomics.bam.{Bams, Template}
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
import com.fulcrumgenomics.commons.util.{LazyLogging, NumericCounter, SimpleCounter}
import com.fulcrumgenomics.sopt.{arg, clp}
Expand All @@ -39,13 +38,14 @@ import com.fulcrumgenomics.util.Metric.{Count, Proportion}
import com.fulcrumgenomics.util.Sequences.countMismatches
import com.fulcrumgenomics.util._
import enumeratum.EnumEntry
import htsjdk.samtools.DuplicateScoringStrategy.ScoringStrategy
import htsjdk.samtools._
import htsjdk.samtools.util.SequenceUtil

import scala.collection.BufferedIterator
import java.util.concurrent.atomic.AtomicLong
import scala.collection.immutable.IndexedSeq
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.collection.{BufferedIterator, Iterator, mutable}


object GroupReadsByUmi {
Expand Down Expand Up @@ -439,9 +439,18 @@ object Strategy extends FgBioEnum[Strategy] {
| 3. The assigned UMI tag
| 4. Read Name
|
|During grouping, reads are filtered out if a) all reads with the same queryname are unmapped, b) any primary
|read has mapping quality < `min-map-q` (default=1), or c) the primary mappings for R1 and R2 are on different
|chromosomes and `--allow-inter-contig` has been set to false.
|If the input is not template-coordinate sorted (i.e. `SO:unsorted GO:query SS:unsorted:template-coordinate`), then
|this tool will re-sort the input. The output will be written in template-coordinate order.
|
|During grouping, reads and templates are filtered out as follows:
|
|1. Templates are filtered if all reads for the template are unmapped
|2. Templates are filtered if any non-secondary, non-supplementary read has mapping quality < `min-map-q`
|3. Templates are filtered if R1 and R2 are mapped to different chromosomes and `--allow-inter-contig` is false
|4. Templates are filtered if any UMI sequence contains one or more `N` bases
|5. Templates are filtered if `--min-umi-length` is specified and the UMI does not meet the length requirement
|6. Reads are filtered out if flagged as secondary and `--include-secondary` is false
|7. Reads are filtered out if flagged as supplementary and `--include-supplementary` is false
|
|Grouping of UMIs is performed by one of four strategies:
|
Expand Down Expand Up @@ -470,27 +479,40 @@ object Strategy extends FgBioEnum[Strategy] {
|(i.e. does not count dashes and other non-ACGT characters). This option is not implemented for reads with UMI pairs
|(i.e. using the paired assigner).
|
|If the input is not template-coordinate sorted (i.e. `SO:unsorted GO:query SS:unsorted:template-coordinate`), then
|this tool will re-sort the input. The ouitput will be written in template-coordinate order.
|If the `--mark-duplicates` option is given, reads will also have their duplicate flag set in the BAM file.
|Each tag-family is treated separately, and a single template within the tag family is chosen to be the "unique"
|template and marked as non-duplicate, while all other templates in the tag family are then marked as duplicate.
|One limitation of duplicate-marking mode, vs. e.g. Picard MarkDuplicates, is that read pairs with one unmapped read
|are duplicate-marked independently from read pairs with both reads mapped.
|
|Several parameters have different defaults depending on whether duplicates are being marked or not (all are
|directly settable on the command line):
|
| 1. `--min-map-q` defaults to 0 in duplicate marking mode and 1 otherwise
| 2. `--include-secondary` defaults to true in duplicate marking mode and false otherwise
| 3. `--include-supplementary` defaults to true in duplicate marking mode and false otherwise
"""
)
class GroupReadsByUmi
( @arg(flag='i', doc="The input BAM file.") val input: PathToBam = Io.StdIn,
@arg(flag='o', doc="The output BAM file.") val output: PathToBam = Io.StdOut,
@arg(flag='f', doc="Optional output of tag family size counts.") val familySizeHistogram: Option[FilePath] = None,
@arg(flag='t', doc="The tag containing the raw UMI.") val rawTag: String = "RX",
@arg(flag='T', doc="The output tag for UMI grouping.") val assignTag: String = "MI",
@arg(flag='m', doc="Minimum mapping quality for mapped reads.") val minMapQ: Int = 1,
@arg(flag='n', doc="Include non-PF reads.") val includeNonPfReads: Boolean = false,
@arg(flag='s', doc="The UMI assignment strategy.") val strategy: Strategy,
@arg(flag='e', doc="The allowable number of edits between UMIs.") val edits: Int = 1,
@arg(flag='l', doc= """The minimum UMI length. If not specified then all UMIs must have the same length,
|otherwise discard reads with UMIs shorter than this length and allow for differing UMI lengths.
|""")
val minUmiLength: Option[Int] = None,
@arg(flag='x', doc= """
|DEPRECATED: this option will be removed in future versions and inter-contig reads will be
|automatically processed.""")
(@arg(flag='i', doc="The input BAM file.") val input: PathToBam = Io.StdIn,
@arg(flag='o', doc="The output BAM file.") val output: PathToBam = Io.StdOut,
@arg(flag='f', doc="Optional output of tag family size counts.") val familySizeHistogram: Option[FilePath] = None,
@arg(flag='t', doc="The tag containing the raw UMI.") val rawTag: String = ConsensusTags.UmiBases,
@arg(flag='T', doc="The output tag for UMI grouping.") val assignTag: String = ConsensusTags.MolecularId,
@arg(flag='d', doc="Turn on duplicate marking mode.") val markDuplicates: Boolean = false,
@arg(flag='S', doc="Include secondary reads.") val includeSecondary: Option[Boolean] = None,
@arg(flag='U', doc="Include supplementary reads.") val includeSupplementary: Option[Boolean] = None,
@arg(flag='m', doc="Minimum mapping quality for mapped reads.") val minMapQ: Option[Int] = None,
@arg(flag='n', doc="Include non-PF reads.") val includeNonPfReads: Boolean = false,
@arg(flag='s', doc="The UMI assignment strategy.") val strategy: Strategy,
@arg(flag='e', doc="The allowable number of edits between UMIs.") val edits: Int = 1,
@arg(flag='l', doc= """The minimum UMI length. If not specified then all UMIs must have the same length,
|otherwise discard reads with UMIs shorter than this length and allow for differing UMI lengths.
|""")
val minUmiLength: Option[Int] = None,
@arg(flag='x', doc= """
|DEPRECATED: this option will be removed in future versions and inter-contig reads will be
|automatically processed.""")
@deprecated val allowInterContig: Boolean = true
)extends FgBioTool with LazyLogging {
import GroupReadsByUmi._
Expand All @@ -499,6 +521,10 @@ class GroupReadsByUmi

private val assigner = strategy.newStrategy(this.edits)

// Give values to unset parameters that are different in duplicate marking mode
private val _minMapQ = this.minMapQ.getOrElse(if (this.markDuplicates) 0 else 1)
private val _includeSecondaryReads = this.includeSecondary.getOrElse(this.markDuplicates)
private val _includeSupplementaryReads = this.includeSupplementary.getOrElse(this.markDuplicates)

/** Checks that the read's mapq is over a minimum, and if the read is paired, that the mate mapq is also over the min. */
private def mapqOk(rec: SamRecord, minMapQ: Int): Boolean = {
Expand Down Expand Up @@ -546,11 +572,12 @@ class GroupReadsByUmi
// Filter and sort the input BAM file
logger.info("Filtering the input.")
val filteredIterator = in.iterator
.filter(r => !r.secondary && !r.supplementary)
.filter(r => this._includeSecondaryReads || !r.secondary )
.filter(r => this._includeSupplementaryReads || !r.supplementary )
.filter(r => (includeNonPfReads || r.pf) || { filteredNonPf += 1; false })
.filter(r => (r.mapped || (r.paired && r.mateMapped)) || { filteredPoorAlignment += 1; false })
.filter(r => (allowInterContig || r.unpaired || r.refIndex == r.mateRefIndex) || { filteredPoorAlignment += 1; false })
.filter(r => mapqOk(r, this.minMapQ) || { filteredPoorAlignment += 1; false })
.filter(r => mapqOk(r, this._minMapQ) || { filteredPoorAlignment += 1; false })
.filter(r => !r.get[String](rawTag).exists(_.contains('N')) || { filteredNsInUmi += 1; false })
.filter { r =>
this.minUmiLength.forall { l =>
Expand Down Expand Up @@ -610,10 +637,15 @@ class GroupReadsByUmi
// Take the next set of templates by position and assign UMIs
val templates = takeNextGroup(templateCoordinateIterator, canTakeNextGroupByUmi=canTakeNextGroupByUmi)
assignUmiGroups(templates)

// Then output the records in the right order (assigned tag, read name, r1, r2)
// Then group the records in the right order (assigned tag, read name, r1, r2)
val templatesByMi = templates.groupBy { t => t.r1.get.apply[String](this.assignTag) }

// If marking duplicates, assign bitflag to all duplicate reads
if (this.markDuplicates) {
templatesByMi.values.foreach(t => setDuplicateFlags(t))
}

// Then output the records in the right order (assigned tag, read name, r1, r2)
templatesByMi.keys.toSeq.sortBy(id => (id.length, id)).foreach(tag => {
templatesByMi(tag).sortBy(t => t.name).flatMap(t => t.primaryReads).foreach(rec => {
out += rec
Expand Down Expand Up @@ -731,4 +763,18 @@ class GroupReadsByUmi
r1[String](this.rawTag)
}
}

/** Sets the duplicate flags on all reads within all templates. */
private def setDuplicateFlags(group: Seq[Template]): Unit = {
val nonDuplicateTemplate = group.maxBy { template =>
template.primaryReads.sumBy { r =>
r.mapq + DuplicateScoringStrategy.computeDuplicateScore(r.asSam, ScoringStrategy.SUM_OF_BASE_QUALITIES)
}
}

group.foreach { template =>
val flag = template ne nonDuplicateTemplate
template.allReads.foreach(_.duplicate = flag)
}
}
}
63 changes: 62 additions & 1 deletion src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT
val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
val tool = new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), rawTag="RX", assignTag="MI", strategy=Strategy.Edit, edits=1, minMapQ=30)
val tool = new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), rawTag="RX", assignTag="MI", strategy=Strategy.Edit, edits=1, minMapQ=Some(30))
val logs = executeFgbioTool(tool)

val groups = readBamRecs(out).groupBy(_.name.charAt(0))
Expand All @@ -255,6 +255,67 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT
}
}

it should "correctly mark duplicates on duplicate reads in group, when flag is passed" in {
val builder = new SamBuilder(readLength = 100, sort = Some(SamOrder.Coordinate))
// Mapping Quality is a tie breaker, so use that to our advantage here.
builder.addPair(mapq1 = 10, mapq2 = 10, name = "a01", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT"))
builder.addPair(mapq1 = 30, mapq2 = 30, name = "a02", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT"))
builder.addPair(mapq1 = 100, mapq2 = 10, name = "a03", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT"))
builder.addPair(mapq1 = 0, mapq2 = 0, name = "a04", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT"))

val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
val gr = new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), strategy=Strategy.Paired, edits=1, markDuplicates=true)

gr.markDuplicates shouldBe true

val recs = readBamRecs(out)
recs.filter(_.name.equals("a01")).forall(_.duplicate == true) shouldBe true
recs.filter(_.name.equals("a02")).forall(_.duplicate == true) shouldBe true
recs.filter(_.name.equals("a03")).forall(_.duplicate == false) shouldBe true
recs.filter(_.name.equals("a04")).forall(_.duplicate == true) shouldBe true
}

it should "does not mark duplicates on reads in group, when flag is not passed" in {
val builder = new SamBuilder(readLength = 100, sort = Some(SamOrder.Coordinate))
// Mapping Quality is a tie breaker, so use that to our advantage here.
builder.addPair(mapq1 = 10, mapq2 = 10, name = "a01", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT"))
builder.addPair(mapq1 = 30, mapq2 = 30, name = "a02", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT"))
builder.addPair(mapq1 = 100, mapq2 = 10, name = "a03", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT"))
builder.addPair(mapq1 = 0, mapq2 = 0, name = "a04", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT"))

val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), strategy=Strategy.Paired, edits=1).execute()

val recs = readBamRecs(out)
recs.filter(_.name.equals("a01")).forall(_.duplicate == false) shouldBe true
recs.filter(_.name.equals("a02")).forall(_.duplicate == false) shouldBe true
recs.filter(_.name.equals("a03")).forall(_.duplicate == false) shouldBe true
recs.filter(_.name.equals("a04")).forall(_.duplicate == false) shouldBe true
}

it should "correctly mark duplicates on duplicate single-end reads with UMIs" in {
val builder = new SamBuilder(readLength = 100, sort = Some(SamOrder.Coordinate))
builder.addFrag(mapq = 100, name = "a01", start = 100, attrs = Map("RX" -> "AAAAAAAA"))
builder.addFrag(mapq = 10, name = "a02", start = 100, attrs = Map("RX" -> "AAAAAAAA"))
builder.addFrag(mapq = 100, name = "a03", start = 100, attrs = Map("RX" -> "CACACACA"))
builder.addFrag(mapq = 10, name = "a04", start = 100, attrs = Map("RX" -> "CACACACC"))

val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
new GroupReadsByUmi(input = in, output = out, familySizeHistogram = Some(hist), rawTag = "RX", assignTag = "MI", strategy = Strategy.Edit, edits = 1, markDuplicates = true).execute()

val recs = readBamRecs(out)
recs.filter(_.name.equals("a01")).forall(_.duplicate == false) shouldBe true
recs.filter(_.name.equals("a02")).forall(_.duplicate == true) shouldBe true
recs.filter(_.name.equals("a03")).forall(_.duplicate == false) shouldBe true
recs.filter(_.name.equals("a04")).forall(_.duplicate == true) shouldBe true
}

it should "correctly group reads with the paired assigner when the two UMIs are the same" in {
val builder = new SamBuilder(readLength=100, sort=Some(SamOrder.Coordinate))
builder.addPair(name="a01", start1=100, start2=300, strand1=Plus, strand2=Minus, attrs=Map("RX" -> "ACT-ACT"))
Expand Down

0 comments on commit 6fa1fde

Please sign in to comment.