Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adopt snapshot streaming to handle snapshot fee balances #65

Merged
merged 1 commit into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
with:
path: tessellation
repository: Constellation-Labs/tessellation
ref: v2.7.0
ref: v2.7.1
token: ${{ secrets.GITHUB_TOKEN }}

- name: Set up Java
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
with:
path: tessellation
repository: Constellation-Labs/tessellation
ref: v2.7.0
ref: v2.7.1
token: ${{ secrets.GITHUB_TOKEN }}

- name: Set up Java
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object Dependencies {
val logstash = "7.2"
val organizeImports = "0.6.0"
val refined = "0.10.1"
val tessellation = "2.7.0"
val tessellation = "2.7.1"
val weaver = "0.8.1"
}

Expand Down
11 changes: 6 additions & 5 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
snapshotStreaming {
lastSnapshotPath = "lastSnapshot.json"
lastIncrementalSnapshotPath = "lastIncrementalSnapshot.json"
environment = Testnet
collateral = 25000000000000
lastCurrencySnapshotsPath = "lastCurrencySnapshots"
environment = Dev
collateral = 0
httpClient {
timeout = "120s"
idleTimeInPool = "30s"
Expand Down Expand Up @@ -31,8 +32,8 @@ snapshotStreaming {
}
}
s3 {
bucketRegion = "us-west-1"
bucketName = "constellationlabs-testnet-snapshots"
bucketDir = "snapshot-streaming"
bucketRegion = ""
bucketName = ""
bucketDir = ""
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Configuration(sharedConfigReader: SharedConfigReader) {

val lastFullSnapshotPath: Path = Path(config.getString("snapshotStreaming.lastSnapshotPath"))
val lastIncrementalSnapshotPath: Path = Path(config.getString("snapshotStreaming.lastIncrementalSnapshotPath"))
val lastCurrencySnapshotsPath: Path = Path(config.getString("snapshotStreaming.lastCurrencySnapshotsPath"))
val collateral: Amount = Amount(NonNegLong.unsafeFrom(config.getLong("snapshotStreaming.collateral")))

val environment: AppEnvironment =
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ trait GlobalSnapshotContextService[F[_]] {

object GlobalSnapshotContextService {

type CurrencySnapshotWithState =
Either[Signed[CurrencySnapshot], (Signed[CurrencyIncrementalSnapshot], CurrencySnapshotInfo)]

def make[F[_]: Async: KryoSerializer: HasherSelector](
globalSnapshotStateChannelEventsProcessor: GlobalSnapshotStateChannelEventsProcessor[F],
globalSnapshotContextFns: GlobalSnapshotContextFunctions[F]
Expand Down Expand Up @@ -60,7 +57,7 @@ object GlobalSnapshotContextService {
}
})
}
.map(GlobalSnapshotWithState(artifact, newContext, _))
.map(GlobalSnapshotWithState(artifact, newContext, context.some, _))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ import fs2.Stream
import org.constellation.snapshotstreaming.opensearch.OpensearchDAO
import org.constellation.snapshotstreaming.opensearch.UpdateRequestBuilder
import org.constellation.snapshotstreaming.s3.S3DAO
import org.constellation.snapshotstreaming.storage.FileBasedLastFullSnapshotStorage
import org.constellation.snapshotstreaming.storage.FileBasedLastIncrementalGlobalSnapshotStorage
import org.constellation.snapshotstreaming.storage.LastCurrencySnapshotStorage
import org.http4s.ember.client.EmberClientBuilder
import org.tessellation.schema.GlobalSnapshot
import org.tessellation.statechannel.StateChannelSnapshotBinary
import org.typelevel.log4cats.slf4j.Slf4jLogger

Expand Down Expand Up @@ -71,11 +75,14 @@ object SnapshotProcessor {
configuration.pullLimit.some,
configuration.l0Peers.keys.some
)
requestBuilder = UpdateRequestBuilder.make[F](configuration, txHasher: Hasher[F])
requestBuilder <- Resource.eval(UpdateRequestBuilder.make[F](configuration, txHasher: Hasher[F]))
tesselationServices <- Resource.eval(
TessellationServices.make[F](configuration)
)
lastFullGlobalSnapshotStorage = FileBasedLastFullGlobalSnapshotStorage.make[F](configuration.lastFullSnapshotPath)
lastFullGlobalSnapshotStorage = FileBasedLastFullSnapshotStorage.make[F, GlobalSnapshot](
configuration.lastFullSnapshotPath
)
lastCurrencySnapshotStorage <- Resource.eval(LastCurrencySnapshotStorage.make[F](configuration.lastCurrencySnapshotsPath))
} yield make(
configuration,
lastIncrementalGlobalSnapshotStorage,
Expand All @@ -84,26 +91,28 @@ object SnapshotProcessor {
s3DAO,
requestBuilder,
tesselationServices,
lastFullGlobalSnapshotStorage
lastFullGlobalSnapshotStorage,
lastCurrencySnapshotStorage
)

def make[F[_]: Async: KryoSerializer: JsonSerializer: HasherSelector](
def make[F[_]: Async: HasherSelector](
configuration: Configuration,
lastIncrementalGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo],
l0Service: GlobalL0Service[F],
opensearchDAO: OpensearchDAO[F],
s3DAO: S3DAO[F],
updateRequestBuilder: UpdateRequestBuilder[F],
tessellationServices: TessellationServices[F],
lastFullGlobalSnapshotStorage: FileBasedLastFullGlobalSnapshotStorage[F]
lastFullGlobalSnapshotStorage: FileBasedLastFullSnapshotStorage[F, GlobalSnapshot],
lastCurrencySnapshotStorage: LastCurrencySnapshotStorage[F]
): SnapshotProcessor[F] = new SnapshotProcessor[F] {
val logger = Slf4jLogger.getLogger[F]

private def prepareAndExecuteBulkUpdate(
globalSnapshotWithState: GlobalSnapshotWithState,
hasher: Hasher[F]
): F[Unit] =
globalSnapshotWithState.pure[F].flatMap { case state @ GlobalSnapshotWithState(snapshot, _, _) =>
globalSnapshotWithState.pure[F].flatMap { case state @ GlobalSnapshotWithState(snapshot, _, _, _) =>
Clock[F].realTime
.map(d => new Date(d.toMillis))
.flatMap(updateRequestBuilder.bulkUpdateRequests(state, _, hasher))
Expand All @@ -116,8 +125,19 @@ object SnapshotProcessor {
.void
}

private def setLastCurrencySnapshots(
snapshots: Map[Address, NonEmptyList[
Either[Hashed[
CurrencySnapshot
], (Hashed[CurrencyIncrementalSnapshot], CurrencySnapshotInfo, Signed[StateChannelSnapshotBinary])]
]],
hasher: Hasher[F]
): F[Unit] = snapshots.toList.flatTraverse { case (addr, snapshots) =>
snapshots.traverse(s => lastCurrencySnapshotStorage.set(addr, s, hasher)).map(_.toList)
}.void

private def process(globalSnapshotWithState: GlobalSnapshotWithState, hasher: Hasher[F]): F[Unit] =
globalSnapshotWithState.pure[F].flatMap { case state @ GlobalSnapshotWithState(snapshot, snapshotInfo, _) =>
globalSnapshotWithState.pure[F].flatMap { case state @ GlobalSnapshotWithState(snapshot, snapshotInfo, _, _) =>
HasherSelector[F]
.forOrdinal(snapshot.ordinal) { implicit hasher =>
logger.info(
Expand All @@ -135,6 +155,7 @@ object SnapshotProcessor {
case Some(last) if Validator.isNextSnapshot(last, snapshot.signed.value) =>
s3DAO.uploadSnapshot(snapshot) >>
prepareAndExecuteBulkUpdate(state, hasher) >>
setLastCurrencySnapshots(globalSnapshotWithState.currencySnapshots, hasher) >>
lastIncrementalGlobalSnapshotStorage.set(snapshot, snapshotInfo)

case Some(last) =>
Expand All @@ -151,6 +172,7 @@ object SnapshotProcessor {
case Some(last) if Validator.isNextSnapshot(last, snapshot.signed.value) =>
s3DAO.uploadSnapshot(snapshot) >>
prepareAndExecuteBulkUpdate(state, hasher) >>
setLastCurrencySnapshots(globalSnapshotWithState.currencySnapshots, hasher) >>
lastIncrementalGlobalSnapshotStorage
.setInitial(snapshot, snapshotInfo)
.onError(e => logger.error(e)(s"Failure setting initial global snapshot!"))
Expand Down Expand Up @@ -210,7 +232,7 @@ object SnapshotProcessor {
.pullGlobalSnapshot(signedFullGlobalSnapshot.value.ordinal.next)
.map(
_.map(nextSnapshot =>
GlobalSnapshotWithState(nextSnapshot, signedFullGlobalSnapshot.value.info, Map.empty)
GlobalSnapshotWithState(nextSnapshot, signedFullGlobalSnapshot.value.info, None, Map.empty)
)
)
.map(_.toList)
Expand All @@ -223,13 +245,13 @@ object SnapshotProcessor {
}
}
.evalTap { snapshots =>
snapshots.traverse { case GlobalSnapshotWithState(snapshot, _, _) =>
snapshots.traverse { case GlobalSnapshotWithState(snapshot, _, _, _) =>
logger.info(s"Pulled following global snapshot: ${getSnapshotReference(snapshot).show}")
}
}
.evalMap {
_.tailRecM {
case (state @ GlobalSnapshotWithState(snapshot, _, _)) :: nextSnapshots
case (state @ GlobalSnapshotWithState(snapshot, _, _, _)) :: nextSnapshots
if configuration.terminalSnapshotOrdinal.forall(snapshot.ordinal <= _) =>
val hasher = HasherSelector[F].getForOrdinal(snapshot.ordinal)
process(state, hasher).as {
Expand Down Expand Up @@ -258,6 +280,7 @@ object SnapshotProcessor {
case class GlobalSnapshotWithState(
snapshot: Hashed[GlobalIncrementalSnapshot],
snapshotInfo: GlobalSnapshotInfo,
prevSnapshotInfo: Option[GlobalSnapshotInfo],
currencySnapshots: Map[Address, NonEmptyList[
Either[Hashed[
CurrencySnapshot
Expand Down
Loading
Loading