Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: parallelizing sending bulk updates to OpenSearch on same snapshot and updating tessellation version #70

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
with:
path: tessellation
repository: Constellation-Labs/tessellation
ref: v2.7.1
ref: v2.12.0
token: ${{ secrets.GITHUB_TOKEN }}

- name: Set up Java
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
with:
path: tessellation
repository: Constellation-Labs/tessellation
ref: v2.7.1
ref: v2.12.0
token: ${{ secrets.GITHUB_TOKEN }}

- name: Set up Java
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object Dependencies {
val logstash = "7.2"
val organizeImports = "0.6.0"
val refined = "0.10.1"
val tessellation = "2.7.1"
val tessellation = "2.12.0"
val weaver = "0.8.1"
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package org.constellation.snapshotstreaming

import java.util.Date
import cats.Applicative
import cats.{Applicative, Parallel}
import cats.data.NonEmptyList
import cats.data.NonEmptyMap
import cats.data.Validated
import cats.effect._
import cats.effect.std.Random
import cats.effect.syntax.all._
import cats.syntax.all._
import org.tessellation.currency.schema.currency.CurrencyIncrementalSnapshot
import org.tessellation.currency.schema.currency.CurrencySnapshot
Expand All @@ -30,6 +31,7 @@ import org.tessellation.schema.GlobalSnapshotInfoV2
import org.tessellation.security._
import org.tessellation.security.signature.Signed
import com.sksamuel.elastic4s.ElasticDsl.bulk
import com.sksamuel.elastic4s.requests.update.UpdateRequest
import fs2.Stream
import org.constellation.snapshotstreaming.opensearch.OpensearchDAO
import org.constellation.snapshotstreaming.opensearch.UpdateRequestBuilder
Expand All @@ -49,7 +51,7 @@ trait SnapshotProcessor[F[_]] {

object SnapshotProcessor {

def make[F[_]: Async: KryoSerializer: JsonSerializer: SecurityProvider: Random: HasherSelector](
def make[F[_]: Async: Parallel: KryoSerializer: JsonSerializer: SecurityProvider: Random: HasherSelector](
configuration: Configuration,
txHasher: Hasher[F]
): Resource[F, SnapshotProcessor[F]] =
Expand Down Expand Up @@ -99,7 +101,7 @@ object SnapshotProcessor {
lastFullGlobalSnapshotStorage
)

def make[F[_]: Async: HasherSelector](
def make[F[_]: Async: Parallel: HasherSelector](
configuration: Configuration,
lastIncrementalGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo],
l0Service: GlobalL0Service[F],
Expand All @@ -111,6 +113,13 @@ object SnapshotProcessor {
): SnapshotProcessor[F] = new SnapshotProcessor[F] {
val logger = Slf4jLogger.getLogger[F]

/** Logs, for the given processing mode ("parallel" / "sequential"), how many
  * update requests target each OpenSearch index in this batch.
  *
  * Purely informational: emits one log line per distinct index and returns unit.
  */
private def logGroupedRequests(br: Seq[UpdateRequest], mode: String): F[Unit] =
  br
    .groupBy(_.index.index)
    .toList
    .traverse_ { case (targetIndex, requestsForIndex) =>
      // NOTE(review): log message kept byte-identical to the original output.
      logger.info(s"Processing $mode group for index: $targetIndex with ${requestsForIndex.size} requests")
    }

private def prepareAndExecuteBulkUpdate(
globalSnapshotWithState: GlobalSnapshotWithState,
hasher: Hasher[F]
Expand All @@ -119,7 +128,25 @@ object SnapshotProcessor {
Clock[F].realTime
.map(d => new Date(d.toMillis))
.flatMap(updateRequestBuilder.bulkUpdateRequests(state, _, hasher))
.flatMap(_.traverse(br => opensearchDAO.sendToOpensearch(bulk(br).refreshImmediately)))
.flatMap { requests =>
for {
_ <- logger.info("Starting to send parallel bulk updates to Opensearch")
_ <- requests.parallelRequests.parTraverse { br =>
logGroupedRequests(br, "parallel") >>
opensearchDAO.sendToOpensearch(bulk(br))
}.timed.flatTap { case (elapsedTime, _) =>
logger.info(s"Parallel bulk update operation took ${elapsedTime.toMillis} ms")
}

_ <- logger.info("Starting to send sequential bulk updates to Opensearch")
_ <- requests.sequentialRequests.traverse { br =>
logGroupedRequests(br, "sequential") >>
opensearchDAO.sendToOpensearch(bulk(br))
}.timed.flatTap { case (elapsedTime, _) =>
logger.info(s"Sequential bulk update operation took ${elapsedTime.toMillis} ms")
}
} yield ()
}
.flatMap(_ =>
logger.info(
s"Snapshot ${snapshot.ordinal.value.value} (hash: ${snapshot.hash.show.take(8)}) sent to opensearch."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ import org.constellation.snapshotstreaming.opensearch.mapper.GlobalSnapshotMappe
import org.constellation.snapshotstreaming.opensearch.schema._
import org.constellation.snapshotstreaming.Configuration

/** Bulk update requests for one snapshot, split by execution strategy:
  * `sequentialRequests` batches are sent one after another, while
  * `parallelRequests` batches may be sent concurrently.
  * Each inner collection is one bulk batch (sized by `config.bulkSize` at the call site —
  * TODO confirm, the grouping happens in the builder, not here).
  */
case class UpdateRequests(
  sequentialRequests: Seq[Seq[UpdateRequest]],
  parallelRequests: List[List[UpdateRequest]]
)

/** Builds the OpenSearch update requests derived from a single global snapshot.
  *
  * Implementations turn a [[GlobalSnapshotWithState]] into batches of update
  * requests, already partitioned into sequential and parallel groups (see
  * [[UpdateRequests]]).
  */
trait UpdateRequestBuilder[F[_]] {

  /** Produces the bulk update requests for the given snapshot state.
    *
    * @param globalSnapshotWithState the snapshot (and associated state) to index
    * @param timestamp               wall-clock time stamped onto the documents —
    *                                presumably the indexing time; confirm against callers
    * @param hasher                  hasher used while mapping snapshot data to documents
    * @return the update requests, split into sequential and parallel batches
    */
  def bulkUpdateRequests(
    globalSnapshotWithState: GlobalSnapshotWithState,
    timestamp: Date,
    hasher: Hasher[F]
  ): F[UpdateRequests]

}

Expand All @@ -37,9 +42,7 @@ object UpdateRequestBuilder {
globalSnapshotWithState: GlobalSnapshotWithState,
timestamp: Date,
hasher: Hasher[F]
): F[
Seq[Seq[UpdateRequest]]
] =
): F[UpdateRequests] =
for {
_ <- Async[F].unit
GlobalSnapshotWithState(globalSnapshot, maybePrevSnapshotInfo, snapshotInfo, currencySnapshots) =
Expand All @@ -65,34 +68,44 @@ object UpdateRequestBuilder {
(currSnapshot, currIncrementalSnapshots, currBlocks, currTransactions, currFeeTransactions, currBalances) =
mappedCurrencyData

} yield updateRequests(
snapshot,
blocks,
transactions,
balances,
currSnapshot,
currIncrementalSnapshots,
currBlocks,
currTransactions,
currFeeTransactions,
currBalances
).grouped(config.bulkSize).toSeq

def updateRequests[T](
parallelRequests = updateParallelRequests(
blocks,
transactions,
balances,
currSnapshot,
currIncrementalSnapshots,
currBlocks,
currTransactions,
currFeeTransactions,
currBalances
).grouped(config.bulkSize).toList

sequentialRequests = updateSequentialRequests(
snapshot,
).grouped(config.bulkSize).toSeq

} yield UpdateRequests(
sequentialRequests,
parallelRequests
)

def updateSequentialRequests(
snapshot: Snapshot,
blocks: Seq[Block],
transactions: Seq[Transaction],
balances: Seq[AddressBalance],
currencySnapshots: Seq[CurrencyData[Snapshot]],
): Seq[UpdateRequest] =
Seq(updateById(config.snapshotsIndex, snapshot.hash).docAsUpsert(snapshot))

def updateParallelRequests(
blocks : Seq[Block],
transactions : Seq[Transaction],
balances : Seq[AddressBalance],
currencySnapshots : Seq[CurrencyData[Snapshot]],
currencyIncrementalSnapshots: Seq[CurrencyData[CurrencySnapshot]],
currencyBlocks: Seq[CurrencyData[Block]],
currencyTransactions: Seq[CurrencyData[Transaction]],
currencyFeeTransactions: Seq[CurrencyData[FeeTransaction]],
currencyBalances: Seq[CurrencyData[AddressBalance]]
): Seq[UpdateRequest] = {

Seq(updateById(config.snapshotsIndex, snapshot.hash).docAsUpsert(snapshot)) ++
blocks.map(block => updateById(config.blocksIndex, block.hash).docAsUpsert(block)) ++
currencyBlocks : Seq[CurrencyData[Block]],
currencyTransactions : Seq[CurrencyData[Transaction]],
currencyFeeTransactions : Seq[CurrencyData[FeeTransaction]],
currencyBalances : Seq[CurrencyData[AddressBalance]]
): List[UpdateRequest] = {
blocks.toList.map(block => updateById(config.blocksIndex, block.hash).docAsUpsert(block)) ++
transactions.map(transaction =>
updateById(config.transactionsIndex, transaction.hash).docAsUpsert(transaction)
) ++
Expand Down