diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index eb716ff..c16ab9d 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -17,7 +17,7 @@ jobs: with: path: tessellation repository: Constellation-Labs/tessellation - ref: v2.7.0 + ref: v2.7.1 token: ${{ secrets.GITHUB_TOKEN }} - name: Set up Java diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index dca9442..f8d15f2 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -21,7 +21,7 @@ jobs: with: path: tessellation repository: Constellation-Labs/tessellation - ref: v2.7.0 + ref: v2.7.1 token: ${{ secrets.GITHUB_TOKEN }} - name: Set up Java diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4861cfc..c1d8ade 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -18,7 +18,7 @@ object Dependencies { val logstash = "7.2" val organizeImports = "0.6.0" val refined = "0.10.1" - val tessellation = "2.7.0" + val tessellation = "2.7.1" val weaver = "0.8.1" } diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 897683c..3e81501 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -1,8 +1,9 @@ snapshotStreaming { lastSnapshotPath = "lastSnapshot.json" lastIncrementalSnapshotPath = "lastIncrementalSnapshot.json" - environment = Testnet - collateral = 25000000000000 + lastCurrencySnapshotsPath = "lastCurrencySnapshots" + environment = Dev + collateral = 0 httpClient { timeout = "120s" idleTimeInPool = "30s" @@ -31,8 +32,8 @@ snapshotStreaming { } } s3 { - bucketRegion = "us-west-1" - bucketName = "constellationlabs-testnet-snapshots" - bucketDir = "snapshot-streaming" + bucketRegion = "" + bucketName = "" + bucketDir = "" } } diff --git a/src/main/scala/org/constellation/snapshotstreaming/Configuration.scala b/src/main/scala/org/constellation/snapshotstreaming/Configuration.scala index 04899ae..7834271 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/Configuration.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/Configuration.scala @@ -33,6 +33,7 @@ class Configuration(sharedConfigReader: SharedConfigReader) { val lastFullSnapshotPath: Path = Path(config.getString("snapshotStreaming.lastSnapshotPath")) val lastIncrementalSnapshotPath: Path = Path(config.getString("snapshotStreaming.lastIncrementalSnapshotPath")) + val lastCurrencySnapshotsPath: Path = Path(config.getString("snapshotStreaming.lastCurrencySnapshotsPath")) val collateral: Amount = Amount(NonNegLong.unsafeFrom(config.getLong("snapshotStreaming.collateral"))) val environment: AppEnvironment = diff --git a/src/main/scala/org/constellation/snapshotstreaming/FileBasedLastFullGlobalSnapshotStorage.scala b/src/main/scala/org/constellation/snapshotstreaming/FileBasedLastFullGlobalSnapshotStorage.scala deleted file mode 100644 index 655aed3..0000000 --- a/src/main/scala/org/constellation/snapshotstreaming/FileBasedLastFullGlobalSnapshotStorage.scala +++ /dev/null @@ -1,58 +0,0 @@ -package org.constellation.snapshotstreaming - -import cats.Applicative -import cats.effect.Async -import cats.syntax.applicativeError._ -import cats.syntax.either._ -import cats.syntax.flatMap._ -import cats.syntax.functor._ -import cats.syntax.option._ - -import org.tessellation.schema.GlobalSnapshot -import org.tessellation.security.Hashed -import org.tessellation.security.signature.Signed - -import fs2.io.file.{Path, _} -import fs2.{Stream, text} -import io.circe.Codec -import io.circe.generic.semiauto.deriveCodec -import io.circe.parser.decode -import io.circe.syntax._ - -trait FileBasedLastFullGlobalSnapshotStorage[F[_]] { - def set(snapshot: Hashed[GlobalSnapshot]): F[Unit] - def get: F[Option[Signed[GlobalSnapshot]]] -} - -object FileBasedLastFullGlobalSnapshotStorage { - - def make[F[_]: Async](path: Path): FileBasedLastFullGlobalSnapshotStorage[F] = - new FileBasedLastFullGlobalSnapshotStorage[F] { - implicit val codec: Codec[Hashed[GlobalSnapshot]] = deriveCodec[Hashed[GlobalSnapshot]] - - def set(snapshot: Hashed[GlobalSnapshot]): F[Unit] = - Stream - .emit(snapshot.asJson.spaces2) - .through(text.utf8.encode) - .through(Files[F].writeAll(path)) - .compile - .drain - - def get: F[Option[Signed[GlobalSnapshot]]] = Files[F] - .readAll(path) - .through(text.utf8.decode) - .compile - .toList - .map(_.mkString) - .map(decode[Hashed[GlobalSnapshot]]) - .flatMap(_.liftTo[F]) - .map(_.signed) - .map(_.some) - .handleErrorWith { - case _: NoSuchFileException => Applicative[F].pure(None) - case e => e.raiseError[F, Option[Signed[GlobalSnapshot]]] - } - - } - -} diff --git a/src/main/scala/org/constellation/snapshotstreaming/FileBasedLastIncrementalGlobalSnapshotStorage.scala b/src/main/scala/org/constellation/snapshotstreaming/FileBasedLastIncrementalGlobalSnapshotStorage.scala deleted file mode 100644 index d97e001..0000000 --- a/src/main/scala/org/constellation/snapshotstreaming/FileBasedLastIncrementalGlobalSnapshotStorage.scala +++ /dev/null @@ -1,128 +0,0 @@ -package org.constellation.snapshotstreaming - -import java.nio.file.NoSuchFileException - -import cats.effect.Async -import cats.effect.std.Semaphore -import cats.syntax.applicative._ -import cats.syntax.applicativeError._ -import cats.syntax.either._ -import cats.syntax.flatMap._ -import cats.syntax.functor._ -import cats.syntax.option._ -import cats.{Applicative, MonadThrow} - -import org.tessellation.kryo.KryoSerializer -import org.tessellation.merkletree.StateProofValidator -import org.tessellation.node.shared.domain.snapshot.Validator.isNextSnapshot -import org.tessellation.node.shared.domain.snapshot.storage.LastSnapshotStorage -import org.tessellation.schema._ -import org.tessellation.schema.height.Height -import org.tessellation.security._ - -import fs2.io.file._ -import fs2.{Stream, text} -import io.circe.Codec -import io.circe.generic.semiauto.deriveCodec -import io.circe.parser.decode -import io.circe.syntax._ - -object FileBasedLastIncrementalGlobalSnapshotStorage { - - def make[F[_]: Async: Files: KryoSerializer: HasherSelector]( - path: Path - ): F[LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo]] = - Semaphore[F](1) - .map(make(path, _)) - - private def make[F[_]: Async: Files: KryoSerializer: HasherSelector]( - path: Path, - semaphore: Semaphore[F] - ): LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo] = - new LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo] { - - implicit val codec: Codec[Hashed[GlobalIncrementalSnapshot]] = deriveCodec[Hashed[GlobalIncrementalSnapshot]] - implicit val snapshotWithInfoCodec: Codec[SnapshotWithState] = deriveCodec[SnapshotWithState] - - def set(snapshot: Hashed[GlobalIncrementalSnapshot], state: GlobalSnapshotInfo): F[Unit] = - semaphore.permit.use { _ => - validateStateProof(snapshot, state) >> - get.flatMap { - case Some(last) if isNextSnapshot(last, snapshot.signed.value) => - Stream - .emit(SnapshotWithState(snapshot, state).asJson.spaces2) - .through(text.utf8.encode) - .through(Files[F].writeAll(path)) - .compile - .drain - case Some(last) => - MonadThrow[F].raiseError[Unit]( - new Throwable( - s"Snapshot is not the next one! last: ${SnapshotReference - .fromHashedSnapshot(last)}, lastHash: ${last.hash}, next: ${SnapshotReference - .fromHashedSnapshot(snapshot)}, prevHash: ${snapshot.signed.value.lastSnapshotHash}" - ) - ) - case None => - MonadThrow[F].raiseError[Unit]( - new Throwable("Previous snapshot not found when setting next global snapshot!") - ) - } - } - - def setInitial(snapshot: Hashed[GlobalIncrementalSnapshot], state: GlobalSnapshotInfo): F[Unit] = - semaphore.permit.use { _ => - validateStateProof(snapshot, state) >> - Stream - .emit(SnapshotWithState(snapshot, state).asJson.spaces2) - .through(text.utf8.encode) - .through(Files[F].writeAll(path, Flags(Flag.Write, Flag.CreateNew))) - .compile - .drain - } - - private def validateStateProof(snapshot: Hashed[GlobalIncrementalSnapshot], state: GlobalSnapshotInfo) = - HasherSelector[F].forOrdinal(snapshot.ordinal) { implicit hasher => - (hasher.getLogic(snapshot.ordinal) match { - case JsonHash => StateProofValidator.validate(snapshot, state) - case KryoHash => StateProofValidator.validate(snapshot, GlobalSnapshotInfoV2.fromGlobalSnapshotInfo(state)) - }) - .map(_.isValid) - .flatMap(new Throwable("State proof doesn't match!").raiseError[F, Unit].unlessA) - } - - def get: F[Option[Hashed[GlobalIncrementalSnapshot]]] = - getSnasphotWithState(_.snapshot) - - def getCombined: F[Option[(Hashed[GlobalIncrementalSnapshot], GlobalSnapshotInfo)]] = - getSnasphotWithState(sws => (sws.snapshot, sws.state)) - - def getCombinedStream: Stream[F, Option[(Hashed[GlobalIncrementalSnapshot], GlobalSnapshotInfo)]] = - ??? - - private def getSnasphotWithState[A](extract: SnapshotWithState => A): F[Option[A]] = Files[F] - .readAll(path) - .through(text.utf8.decode) - .compile - .toList - .map(_.mkString) - .map(decode[SnapshotWithState]) - .flatMap(_.liftTo[F]) - .map(extract) - .map(_.some) - .handleErrorWith { - case _: NoSuchFileException => Applicative[F].pure(None) - case e => e.raiseError[F, Option[A]] - } - - def getOrdinal: F[Option[SnapshotOrdinal]] = - get.map(_.map(_.ordinal)) - - def getHeight: F[Option[Height]] = - get.map(_.map(_.height)) - - } - - case class SnapshotWithState(snapshot: Hashed[GlobalIncrementalSnapshot], state: GlobalSnapshotInfo) - -} diff --git a/src/main/scala/org/constellation/snapshotstreaming/GlobalSnapshotContextService.scala b/src/main/scala/org/constellation/snapshotstreaming/GlobalSnapshotContextService.scala index 2090fda..c3ac305 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/GlobalSnapshotContextService.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/GlobalSnapshotContextService.scala @@ -28,9 +28,6 @@ trait GlobalSnapshotContextService[F[_]] { object GlobalSnapshotContextService { - type CurrencySnapshotWithState = - Either[Signed[CurrencySnapshot], (Signed[CurrencyIncrementalSnapshot], CurrencySnapshotInfo)] - def make[F[_]: Async: KryoSerializer: HasherSelector]( globalSnapshotStateChannelEventsProcessor: GlobalSnapshotStateChannelEventsProcessor[F], globalSnapshotContextFns: GlobalSnapshotContextFunctions[F] @@ -60,7 +57,7 @@ object GlobalSnapshotContextService { } }) } - .map(GlobalSnapshotWithState(artifact, newContext, _)) + .map(GlobalSnapshotWithState(artifact, newContext, context.some, _)) } } diff --git a/src/main/scala/org/constellation/snapshotstreaming/SnapshotProcessor.scala b/src/main/scala/org/constellation/snapshotstreaming/SnapshotProcessor.scala index 0c1ad5e..ab93019 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/SnapshotProcessor.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/SnapshotProcessor.scala @@ -34,7 +34,11 @@ import fs2.Stream import org.constellation.snapshotstreaming.opensearch.OpensearchDAO import org.constellation.snapshotstreaming.opensearch.UpdateRequestBuilder import org.constellation.snapshotstreaming.s3.S3DAO +import org.constellation.snapshotstreaming.storage.FileBasedLastFullSnapshotStorage +import org.constellation.snapshotstreaming.storage.FileBasedLastIncrementalGlobalSnapshotStorage +import org.constellation.snapshotstreaming.storage.LastCurrencySnapshotStorage import org.http4s.ember.client.EmberClientBuilder +import org.tessellation.schema.GlobalSnapshot import org.tessellation.statechannel.StateChannelSnapshotBinary import org.typelevel.log4cats.slf4j.Slf4jLogger @@ -71,11 +75,14 @@ object SnapshotProcessor { configuration.pullLimit.some, configuration.l0Peers.keys.some ) - requestBuilder = UpdateRequestBuilder.make[F](configuration, txHasher: Hasher[F]) + requestBuilder <- Resource.eval(UpdateRequestBuilder.make[F](configuration, txHasher: Hasher[F])) tesselationServices <- Resource.eval( TessellationServices.make[F](configuration) ) - lastFullGlobalSnapshotStorage = FileBasedLastFullGlobalSnapshotStorage.make[F](configuration.lastFullSnapshotPath) + lastFullGlobalSnapshotStorage = FileBasedLastFullSnapshotStorage.make[F, GlobalSnapshot]( + configuration.lastFullSnapshotPath + ) + lastCurrencySnapshotStorage <- Resource.eval(LastCurrencySnapshotStorage.make[F](configuration.lastCurrencySnapshotsPath)) } yield make( configuration, lastIncrementalGlobalSnapshotStorage, @@ -84,10 +91,11 @@ object SnapshotProcessor { s3DAO, requestBuilder, tesselationServices, - lastFullGlobalSnapshotStorage + lastFullGlobalSnapshotStorage, + lastCurrencySnapshotStorage ) - def make[F[_]: Async: KryoSerializer: JsonSerializer: HasherSelector]( + def make[F[_]: Async: HasherSelector]( configuration: Configuration, lastIncrementalGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo], l0Service: GlobalL0Service[F], @@ -95,7 +103,8 @@ object SnapshotProcessor { s3DAO: S3DAO[F], updateRequestBuilder: UpdateRequestBuilder[F], tessellationServices: TessellationServices[F], - lastFullGlobalSnapshotStorage: FileBasedLastFullGlobalSnapshotStorage[F] + lastFullGlobalSnapshotStorage: FileBasedLastFullSnapshotStorage[F, GlobalSnapshot], + lastCurrencySnapshotStorage: LastCurrencySnapshotStorage[F] ): SnapshotProcessor[F] = new SnapshotProcessor[F] { val logger = Slf4jLogger.getLogger[F] @@ -103,7 +112,7 @@ object SnapshotProcessor { globalSnapshotWithState: GlobalSnapshotWithState, hasher: Hasher[F] ): F[Unit] = - globalSnapshotWithState.pure[F].flatMap { case state @ GlobalSnapshotWithState(snapshot, _, _) => + globalSnapshotWithState.pure[F].flatMap { case state @ GlobalSnapshotWithState(snapshot, _, _, _) => Clock[F].realTime .map(d => new Date(d.toMillis)) .flatMap(updateRequestBuilder.bulkUpdateRequests(state, _, hasher)) @@ -116,8 +125,19 @@ object SnapshotProcessor { .void } + private def setLastCurrencySnapshots( + snapshots: Map[Address, NonEmptyList[ + Either[Hashed[ + CurrencySnapshot + ], (Hashed[CurrencyIncrementalSnapshot], CurrencySnapshotInfo, Signed[StateChannelSnapshotBinary])] + ]], + hasher: Hasher[F] + ): F[Unit] = snapshots.toList.flatTraverse { case (addr, snapshots) => + snapshots.traverse(s => lastCurrencySnapshotStorage.set(addr, s, hasher)).map(_.toList) + }.void + private def process(globalSnapshotWithState: GlobalSnapshotWithState, hasher: Hasher[F]): F[Unit] = - globalSnapshotWithState.pure[F].flatMap { case state @ GlobalSnapshotWithState(snapshot, snapshotInfo, _) => + globalSnapshotWithState.pure[F].flatMap { case state @ GlobalSnapshotWithState(snapshot, snapshotInfo, _, _) => HasherSelector[F] .forOrdinal(snapshot.ordinal) { implicit hasher => logger.info( @@ -135,6 +155,7 @@ object SnapshotProcessor { case Some(last) if Validator.isNextSnapshot(last, snapshot.signed.value) => s3DAO.uploadSnapshot(snapshot) >> prepareAndExecuteBulkUpdate(state, hasher) >> + setLastCurrencySnapshots(globalSnapshotWithState.currencySnapshots, hasher) >> lastIncrementalGlobalSnapshotStorage.set(snapshot, snapshotInfo) case Some(last) => @@ -151,6 +172,7 @@ object SnapshotProcessor { case Some(last) if Validator.isNextSnapshot(last, snapshot.signed.value) => s3DAO.uploadSnapshot(snapshot) >> prepareAndExecuteBulkUpdate(state, hasher) >> + setLastCurrencySnapshots(globalSnapshotWithState.currencySnapshots, hasher) >> lastIncrementalGlobalSnapshotStorage .setInitial(snapshot, snapshotInfo) .onError(e => logger.error(e)(s"Failure setting initial global snapshot!")) @@ -210,7 +232,7 @@ object SnapshotProcessor { .pullGlobalSnapshot(signedFullGlobalSnapshot.value.ordinal.next) .map( _.map(nextSnapshot => - GlobalSnapshotWithState(nextSnapshot, signedFullGlobalSnapshot.value.info, Map.empty) + GlobalSnapshotWithState(nextSnapshot, signedFullGlobalSnapshot.value.info, None, Map.empty) ) ) .map(_.toList) @@ -223,13 +245,13 @@ object SnapshotProcessor { } } .evalTap { snapshots => - snapshots.traverse { case GlobalSnapshotWithState(snapshot, _, _) => + snapshots.traverse { case GlobalSnapshotWithState(snapshot, _, _, _) => logger.info(s"Pulled following global snapshot: ${getSnapshotReference(snapshot).show}") } } .evalMap { _.tailRecM { - case (state @ GlobalSnapshotWithState(snapshot, _, _)) :: nextSnapshots + case (state @ GlobalSnapshotWithState(snapshot, _, _, _)) :: nextSnapshots if configuration.terminalSnapshotOrdinal.forall(snapshot.ordinal <= _) => val hasher = HasherSelector[F].getForOrdinal(snapshot.ordinal) process(state, hasher).as { @@ -258,6 +280,7 @@ object SnapshotProcessor { case class GlobalSnapshotWithState( snapshot: Hashed[GlobalIncrementalSnapshot], snapshotInfo: GlobalSnapshotInfo, + prevSnapshotInfo: Option[GlobalSnapshotInfo], currencySnapshots: Map[Address, NonEmptyList[ Either[Hashed[ CurrencySnapshot diff --git a/src/main/scala/org/constellation/snapshotstreaming/opensearch/UpdateRequestBuilder.scala b/src/main/scala/org/constellation/snapshotstreaming/opensearch/UpdateRequestBuilder.scala index 209b6de..fac66b9 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/opensearch/UpdateRequestBuilder.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/opensearch/UpdateRequestBuilder.scala @@ -14,6 +14,7 @@ import org.constellation.snapshotstreaming.opensearch.mapper.CurrencySnapshotMap import org.constellation.snapshotstreaming.opensearch.mapper.GlobalSnapshotMapper import org.constellation.snapshotstreaming.opensearch.schema._ import org.constellation.snapshotstreaming.Configuration +import org.constellation.snapshotstreaming.storage.LastCurrencySnapshotStorage import org.tessellation.json.JsonSerializer trait UpdateRequestBuilder[F[_]] { @@ -28,39 +29,76 @@ trait UpdateRequestBuilder[F[_]] { object UpdateRequestBuilder { - def make[F[_]: Async: KryoSerializer: JsonSerializer: HasherSelector](config: Configuration, txHasher: Hasher[F]): UpdateRequestBuilder[F] = - make(GlobalSnapshotMapper.make(), CurrencySnapshotMapper.make(), config, txHasher: Hasher[F]) + def make[F[_]: Async: KryoSerializer: JsonSerializer: HasherSelector]( + config: Configuration, + txHasher: Hasher[F] + ): F[UpdateRequestBuilder[F]] = + LastCurrencySnapshotStorage.make(config.lastCurrencySnapshotsPath).map { storage => + make( + GlobalSnapshotMapper.make(), + CurrencySnapshotMapper.make(storage), + config, + txHasher: Hasher[F] + ) + } + - def make[F[_]: Async: HasherSelector](globalMapper: GlobalSnapshotMapper[F], currencyMapper: CurrencySnapshotMapper[F], config: Configuration, txHasher: Hasher[F]): UpdateRequestBuilder[F] = + def make[F[_]: Async]( + globalMapper: GlobalSnapshotMapper[F], + currencyMapper: CurrencySnapshotMapper[F], + config: Configuration, + txHasher: Hasher[F] + ): UpdateRequestBuilder[F] = new UpdateRequestBuilder[F] { def bulkUpdateRequests( globalSnapshotWithState: GlobalSnapshotWithState, timestamp: Date, hasher: Hasher[F] - ): F[Seq[Seq[UpdateRequest]]] = + ): F[ + Seq[Seq[UpdateRequest]] + ] = for { _ <- Async[F].unit - GlobalSnapshotWithState(globalSnapshot, snapshotInfo, currencySnapshots) = globalSnapshotWithState + GlobalSnapshotWithState(globalSnapshot, snapshotInfo, maybePrevSnapshotInfo, currencySnapshots) = + globalSnapshotWithState - mappedGlobalData <- globalMapper.mapGlobalSnapshot(globalSnapshot, snapshotInfo, timestamp, txHasher, hasher) + mappedGlobalData <- globalMapper.mapGlobalSnapshot( + globalSnapshot, + maybePrevSnapshotInfo, + snapshotInfo, + timestamp, + txHasher, + hasher + ) (snapshot, blocks, transactions, balances) = mappedGlobalData mappedCurrencyData <- currencyMapper.mapCurrencySnapshots(currencySnapshots, timestamp, txHasher, hasher) - (currSnapshot, currIncrementalSnapshots, currBlocks, currTransactions, currBalances) = mappedCurrencyData + (currSnapshot, currIncrementalSnapshots, currBlocks, currTransactions, currBalances) = + mappedCurrencyData - } yield updateRequests(snapshot, blocks, transactions, balances, currSnapshot, currIncrementalSnapshots, currBlocks, currTransactions, currBalances).grouped(config.bulkSize).toSeq + } yield updateRequests( + snapshot, + blocks, + transactions, + balances, + currSnapshot, + currIncrementalSnapshots, + currBlocks, + currTransactions, + currBalances + ).grouped(config.bulkSize).toSeq def updateRequests[T]( - snapshot: Snapshot, - blocks: Seq[Block], - transactions: Seq[Transaction], - balances: Seq[AddressBalance], - currencySnapshots: Seq[CurrencyData[Snapshot]], - currencyIncrementalSnapshots: Seq[CurrencyData[CurrencySnapshot]], - currencyBlocks: Seq[CurrencyData[Block]], - currencyTransactions: Seq[CurrencyData[Transaction]], - currencyBalances: Seq[CurrencyData[AddressBalance]] + snapshot: Snapshot, + blocks: Seq[Block], + transactions: Seq[Transaction], + balances: Seq[AddressBalance], + currencySnapshots: Seq[CurrencyData[Snapshot]], + currencyIncrementalSnapshots: Seq[CurrencyData[CurrencySnapshot]], + currencyBlocks: Seq[CurrencyData[Block]], + currencyTransactions: Seq[CurrencyData[Transaction]], + currencyBalances: Seq[CurrencyData[AddressBalance]] ): Seq[UpdateRequest] = Seq(updateById(config.snapshotsIndex, snapshot.hash).docAsUpsert(snapshot)) ++ blocks.map(block => updateById(config.blocksIndex, block.hash).docAsUpsert(block)) ++ diff --git a/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencyFullSnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencyFullSnapshotMapper.scala index f3c4d77..4e37f5d 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencyFullSnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencyFullSnapshotMapper.scala @@ -13,10 +13,11 @@ import org.tessellation.syntax.sortedCollection._ import eu.timepit.refined.auto._ import io.estatico.newtype.ops._ import org.constellation.snapshotstreaming.opensearch.schema._ +import org.tessellation.currency.schema.currency.CurrencySnapshotInfo import java.util.Date -abstract class CurrencyFullSnapshotMapper[F[_]: Async] extends SnapshotMapper[F, OriginalCurrencySnapshot] { +abstract class CurrencyFullSnapshotMapper[F[_]: Async] extends SnapshotMapper[F, OriginalCurrencySnapshot, CurrencySnapshotInfo] { def mapSnapshot(snapshot: Hashed[OriginalCurrencySnapshot], timestamp: Date, hasher: Hasher[F]): F[Snapshot] } diff --git a/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencyIncrementalSnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencyIncrementalSnapshotMapper.scala index 01b14f8..25e036d 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencyIncrementalSnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencyIncrementalSnapshotMapper.scala @@ -22,7 +22,7 @@ import org.tessellation.statechannel.StateChannelSnapshotBinary import java.util.Date abstract class CurrencyIncrementalSnapshotMapper[F[_]: Async: JsonSerializer] - extends SnapshotMapper[F, CurrencyIncrementalSnapshot] { + extends SnapshotMapper[F, CurrencyIncrementalSnapshot, CurrencySnapshotInfo] { def mapSnapshot( snapshot: Hashed[CurrencyIncrementalSnapshot], diff --git a/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencySnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencySnapshotMapper.scala index f74eaa0..4b51b57 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencySnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencySnapshotMapper.scala @@ -1,12 +1,10 @@ package org.constellation.snapshotstreaming.opensearch.mapper import java.util.Date - import cats.data.NonEmptyList +import cats.data.OptionT import cats.effect.Async -import cats.syntax.flatMap._ -import cats.syntax.foldable._ -import cats.syntax.functor._ +import cats.syntax.all._ import org.tessellation.currency.schema.currency.CurrencyIncrementalSnapshot import org.tessellation.currency.schema.currency.CurrencySnapshot import org.tessellation.currency.schema.currency.CurrencySnapshotInfo @@ -15,10 +13,15 @@ import org.tessellation.schema.address.Address import org.tessellation.security.Hashed import org.tessellation.security.Hasher import org.constellation.snapshotstreaming.opensearch.schema._ +import org.constellation.snapshotstreaming.storage.LastCurrencySnapshotStorage import org.tessellation.json.JsonSerializer +import org.tessellation.kryo.KryoSerializer +import org.tessellation.schema.balance.Balance import org.tessellation.security.signature.Signed import org.tessellation.statechannel.StateChannelSnapshotBinary +import scala.collection.immutable.SortedMap + trait CurrencySnapshotMapper[F[_]] { def mapCurrencySnapshots( @@ -44,12 +47,15 @@ trait CurrencySnapshotMapper[F[_]] { object CurrencySnapshotMapper { - def make[F[_]: Async: JsonSerializer](): CurrencySnapshotMapper[F] = - make(CurrencyFullSnapshotMapper.make(), CurrencyIncrementalSnapshotMapper.make()) + def make[F[_]: Async: JsonSerializer: KryoSerializer]( + lastCurrencySnapshotStorage: LastCurrencySnapshotStorage[F] + ): CurrencySnapshotMapper[F] = + make(CurrencyFullSnapshotMapper.make(), CurrencyIncrementalSnapshotMapper.make(), lastCurrencySnapshotStorage) private def make[F[_]: Async]( fullMapper: CurrencyFullSnapshotMapper[F], - incrementalMapper: CurrencyIncrementalSnapshotMapper[F] + incrementalMapper: CurrencyIncrementalSnapshotMapper[F], + lastCurrencySnapshotStorage: LastCurrencySnapshotStorage[F] ): CurrencySnapshotMapper[F] = new CurrencySnapshotMapper[F] { @@ -68,70 +74,83 @@ object CurrencySnapshotMapper { Seq[CurrencyData[Transaction]], Seq[CurrencyData[AddressBalance]] ) - ] = - snapshots.toList.flatMap { case (i, s) => s.toList.map((i, _)) } - .foldLeftM( - ( - Seq.empty[CurrencyData[Snapshot]], - Seq.empty[CurrencyData[OSCurrencySnapshot]], - Seq.empty[CurrencyData[Block]], - Seq.empty[CurrencyData[Transaction]], - Seq.empty[CurrencyData[AddressBalance]] - ) - ) { - case ( - (aggSnap, aggCurrencyIncrementalSnap, aggBlocks, aggTxs, aggBalances), - (identifier, fullOrIncremental) - ) => - val identifierStr = identifier.value.value + ] = snapshots.toList.flatMap { case (i, s) => s.toList.map((i, _)) } + .foldLeftM( + ( + Seq.empty[CurrencyData[Snapshot]], + Seq.empty[CurrencyData[OSCurrencySnapshot]], + Seq.empty[CurrencyData[Block]], + Seq.empty[CurrencyData[Transaction]], + Seq.empty[CurrencyData[AddressBalance]], + Map.empty[Address, SortedMap[Address, Balance]] + ) + ) { + case ( + (aggSnap, aggCurrencyIncrementalSnap, aggBlocks, aggTxs, aggBalances, aggLastBalances), + (identifier, fullOrIncremental) + ) => + val identifierStr = identifier.value.value - fullOrIncremental match { - case Left(full) => - for { - snapshot <- fullMapper - .mapSnapshot(full, timestamp, hasher) - .map(CurrencyData(identifierStr, _)) - blocks <- fullMapper - .mapBlocks(full, timestamp, txHasher, hasher) - .map(_.map(CurrencyData(identifierStr, _))) - transactions <- fullMapper - .mapTransactions(full, timestamp, txHasher, hasher) - .map(_.map(CurrencyData(identifierStr, _))) - balances = fullMapper - .mapBalances(full, full.info.balances, timestamp) - .map(CurrencyData(identifierStr, _)) - } yield ( - aggSnap :+ snapshot, - aggCurrencyIncrementalSnap, - aggBlocks ++ blocks, - aggTxs ++ transactions, - aggBalances ++ balances - ) + fullOrIncremental match { + case Left(full) => + for { + snapshot <- fullMapper + .mapSnapshot(full, timestamp, hasher) + .map(CurrencyData(identifierStr, _)) + blocks <- fullMapper + .mapBlocks(full, timestamp, txHasher, hasher) + .map(_.map(CurrencyData(identifierStr, _))) + transactions <- fullMapper + .mapTransactions(full, timestamp, txHasher, hasher) + .map(_.map(CurrencyData(identifierStr, _))) + balances = fullMapper + .mapBalances(full, full.info.balances, timestamp) + .map(CurrencyData(identifierStr, _)) + } yield ( + aggSnap :+ snapshot, + aggCurrencyIncrementalSnap, + aggBlocks ++ blocks, + aggTxs ++ transactions, + aggBalances ++ balances, + aggLastBalances + (identifier -> full.info.balances) + ) - case Right((incremental, info, binary)) => - for { - snapshot <- incrementalMapper - .mapSnapshot(incremental, binary, info, timestamp, hasher) - .map(CurrencyData(identifierStr, _)) - blocks <- incrementalMapper - .mapBlocks(incremental, timestamp, txHasher, hasher) - .map(_.map(CurrencyData(identifierStr, _))) - transactions <- incrementalMapper - .mapTransactions(incremental, timestamp, txHasher, hasher) - .map(_.map(CurrencyData(identifierStr, _))) - filteredBalances = incrementalMapper.snapshotReferredBalancesInfo(incremental, info) - balances = incrementalMapper - .mapBalances(incremental, filteredBalances, timestamp) - .map(CurrencyData(identifierStr, _)) - } yield ( - aggSnap, - aggCurrencyIncrementalSnap :+ snapshot, - aggBlocks ++ blocks, - aggTxs ++ transactions, - aggBalances ++ balances + case Right((incremental, info, binary)) => + for { + snapshot <- incrementalMapper + .mapSnapshot(incremental, binary, info, timestamp, hasher) + .map(CurrencyData(identifierStr, _)) + blocks <- incrementalMapper + .mapBlocks(incremental, timestamp, txHasher, hasher) + .map(_.map(CurrencyData(identifierStr, _))) + transactions <- incrementalMapper + .mapTransactions(incremental, timestamp, txHasher, hasher) + .map(_.map(CurrencyData(identifierStr, _))) + prevBalances <- OptionT + .fromOption(aggLastBalances.get(identifier)) + .orElseF(lastCurrencySnapshotStorage.getLastBalances(identifier, hasher)) + .value + filteredBalances = incrementalMapper.balanceDiff( + incremental, + prevBalances, + info ) - } - } + balances = incrementalMapper + .mapBalances(incremental, filteredBalances, timestamp) + .map(CurrencyData(identifierStr, _)) + } yield ( + aggSnap, + aggCurrencyIncrementalSnap :+ snapshot, + aggBlocks ++ blocks, + aggTxs ++ transactions, + aggBalances ++ balances, + aggLastBalances + (identifier -> filteredBalances) + ) + } + } + .map { case (aggSnap, aggCurrencyIncrementalSnap, aggBlocks, aggTxs, aggBalances, _) => + (aggSnap, aggCurrencyIncrementalSnap, aggBlocks, aggTxs, aggBalances) + } } diff --git a/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/GlobalSnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/GlobalSnapshotMapper.scala index ff99076..70c210b 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/GlobalSnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/GlobalSnapshotMapper.scala @@ -20,12 +20,13 @@ import org.tessellation.security.Hasher import org.tessellation.security.HasherSelector abstract class GlobalSnapshotMapper[F[_]: Async: KryoSerializer: HasherSelector] - extends SnapshotMapper[F, GlobalIncrementalSnapshot] { + extends SnapshotMapper[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo] { def mapSnapshot(snapshot: Hashed[GlobalIncrementalSnapshot], timestamp: Date, hasher: Hasher[F]): F[Snapshot] def mapGlobalSnapshot( globalSnapshot: Hashed[GlobalIncrementalSnapshot], + prevInfo: Option[GlobalSnapshotInfo], info: GlobalSnapshotInfo, timestamp: Date, txHasher: Hasher[F], @@ -36,9 +37,10 @@ abstract class GlobalSnapshotMapper[F[_]: Async: KryoSerializer: HasherSelector] snapshot <- mapSnapshot(globalSnapshot, timestamp, hasher) blocks <- mapBlocks(globalSnapshot, timestamp, txHasher, hasher) transactions <- mapTransactions(globalSnapshot, timestamp, txHasher, hasher) - filteredBalances = snapshotReferredBalancesInfo( + filteredBalances = balanceDiff( globalSnapshot.signed.value, - info + prevInfo.map(prev => prev.balances), + info, ) balances = mapBalances(globalSnapshot, filteredBalances, timestamp) } yield (snapshot, blocks, transactions, balances) diff --git a/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/SnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/SnapshotMapper.scala index b205cb1..a507951 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/SnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/opensearch/mapper/SnapshotMapper.scala @@ -2,9 +2,7 @@ package org.constellation.snapshotstreaming.opensearch.mapper import java.util.Date import cats.effect.Async -import cats.syntax.flatMap._ -import cats.syntax.functor._ -import cats.syntax.traverse._ +import cats.syntax.all._ import scala.collection.immutable.SortedMap import scala.collection.immutable.SortedSet @@ -25,7 +23,7 @@ import org.constellation.snapshotstreaming.opensearch.schema._ case class SnapshotReferredAddresses(source: Set[Address], destination: Set[Address]) -abstract class SnapshotMapper[F[_]: Async, S <: OriginalSnapshot] { +abstract class SnapshotMapper[F[_]: Async, S <: OriginalSnapshot, SI <: SnapshotInfo[_]] { def fetchRewards(snapshot: S): SortedSet[OriginalRewardTransaction] @@ -47,7 +45,13 @@ abstract class SnapshotMapper[F[_]: Async, S <: OriginalSnapshot] { .sequence } yield blocks - private def mapBlock(snapshotHash: String, snapshotOrdinal: Long, timestamp: Date, txHasher: Hasher[F], hasher: Hasher[F])( + private def mapBlock( + snapshotHash: String, + snapshotOrdinal: Long, + timestamp: Date, + txHasher: Hasher[F], + hasher: Hasher[F] + )( block: Signed[OriginalBlock] ): F[Block] = for { @@ -118,24 +122,32 @@ abstract class SnapshotMapper[F[_]: Async, S <: OriginalSnapshot] { private def mapTransactionRef(nodeRef: OriginalTransactionReference): TransactionReference = TransactionReference(nodeRef.hash.value, nodeRef.ordinal.value) - def snapshotReferredBalancesInfo( + def balanceDiff( snapshot: S, - info: SnapshotInfo[_] - ): SortedMap[Address, Balance] = { - val snapshotReferredAddresses = extractSnapshotReferredAddresses(snapshot) - val bothAddresses = snapshotReferredAddresses.source ++ snapshotReferredAddresses.destination - val rewardsAddresses = fetchRewards(snapshot).toList.map(_.destination) - val addressesToKeep = bothAddresses ++ rewardsAddresses - val filteredBalances = info.balances.filter { case (address, _) => addressesToKeep.contains(address) } - - val srcTransactions = snapshotReferredAddresses.source - val setZeroBalances = - (srcTransactions.toSet -- info.balances.keys.toSet) - .map(address => address -> Balance(0L)) - .toSortedMap - - filteredBalances ++ setZeroBalances - } + prevBalances: Option[SortedMap[Address, Balance]], + info: SnapshotInfo[_], + ): SortedMap[Address, Balance] = + prevBalances match { + case Some(prev) => + val changed = info.balances.filterNot { case (address, balance) => + prev.get(address).exists(_ === balance) + } + + /* NOTE: SnapshotInfo calculation optimization gets rid of addresses that have empty balances. + It is fine for node but in snapshot-streaming we need to keep such addresses set to Balance.empty. + We do that by finding addresses that are missing in info but are referenced in the transactions. + */ + val explicitlyZeroed = { + val srcTransactions = extractSnapshotReferredAddresses(snapshot).source + (srcTransactions -- info.balances.keys) + .map(address => address -> Balance.empty) + .toSortedMap + } + + changed ++ explicitlyZeroed + case None => + info.balances + } def extractSnapshotReferredAddresses(snapshot: S): SnapshotReferredAddresses diff --git a/src/main/scala/org/constellation/snapshotstreaming/storage/BaseLastIncrementalSnapshotStorage.scala b/src/main/scala/org/constellation/snapshotstreaming/storage/BaseLastIncrementalSnapshotStorage.scala new file mode 100644 index 0000000..5039c85 --- /dev/null +++ b/src/main/scala/org/constellation/snapshotstreaming/storage/BaseLastIncrementalSnapshotStorage.scala @@ -0,0 +1,98 @@ +package org.constellation.snapshotstreaming.storage + +import java.nio.file.NoSuchFileException +import cats.effect.Async +import cats.syntax.all._ +import cats.Applicative +import cats.MonadThrow +import cats.effect.std.Mutex +import org.tessellation.node.shared.domain.snapshot.Validator.isNextSnapshot +import org.tessellation.node.shared.domain.snapshot.storage.LastSnapshotStorage +import org.tessellation.schema._ +import org.tessellation.schema.height.Height +import org.tessellation.security._ +import fs2.io.file._ +import fs2.Stream +import fs2.text +import io.circe.Codec +import io.circe.parser.decode +import io.circe.syntax._ +import org.tessellation.schema.snapshot.IncrementalSnapshot +import org.tessellation.schema.snapshot.SnapshotInfo +import org.tessellation.schema.snapshot.StateProof + +abstract class BaseLastIncrementalSnapshotStorage[F[_]: Async, P <: StateProof, S <: IncrementalSnapshot[P], SI <: SnapshotInfo[P]](mutex: Mutex[F], path: Path)(implicit codec: Codec[SnapshotWithState[S, SI]]) extends LastSnapshotStorage[F, S, SI] { + + protected def validateStateProof(snapshot: Hashed[S], state: SI): F[Unit] + + def set(snapshot: Hashed[S], state: SI): F[Unit] = + mutex.lock.surround { + validateStateProof(snapshot, state) >> + get.flatMap { + case Some(last) if isNextSnapshot(last, snapshot.signed.value) => + Stream + .emit(SnapshotWithState(snapshot, state).asJson.spaces2) + .through(text.utf8.encode) + .through(Files[F].writeAll(path)) + .compile + .drain + case Some(last) => + MonadThrow[F].raiseError[Unit]( + new IllegalStateException( + s"Snapshot is not the next one! last: ${SnapshotReference + .fromHashedSnapshot(last)}, lastHash: ${last.hash}, next: ${SnapshotReference + .fromHashedSnapshot(snapshot)}, prevHash: ${snapshot.signed.value.lastSnapshotHash}" + ) + ) + case None => + MonadThrow[F].raiseError[Unit]( + new IllegalStateException("Previous snapshot not found when setting next global snapshot!") + ) + } + } + + def setInitial(snapshot: Hashed[S], state: SI): F[Unit] = + mutex.lock.surround { + validateStateProof(snapshot, state) >> + Stream + .emit(SnapshotWithState(snapshot, state).asJson.spaces2) + .through(text.utf8.encode) + .through(Files[F].writeAll(path, Flags(Flag.Write, Flag.CreateNew))) + .compile + .drain + } + + def get: F[Option[Hashed[S]]] = + getSnapshotWithState(_.snapshot) + + def getCombined: F[Option[(Hashed[S], SI)]] = + getSnapshotWithState(sws => (sws.snapshot, sws.state)) + + def getCombinedStream: Stream[F, Option[(Hashed[S], SI)]] = + ??? + + private def getSnapshotWithState[A](extract: SnapshotWithState[S, SI] => A): F[Option[A]] = Files[F] + .readAll(path) + .through(text.utf8.decode) + .compile + .toList + .map(_.mkString) + .map(decode[SnapshotWithState[S, SI]]) + .flatMap(_.liftTo[F]) + .map(extract) + .map(_.some) + .handleErrorWith { + case _: NoSuchFileException => Applicative[F].pure(None) + case e => e.raiseError[F, Option[A]] + } + + def getOrdinal: F[Option[SnapshotOrdinal]] = + get.map(_.map(_.ordinal)) + + def getHeight: F[Option[Height]] = + get.map(_.map(_.height)) +} + +object BaseLastIncrementalSnapshotStorage {} + +case class SnapshotWithState[S, SI](snapshot: Hashed[S], state: SI) diff --git a/src/main/scala/org/constellation/snapshotstreaming/storage/FileBasedLastFullSnapshotStorage.scala b/src/main/scala/org/constellation/snapshotstreaming/storage/FileBasedLastFullSnapshotStorage.scala new file mode 100644 index 0000000..0fb1614 --- /dev/null +++ b/src/main/scala/org/constellation/snapshotstreaming/storage/FileBasedLastFullSnapshotStorage.scala @@ -0,0 +1,61 @@ +package org.constellation.snapshotstreaming.storage + +import cats.Applicative +import cats.effect.Async +import cats.syntax.applicativeError._ +import cats.syntax.either._ +import cats.syntax.flatMap._ +import cats.syntax.functor._ +import cats.syntax.option._ +import org.tessellation.security.Hashed +import org.tessellation.security.signature.Signed +import fs2.io.file.Path +import fs2.io.file._ +import fs2.Stream +import fs2.text +import io.circe.parser.decode +import io.circe.syntax._ +import io.circe.Decoder +import io.circe.Encoder +import io.circe.generic.semiauto.deriveDecoder +import io.circe.generic.semiauto.deriveEncoder +import org.tessellation.schema.snapshot.Snapshot + +trait FileBasedLastFullSnapshotStorage[F[_], S <: Snapshot] { + def set(snapshot: Hashed[S]): F[Unit] + def get: F[Option[Signed[S]]] +} + +object FileBasedLastFullSnapshotStorage { + + def make[F[_]: Async, S <: Snapshot: Decoder: Encoder](path: Path): FileBasedLastFullSnapshotStorage[F, S] = + new FileBasedLastFullSnapshotStorage[F, S] { + implicit val decoder: Decoder[Hashed[S]] = deriveDecoder[Hashed[S]] + implicit val encoder: Encoder[Hashed[S]] = deriveEncoder[Hashed[S]] + + def set(snapshot: Hashed[S]): F[Unit] = + Stream + .emit(snapshot.asJson.spaces2) + .through(text.utf8.encode) + .through(Files[F].writeAll(path)) + .compile + .drain + + def get: F[Option[Signed[S]]] = Files[F] + .readAll(path) + .through(text.utf8.decode) + .compile + .toList + .map(_.mkString) + .map(decode[Hashed[S]]) + .flatMap(_.liftTo[F]) + .map(_.signed) + .map(_.some) + .handleErrorWith { + case _: NoSuchFileException => Applicative[F].pure(None) + case e => e.raiseError[F, Option[Signed[S]]] + } + + } + +} diff --git a/src/main/scala/org/constellation/snapshotstreaming/storage/FileBasedLastIncrementalCurrencySnapshotStorage.scala b/src/main/scala/org/constellation/snapshotstreaming/storage/FileBasedLastIncrementalCurrencySnapshotStorage.scala new file mode 100644 index 0000000..2d68f67 --- /dev/null +++ b/src/main/scala/org/constellation/snapshotstreaming/storage/FileBasedLastIncrementalCurrencySnapshotStorage.scala @@ -0,0 +1,42 @@ +package org.constellation.snapshotstreaming.storage + +import cats.effect.Async +import cats.effect.std.Mutex +import cats.syntax.applicative._ +import cats.syntax.applicativeError._ +import cats.syntax.flatMap._ +import cats.syntax.functor._ +import org.tessellation.security._ +import fs2.io.file._ +import io.circe.Codec +import io.circe.generic.semiauto.deriveCodec +import org.tessellation.currency.schema.currency.CurrencyIncrementalSnapshot +import org.tessellation.currency.schema.currency.CurrencySnapshotInfo +import org.tessellation.currency.schema.currency.CurrencySnapshotStateProof +import org.tessellation.merkletree.StateProofValidator + +class FileBasedLastIncrementalCurrencySnapshotStorage[F[_]: Async: Hasher](mutex: Mutex[F], path: Path)(implicit + codec: Codec[SnapshotWithState[CurrencyIncrementalSnapshot, CurrencySnapshotInfo]] +) extends BaseLastIncrementalSnapshotStorage[F, CurrencySnapshotStateProof, CurrencyIncrementalSnapshot, CurrencySnapshotInfo](mutex, path) { + + def validateStateProof(snapshot: Hashed[CurrencyIncrementalSnapshot], state: CurrencySnapshotInfo): F[Unit] = + StateProofValidator + .validate(snapshot, state) + .flatMap(Async[F].fromValidated) + +} + +object FileBasedLastIncrementalCurrencySnapshotStorage { + + def make[F[_]: Async: Hasher](mutex: Mutex[F], path: Path): FileBasedLastIncrementalCurrencySnapshotStorage[F] = { + implicit val codec: Codec[Hashed[CurrencyIncrementalSnapshot]] = deriveCodec[Hashed[CurrencyIncrementalSnapshot]] + implicit val snapshotWithInfoCodec: Codec[SnapshotWithState[CurrencyIncrementalSnapshot, CurrencySnapshotInfo]] = + deriveCodec[SnapshotWithState[CurrencyIncrementalSnapshot, CurrencySnapshotInfo]] + new FileBasedLastIncrementalCurrencySnapshotStorage[F](mutex, path) + } + + + def make[F[_]: Async: Hasher](path: Path): F[FileBasedLastIncrementalCurrencySnapshotStorage[F]] = + Mutex[F].map(make(_, path)) + +} diff --git a/src/main/scala/org/constellation/snapshotstreaming/storage/FileBasedLastIncrementalGlobalSnapshotStorage.scala b/src/main/scala/org/constellation/snapshotstreaming/storage/FileBasedLastIncrementalGlobalSnapshotStorage.scala new file mode 100644 index 0000000..545864d --- /dev/null +++ b/src/main/scala/org/constellation/snapshotstreaming/storage/FileBasedLastIncrementalGlobalSnapshotStorage.scala @@ -0,0 +1,49 @@ +package org.constellation.snapshotstreaming.storage + +import cats.effect.Async +import cats.effect.std.Mutex +import cats.syntax.applicative._ +import cats.syntax.applicativeError._ +import cats.syntax.flatMap._ +import cats.syntax.functor._ +import org.tessellation.kryo.KryoSerializer +import org.tessellation.merkletree.StateProofValidator +import org.tessellation.node.shared.domain.snapshot.storage.LastSnapshotStorage +import org.tessellation.schema._ +import org.tessellation.security._ +import fs2.io.file._ +import io.circe.Codec +import io.circe.generic.semiauto.deriveCodec + +class FileBasedLastIncrementalGlobalSnapshotStorage[F[_]: Async: HasherSelector](mutex: Mutex[F], path: Path)( + implicit codec: Codec[SnapshotWithState[GlobalIncrementalSnapshot, GlobalSnapshotInfo]] +) extends BaseLastIncrementalSnapshotStorage[ + F, + GlobalSnapshotStateProof, + GlobalIncrementalSnapshot, + GlobalSnapshotInfo + ](mutex, path) { + + protected def validateStateProof(snapshot: Hashed[GlobalIncrementalSnapshot], state: GlobalSnapshotInfo): F[Unit] = + HasherSelector[F].forOrdinal(snapshot.ordinal) { implicit hasher => + (hasher.getLogic(snapshot.ordinal) match { + case JsonHash => StateProofValidator.validate(snapshot, state) + case KryoHash => StateProofValidator.validate(snapshot, GlobalSnapshotInfoV2.fromGlobalSnapshotInfo(state)) + }).flatMap(Async[F].fromValidated) + } + +} + +object FileBasedLastIncrementalGlobalSnapshotStorage { + + def make[F[_]: Async: Files: KryoSerializer: HasherSelector]( + path: Path + ): F[LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo]] = + Mutex[F].map { mutex => + implicit val codec: Codec[Hashed[GlobalIncrementalSnapshot]] = deriveCodec[Hashed[GlobalIncrementalSnapshot]] + implicit val snapshotWithInfoCodec: Codec[SnapshotWithState[GlobalIncrementalSnapshot, GlobalSnapshotInfo]] = + deriveCodec[SnapshotWithState[GlobalIncrementalSnapshot, GlobalSnapshotInfo]] + new FileBasedLastIncrementalGlobalSnapshotStorage[F](mutex, path) + } + +} diff --git a/src/main/scala/org/constellation/snapshotstreaming/storage/LastCurrencySnapshotStorage.scala b/src/main/scala/org/constellation/snapshotstreaming/storage/LastCurrencySnapshotStorage.scala new file mode 100644 index 0000000..ce084b0 --- /dev/null +++ b/src/main/scala/org/constellation/snapshotstreaming/storage/LastCurrencySnapshotStorage.scala @@ -0,0 +1,99 @@ +package org.constellation.snapshotstreaming.storage + +import cats.effect.Async +import cats.syntax.all._ +import cats.Applicative +import cats.effect.std.Mutex +import fs2.io.file.Files +import fs2.io.file.Path +import org.tessellation.currency.schema.currency.CurrencyIncrementalSnapshot +import org.tessellation.currency.schema.currency.CurrencySnapshot +import org.tessellation.currency.schema.currency.CurrencySnapshotInfo +import org.tessellation.kryo.KryoSerializer +import org.tessellation.schema.address.Address +import org.tessellation.schema.balance.Balance +import org.tessellation.security.signature.Signed +import org.tessellation.security.Hashed +import org.tessellation.security.Hasher +import org.tessellation.statechannel.StateChannelSnapshotBinary + +import scala.collection.immutable.SortedMap + +trait LastCurrencySnapshotStorage[F[_]] { + + def set( + identifier: Address, + snapshot: Either[Hashed[ + CurrencySnapshot + ], (Hashed[CurrencyIncrementalSnapshot], CurrencySnapshotInfo, Signed[StateChannelSnapshotBinary])], + hasher: Hasher[F] + ): F[Unit] + + def getLastBalances(identifier: Address, hasher: Hasher[F]): F[Option[SortedMap[Address, Balance]]] + +} + +object LastCurrencySnapshotStorage { + + def make[F[_]: Async: KryoSerializer](basePath: Path): F[LastCurrencySnapshotStorage[F]] = + Mutex[F].map(make(_, basePath)) + + def make[F[_]: Async: KryoSerializer]( + mutex: Mutex[F], + basePath: Path + ): LastCurrencySnapshotStorage[F] = new LastCurrencySnapshotStorage[F] { + + private def metagraphPath(identifier: Address) = basePath / identifier.toString + + private def lastSnapshotPath(identifier: Address) = metagraphPath(identifier) / "lastSnapshot.json" + + private def lastIncrementalSnapshotPath(identifier: Address) = + metagraphPath(identifier) / "lastIncrementalSnapshot.json" + + private def createDirectoryIfDoesntExist(identifier: Address): F[Unit] = Files[F] + .exists(metagraphPath(identifier)) + .ifM( + Applicative[F].unit, + Files[F].createDirectories(metagraphPath(identifier)) + ) + + def set( + identifier: Address, + snapshot: Either[Hashed[ + CurrencySnapshot + ], (Hashed[CurrencyIncrementalSnapshot], CurrencySnapshotInfo, Signed[StateChannelSnapshotBinary])], + hasher: Hasher[F] + ): F[Unit] = + createDirectoryIfDoesntExist(identifier).flatMap { _ => + snapshot match { + case Left(full) => + FileBasedLastFullSnapshotStorage.make[F, CurrencySnapshot](lastSnapshotPath(identifier)).set(full) + case Right((incremental, info, _)) => + implicit val hs: Hasher[F] = hasher + val storage = + FileBasedLastIncrementalCurrencySnapshotStorage.make(mutex, lastIncrementalSnapshotPath(identifier)) + storage.get.flatMap { + case Some(_) => storage.set(incremental, info) + case None => storage.setInitial(incremental, info) + } + } + } + + def getLastBalances(identifier: Address, hasher: Hasher[F]): F[Option[SortedMap[Address, Balance]]] = { + implicit val hs: Hasher[F] = hasher + FileBasedLastIncrementalCurrencySnapshotStorage + .make(lastIncrementalSnapshotPath(identifier)) + .flatMap(_.getCombined) + .flatMap { + case Some((_, info)) => info.balances.some.pure[F] + case None => + FileBasedLastFullSnapshotStorage + .make[F, CurrencySnapshot](lastSnapshotPath(identifier)) + .get + .map(_.map(_.info.balances)) + } + } + + } + +} diff --git a/src/test/scala/org/constellation/snapshotstreaming/CurrencySnapshotMapperSuite.scala b/src/test/scala/org/constellation/snapshotstreaming/CurrencySnapshotMapperSuite.scala index 03e171d..323fbfd 100644 --- a/src/test/scala/org/constellation/snapshotstreaming/CurrencySnapshotMapperSuite.scala +++ b/src/test/scala/org/constellation/snapshotstreaming/CurrencySnapshotMapperSuite.scala @@ -10,8 +10,6 @@ import scala.collection.immutable.SortedMap import scala.collection.immutable.SortedSet import org.tessellation.ext.cats.effect.ResourceIO import org.tessellation.kryo.KryoSerializer -import org.tessellation.schema.address.Address -import org.tessellation.schema.balance.Balance import org.tessellation.schema.transaction._ import org.tessellation.node.shared.nodeSharedKryoRegistrar import org.tessellation.security.hash.Hash @@ -21,6 +19,7 @@ import org.tessellation.security.SecurityProvider import org.tessellation.shared.sharedKryoRegistrar import org.tessellation.syntax.sortedCollection._ import eu.timepit.refined.auto._ +import org.constellation.snapshotstreaming.data.applyTransactions import org.constellation.snapshotstreaming.data.createBalances import org.constellation.snapshotstreaming.data.createBlocksWithTransactions import org.constellation.snapshotstreaming.data.createRewards @@ -34,6 +33,7 @@ import org.tessellation.currency.schema.currency.CurrencySnapshotInfo import weaver.MutableIOSuite import org.tessellation.security.Hasher import org.tessellation.json.JsonSerializer +import org.tessellation.schema.balance.Balance import org.tessellation.security.Hashed import org.tessellation.security.HasherSelector @@ -74,194 +74,171 @@ object CurrencySnapshotMapperSuite extends MutableIOSuite { ): IO[Hashed[CurrencyIncrementalSnapshot]] = incrementalCurrencySnapshot(100L, 10L, 20L, Hash("abc"), Hash("def")) - private val address5 = Address("DAG2AUdecqFwEGcgAcH1ac2wrsg8acrgGwrQojzw") - private val address6 = Address("DAG2EUdecqFwEGcgAcH1ac2wrsg8acrgGwrQivxq") - private val address7 = Address("DAG2EUdecqFwEGcgAcH1ac2wrsg8acrgGwrQitrs") - - test("set balance to 0 for source when not in info") { res => - implicit val (h, ks, js, sp, key1, key2, key3, key4) = res + test("explicitly sets balance to 0 for addressees missing in in info") { res => + implicit val (h, ks, js, sp, key1, key2, key3, _) = res val address1 = key1.getPublic.toAddress val address2 = key2.getPublic.toAddress - val address4 = key4.getPublic.toAddress - val totalInfo = emptyCurrencySnapshotInfo + val address3 = key3.getPublic.toAddress + val initialBalances = createBalances(address1, address2) - val rewards = createRewards(address1, address2, address7) for { - txn1 <- createTxn(address1, key1, address2) - txn2 <- createTxn(address2, key2, address1) - txn3 <- createTxn(address2, key2, address4) + txn1 <- createTxn(address1, key1, address2, TransactionAmount(1000L)) + txn2 <- createTxn(address2, key2, address3, TransactionAmount(2000L)) blocks <- createBlocksWithTransactions( key1, - NonEmptySet.fromSetUnsafe(SortedSet(txn1, txn2)), - NonEmptySet.fromSetUnsafe(SortedSet(txn3)) + NonEmptySet.fromSetUnsafe(SortedSet(txn1)), + NonEmptySet.fromSetUnsafe(SortedSet(txn2)) ) - snapshot <- incrementalCurrencySnapshot[IO]( - 100L, - 10L, - 20L, - Hash("abc"), - Hash("def"), - totalInfo, - blocks, - rewards = rewards, - feeTransactions = None - ) - - result = CurrencyIncrementalSnapshotMapper.make().snapshotReferredBalancesInfo(snapshot, totalInfo) - } yield expect.same( - result, - SortedMap(address1 -> Balance(0L), address2 -> Balance(0L)) - ) - } + updatedBalances = applyTransactions( + initialBalances, + blocks.flatMap(_.block.transactions.toList).toList, + List.empty + ) - test("leave balances for addresses from transactions") { res => - implicit val (h, ks, js, sp, key1, key2, key3, key4) = res - val address1 = key1.getPublic.toAddress - val address2 = key2.getPublic.toAddress - val balances = createBalances(address1, address2, address5, address6, address7) - val totalInfo = CurrencySnapshotInfo(SortedMap.empty, balances, None, None) + updatedInfo = CurrencySnapshotInfo(SortedMap.empty, updatedBalances, None, None) - for { - txn1 <- createTxn(address1, key1, address2) - txn2 <- createTxn(address2, key2, address1) - txn3 <- createTxn(address2, key2, address7) - blocks <- createBlocksWithTransactions( - key1, - NonEmptySet.fromSetUnsafe(SortedSet(txn1, txn2)), - NonEmptySet.fromSetUnsafe(SortedSet(txn3)) - ) snapshot <- incrementalCurrencySnapshot[IO]( 100L, 10L, 20L, Hash("abc"), Hash("def"), - totalInfo, + updatedInfo, blocks, feeTransactions = None ) - result = CurrencyIncrementalSnapshotMapper.make().snapshotReferredBalancesInfo(snapshot, totalInfo) - expectedBalances = createBalances(address1, address2, address7) - } yield expect.same(result, expectedBalances) + result = CurrencyIncrementalSnapshotMapper + .make() + .balanceDiff(snapshot, initialBalances.some, emptyCurrencySnapshotInfo) + } yield expect.all( + initialBalances(address1) === Balance(1000L), + initialBalances(address2) === Balance(1000L), + !initialBalances.contains(address3), + !updatedBalances.contains(address1), + !updatedBalances.contains(address2), + updatedBalances(address3) === Balance(2000L), + result === SortedMap(address1 -> Balance(0L), address2 -> Balance(0L)) + ) } - test( - "leave balances for addresses from transactions, but not set zero for destinations not in balances " - ) { res => - implicit val (h, ks, js, sp, key1, key2, key3, key4) = res + test("removes addresses that have transactions but the result balance hasn't changed") { res => + implicit val (h, ks, js, sp, key1, key2, _, _) = res val address1 = key1.getPublic.toAddress val address2 = key2.getPublic.toAddress - - val balances = createBalances(address1, address5, address6) - val totalInfo = CurrencySnapshotInfo(SortedMap.empty, balances, None, None) + val initialBalances = createBalances(address1, address2) for { - txn1 <- createTxn(address1, key1, address2) - txn2 <- createTxn(address2, key2, address1) - txn3 <- createTxn(address2, key2, address7) - + txn1 <- createTxn(address1, key1, address2, TransactionAmount(1L)) + txn2 <- createTxn(address2, key2, address1, TransactionAmount(1L)) blocks <- createBlocksWithTransactions( key1, NonEmptySet.fromSetUnsafe(SortedSet(txn1, txn2)), - NonEmptySet.fromSetUnsafe(SortedSet(txn3)) ) + updatedBalances = applyTransactions( + initialBalances, + blocks.flatMap(_.block.transactions.toList).toList, + List.empty + ) + updatedInfo = CurrencySnapshotInfo(SortedMap.empty, updatedBalances, None, None) + snapshot <- incrementalCurrencySnapshot[IO]( 100L, 10L, 20L, Hash("abc"), Hash("def"), - totalInfo, + updatedInfo, blocks, feeTransactions = None ) - result = CurrencyIncrementalSnapshotMapper.make().snapshotReferredBalancesInfo(snapshot, totalInfo) - expectedBalances = SortedMap( - address1 -> Balance(1000L), - address2 -> Balance(0L), - ) - } yield expect.same(result, expectedBalances) + result = CurrencyIncrementalSnapshotMapper + .make() + .balanceDiff(snapshot, initialBalances.some, updatedInfo) + } yield expect.same( + result, + updatedBalances - address1 - address2 + ) } - test( - "leave balances for addresses from transactions, but not set zero for destinations not in balances " - ) { res => + test("leaves addresses that changed") { res => implicit val (h, ks, js, sp, key1, key2, key3, key4) = res val address1 = key1.getPublic.toAddress val address2 = key2.getPublic.toAddress val address3 = key3.getPublic.toAddress val address4 = key4.getPublic.toAddress - val balances = createBalances(address1, address3, address6) - val totalInfo = CurrencySnapshotInfo(SortedMap.empty, balances, None, None) + val initialBalances = createBalances(address1, address2, address3, address4) for { - txn1 <- createTxn(address1, key1, address2) - txn2 <- createTxn(address2, key2, address1) - txn3 <- createTxn(address2, key2, address5) + txn1 <- createTxn(address1, key1, address2, TransactionAmount(3L)) + txn2 <- createTxn(address2, key2, address1, TransactionAmount(5L)) + txn3 <- createTxn(address2, key2, address3, TransactionAmount(13L)) blocks <- createBlocksWithTransactions( key1, NonEmptySet.fromSetUnsafe(SortedSet(txn1, txn2)), NonEmptySet.fromSetUnsafe(SortedSet(txn3)) ) - snapshot <- incrementalCurrencySnapshot[IO](100L, 10L, 20L, Hash("abc"), Hash("def"), totalInfo, blocks) - - result = CurrencyIncrementalSnapshotMapper.make().snapshotReferredBalancesInfo(snapshot, totalInfo) - expectedBalances = SortedMap(address1 -> Balance(1000L), address2 -> Balance(0L)) - } yield expect.same(result, expectedBalances) - } - - test("leave balances for addresses from rewards") { res => - implicit val (h, ks, js, sp, key1, key2, key3, key4) = res - val address1 = key1.getPublic.toAddress - val address2 = key2.getPublic.toAddress - val address3 = key3.getPublic.toAddress - val address4 = key4.getPublic.toAddress - val balances = createBalances(address1, address2, address7, address6, address7) - val totalInfo = CurrencySnapshotInfo(SortedMap.empty, balances, None, None) - - val rewards = createRewards(address1, address2, address7) - for { + updatedBalances = applyTransactions( + initialBalances, + blocks.flatMap(_.block.transactions.toList).toList, + List.empty + ) + updatedInfo = CurrencySnapshotInfo(SortedMap.empty, updatedBalances, None, None) snapshot <- incrementalCurrencySnapshot[IO]( 100L, 10L, 20L, Hash("abc"), Hash("def"), - totalInfo, - rewards = rewards + updatedInfo, + blocks, + feeTransactions = None ) - result = CurrencyIncrementalSnapshotMapper.make().snapshotReferredBalancesInfo(snapshot, totalInfo) - expectedBalances = createBalances(address1, address2, address7) - } yield expect.same(result, expectedBalances) + result = CurrencyIncrementalSnapshotMapper + .make() + .balanceDiff(snapshot, initialBalances.some, updatedInfo) + } yield expect.same( + result, + updatedBalances - address4 + ) } - test("leave balances for addresses from rewards, but not set zero for these not in balances") { res => - implicit val (h, ks, js, sp, key1, key2, key3, key4) = res - val address1 = key1.getPublic.toAddress - val address2 = key2.getPublic.toAddress - val balances = createBalances(address1, address2, address5, address6) - val totalInfo = CurrencySnapshotInfo(SortedMap.empty, balances, None, None) - - val rewards = createRewards(address1, address2, address7) - for { - snapshot <- incrementalCurrencySnapshot[IO]( - 100L, - 10L, - 20L, - Hash("abc"), - Hash("def"), - totalInfo, - rewards = rewards - ) + test("leave balances for addresses from rewards") { res => + implicit val (h, ks, js, sp, key1, key2, key3, key4) = res + val address1 = key1.getPublic.toAddress + val address2 = key2.getPublic.toAddress + val address3 = key3.getPublic.toAddress + val address4 = key4.getPublic.toAddress - result = CurrencyIncrementalSnapshotMapper.make().snapshotReferredBalancesInfo(snapshot, totalInfo) - expectedBalances = createBalances(address1, address2) - } yield expect.same(result, expectedBalances) - } + val initialBalances = createBalances(address1, address2, address3, address4) + val rewards = createRewards(address1, address2) + val updatedBalances = applyTransactions( + initialBalances, + List.empty, + rewards.toList + ) + val updatedInfo = CurrencySnapshotInfo(SortedMap.empty, updatedBalances, None, None) + + for { + snapshot <- incrementalCurrencySnapshot[IO]( + 100L, + 10L, + 20L, + Hash("abc"), + Hash("def"), + updatedInfo, + rewards = rewards + ) + + result = CurrencyIncrementalSnapshotMapper.make().balanceDiff(snapshot, initialBalances.some, updatedInfo) + } yield expect.same( + result, + updatedBalances - address3 - address4 + ) + } } diff --git a/src/test/scala/org/constellation/snapshotstreaming/FileBasedLastIncrementalGlobalSnapshotStorageSuite.scala b/src/test/scala/org/constellation/snapshotstreaming/FileBasedLastIncrementalGlobalSnapshotStorageSuite.scala index 5f2710a..1ac9aad 100644 --- a/src/test/scala/org/constellation/snapshotstreaming/FileBasedLastIncrementalGlobalSnapshotStorageSuite.scala +++ b/src/test/scala/org/constellation/snapshotstreaming/FileBasedLastIncrementalGlobalSnapshotStorageSuite.scala @@ -21,6 +21,7 @@ import fs2.io.file.Files import fs2.io.file.Path import org.constellation.snapshotstreaming.data.hashSelect import org.constellation.snapshotstreaming.data.incrementalGlobalSnapshot +import org.constellation.snapshotstreaming.storage.FileBasedLastIncrementalGlobalSnapshotStorage import weaver.MutableIOSuite import org.tessellation.security.Hasher import org.tessellation.json.JsonSerializer diff --git a/src/test/scala/org/constellation/snapshotstreaming/GlobalSnapshotMapperSuite.scala b/src/test/scala/org/constellation/snapshotstreaming/GlobalSnapshotMapperSuite.scala index 59811a4..c9e86c9 100644 --- a/src/test/scala/org/constellation/snapshotstreaming/GlobalSnapshotMapperSuite.scala +++ b/src/test/scala/org/constellation/snapshotstreaming/GlobalSnapshotMapperSuite.scala @@ -4,15 +4,13 @@ import java.security.KeyPair import cats.data.NonEmptySet import cats.effect.IO import cats.effect.Resource -import cats.syntax.functor._ -import cats.syntax.traverse._ +import cats.implicits.catsSyntaxOptionId +import cats.syntax.all._ import scala.collection.immutable.SortedMap import scala.collection.immutable.SortedSet import org.tessellation.ext.cats.effect.ResourceIO import org.tessellation.kryo.KryoSerializer -import org.tessellation.schema.address.Address -import org.tessellation.schema.balance.Balance import org.tessellation.schema.transaction._ import org.tessellation.schema.GlobalSnapshotInfo import org.tessellation.node.shared.nodeSharedKryoRegistrar @@ -23,6 +21,7 @@ import org.tessellation.security.SecurityProvider import org.tessellation.shared.sharedKryoRegistrar import org.tessellation.syntax.sortedCollection._ import eu.timepit.refined.auto._ +import org.constellation.snapshotstreaming.data.applyTransactions import org.constellation.snapshotstreaming.data.createBalances import org.constellation.snapshotstreaming.data.createBlocksWithTransactions import org.constellation.snapshotstreaming.data.createRewards @@ -34,12 +33,13 @@ import weaver.MutableIOSuite import org.tessellation.security.Hasher import org.tessellation.json.JsonSerializer import org.tessellation.schema.GlobalIncrementalSnapshot +import org.tessellation.schema.balance.Balance import org.tessellation.security.Hashed import org.tessellation.security.HasherSelector object GlobalSnapshotMapperSuite extends MutableIOSuite { - type Res = (HasherSelector[IO], KryoSerializer[IO], SecurityProvider[IO], KeyPair, KeyPair) + type Res = (HasherSelector[IO], KryoSerializer[IO], SecurityProvider[IO], KeyPair, KeyPair, KeyPair, KeyPair) override def sharedResource: Resource[IO, Res] = SecurityProvider.forAsync[IO].flatMap { implicit sp => @@ -47,131 +47,184 @@ object GlobalSnapshotMapperSuite extends MutableIOSuite { for { key1 <- KeyPairGenerator.makeKeyPair[IO].asResource key2 <- KeyPairGenerator.makeKeyPair[IO].asResource + key3 <- KeyPairGenerator.makeKeyPair[IO].asResource + key4 <- KeyPairGenerator.makeKeyPair[IO].asResource js <- JsonSerializer.forSync[IO].asResource hasherSelector = { implicit val j: JsonSerializer[IO] = js HasherSelector.forSync[IO](Hasher.forJson[IO], Hasher.forKryo[IO], hashSelect) } - } yield (hasherSelector, kp, sp, key1, key2) + } yield (hasherSelector, kp, sp, key1, key2, key3, key4) } } def mkInitialSnapshot()(implicit ks: KryoSerializer[IO], h: HasherSelector[IO]): IO[Hashed[GlobalIncrementalSnapshot]] = incrementalGlobalSnapshot(100L, 10L, 20L, Hash("abc"), Hash("def")) - private val address3 = Address("DAG2AUdecqFwEGcgAcH1ac2wrsg8acrgGwrQojzw") - private val address4 = Address("DAG2EUdecqFwEGcgAcH1ac2wrsg8acrgGwrQivxq") - private val address5 = Address("DAG2EUdecqFwEGcgAcH1ac2wrsg8acrgGwrQitrs") - - test("set balance to 0 for source when not in info") { res => - implicit val (h, ks, sp, key1, key2) = res + test("explicitly sets balance to 0 for addressees missing in in info") { res => + implicit val (h, ks, sp, key1, key2, key3, _) = res val address1 = key1.getPublic.toAddress val address2 = key2.getPublic.toAddress - val totalInfo = GlobalSnapshotInfo.empty + val address3 = key3.getPublic.toAddress + val initialBalances = createBalances(address1, address2) - val rewards = createRewards(address1, address2, address5) for { - txn1 <- createTxn(address1, key1, address2) - txn2 <- createTxn(address2, key2, address1) - txn3 <- createTxn(address2, key2, address5) + txn1 <- createTxn(address1, key1, address2, TransactionAmount(1000L)) + txn2 <- createTxn(address2, key2, address3, TransactionAmount(2000L)) + blocks <- createBlocksWithTransactions( key1, - NonEmptySet.fromSetUnsafe(SortedSet(txn1, txn2)), - NonEmptySet.fromSetUnsafe(SortedSet(txn3)) + NonEmptySet.fromSetUnsafe(SortedSet(txn1)), + NonEmptySet.fromSetUnsafe(SortedSet(txn2)) ) + + updatedBalances = applyTransactions( + initialBalances, + blocks.flatMap(_.block.transactions.toList).toList, + List.empty + ) + + updatedInfo = GlobalSnapshotInfo(SortedMap.empty, SortedMap.empty, updatedBalances, SortedMap.empty, SortedMap.empty) + snapshot <- incrementalGlobalSnapshot[IO]( 100L, 10L, 20L, Hash("abc"), Hash("def"), - totalInfo, + updatedInfo, blocks, - rewards = rewards ) - result = GlobalSnapshotMapper.make().snapshotReferredBalancesInfo(snapshot, totalInfo) - } yield expect.same(result, SortedMap(address1 -> Balance(0L), address2 -> Balance(0L))) - + result = GlobalSnapshotMapper + .make() + .balanceDiff(snapshot, initialBalances.some, GlobalSnapshotInfo.empty) + } yield expect.all( + initialBalances(address1) === Balance(1000L), + initialBalances(address2) === Balance(1000L), + !initialBalances.contains(address3), + !updatedBalances.contains(address1), + !updatedBalances.contains(address2), + updatedBalances(address3) === Balance(2000L), + result === SortedMap(address1 -> Balance(0L), address2 -> Balance(0L)) + ) } - test("leave balances for addresses from transactions") { res => - implicit val (h, ks, sp, key1, key2) = res + + test("removes addresses that have transactions but the result balance hasn't changed") { res => + implicit val (h, ks, sp, key1, key2, _, _) = res val address1 = key1.getPublic.toAddress val address2 = key2.getPublic.toAddress - val balances = createBalances(address1, address2, address3, address4, address5) - val totalInfo = GlobalSnapshotInfo(SortedMap.empty, SortedMap.empty, balances, SortedMap.empty, SortedMap.empty) + val initialBalances = createBalances(address1, address2) for { - txn1 <- createTxn(address1, key1, address2) - txn2 <- createTxn(address2, key2, address1) - txn3 <- createTxn(address2, key2, address5) + txn1 <- createTxn(address1, key1, address2, TransactionAmount(1L)) + txn2 <- createTxn(address2, key2, address1, TransactionAmount(1L)) blocks <- createBlocksWithTransactions( key1, NonEmptySet.fromSetUnsafe(SortedSet(txn1, txn2)), - NonEmptySet.fromSetUnsafe(SortedSet(txn3)) ) - snapshot <- incrementalGlobalSnapshot[IO](100L, 10L, 20L, Hash("abc"), Hash("def"), totalInfo, blocks) + updatedBalances = applyTransactions( + initialBalances, + blocks.flatMap(_.block.transactions.toList).toList, + List.empty + ) + updatedInfo = GlobalSnapshotInfo(SortedMap.empty, SortedMap.empty, updatedBalances, SortedMap.empty, SortedMap.empty) + + snapshot <- incrementalGlobalSnapshot[IO]( + 100L, + 10L, + 20L, + Hash("abc"), + Hash("def"), + updatedInfo, + blocks, + ) - result = GlobalSnapshotMapper.make().snapshotReferredBalancesInfo(snapshot, totalInfo) - expectedBalances = createBalances(address1, address2, address5) - } yield expect.same(result, expectedBalances) + result = GlobalSnapshotMapper + .make() + .balanceDiff(snapshot, initialBalances.some, updatedInfo) + } yield expect.same( + result, + updatedBalances - address1 - address2 + ) } - test("leave balances for addresses from transactions, but not set zero for destinations not in balances ") { res => - implicit val (h, ks, sp, key1, key2) = res + test("leaves addresses that changed") { res => + implicit val (h, ks, sp, key1, key2, key3, key4) = res val address1 = key1.getPublic.toAddress val address2 = key2.getPublic.toAddress - val balances = createBalances(address1, address3, address4) - val totalInfo = GlobalSnapshotInfo(SortedMap.empty, SortedMap.empty, balances, SortedMap.empty, SortedMap.empty) + val address3 = key3.getPublic.toAddress + val address4 = key4.getPublic.toAddress + val initialBalances = createBalances(address1, address2, address3, address4) for { - txn1 <- createTxn(address1, key1, address2) - txn2 <- createTxn(address2, key2, address1) - txn3 <- createTxn(address2, key2, address5) + txn1 <- createTxn(address1, key1, address2, TransactionAmount(3L)) + txn2 <- createTxn(address2, key2, address1, TransactionAmount(5L)) + txn3 <- createTxn(address2, key2, address3, TransactionAmount(13L)) blocks <- createBlocksWithTransactions( key1, NonEmptySet.fromSetUnsafe(SortedSet(txn1, txn2)), NonEmptySet.fromSetUnsafe(SortedSet(txn3)) ) - snapshot <- incrementalGlobalSnapshot[IO](100L, 10L, 20L, Hash("abc"), Hash("def"), totalInfo, blocks) + updatedBalances = applyTransactions( + initialBalances, + blocks.flatMap(_.block.transactions.toList).toList, + List.empty + ) + updatedInfo = GlobalSnapshotInfo(SortedMap.empty, SortedMap.empty, updatedBalances, SortedMap.empty, SortedMap.empty) + snapshot <- incrementalGlobalSnapshot[IO]( + 100L, + 10L, + 20L, + Hash("abc"), + Hash("def"), + updatedInfo, + blocks, + ) - result = GlobalSnapshotMapper.make().snapshotReferredBalancesInfo(snapshot, totalInfo) - expectedBalances = SortedMap(address1 -> Balance(1000L), address2 -> Balance(0L)) - } yield expect.same(result, expectedBalances) + result = GlobalSnapshotMapper + .make() + .balanceDiff(snapshot, initialBalances.some, updatedInfo) + } yield expect.same( + result, + updatedBalances - address4 + ) } test("leave balances for addresses from rewards") { res => - implicit val (h, ks, _, key1, key2) = res + implicit val (h, ks, sp, key1, key2, key3, key4) = res val address1 = key1.getPublic.toAddress val address2 = key2.getPublic.toAddress - val balances = createBalances(address1, address2, address3, address4, address5) - val totalInfo = GlobalSnapshotInfo(SortedMap.empty, SortedMap.empty, balances, SortedMap.empty, SortedMap.empty) + val address3 = key3.getPublic.toAddress + val address4 = key4.getPublic.toAddress - val rewards = createRewards(address1, address2, address5) - for { - snapshot <- incrementalGlobalSnapshot[IO](100L, 10L, 20L, Hash("abc"), Hash("def"), totalInfo, rewards = rewards) + val initialBalances = createBalances(address1, address2, address3, address4) + val rewards = createRewards(address1, address2) - result = GlobalSnapshotMapper.make().snapshotReferredBalancesInfo(snapshot, totalInfo) - expectedBalances = createBalances(address1, address2, address5) - } yield expect.same(result, expectedBalances) - } + val updatedBalances = applyTransactions( + initialBalances, + List.empty, + rewards.toList + ) + val updatedInfo = GlobalSnapshotInfo(SortedMap.empty, SortedMap.empty, updatedBalances, SortedMap.empty, SortedMap.empty) - test("leave balances for addresses from rewards, but not set zero for these not in balances") { res => - implicit val (h, ks, _, key1, key2) = res - val address1 = key1.getPublic.toAddress - val address2 = key2.getPublic.toAddress - val balances = createBalances(address1, address2, address3, address4) - val totalInfo = GlobalSnapshotInfo(SortedMap.empty, SortedMap.empty, balances, SortedMap.empty, SortedMap.empty) - - val rewards = createRewards(address1, address2, address5) for { - snapshot <- incrementalGlobalSnapshot[IO](100L, 10L, 20L, Hash("abc"), Hash("def"), totalInfo, rewards = rewards) + snapshot <- incrementalGlobalSnapshot[IO]( + 100L, + 10L, + 20L, + Hash("abc"), + Hash("def"), + updatedInfo, + rewards = rewards + ) - result = GlobalSnapshotMapper.make().snapshotReferredBalancesInfo(snapshot, totalInfo) - expectedBalances = createBalances(address1, address2) - } yield expect.same(result, expectedBalances) + result = GlobalSnapshotMapper.make().balanceDiff(snapshot, initialBalances.some, updatedInfo) + } yield expect.same( + result, + updatedBalances - address3 - address4 + ) } - } diff --git a/src/test/scala/org/constellation/snapshotstreaming/data.scala b/src/test/scala/org/constellation/snapshotstreaming/data.scala index 4c27d7b..9bad5e6 100644 --- a/src/test/scala/org/constellation/snapshotstreaming/data.scala +++ b/src/test/scala/org/constellation/snapshotstreaming/data.scala @@ -122,6 +122,27 @@ object data { def createBalances(addresses: Address*) = addresses.map(address => address -> Balance(1000L)).toMap.toSortedMap + def applyTransactions( + balances: SortedMap[Address, Balance], + txs: List[Signed[Transaction]], + rewards: List[RewardTransaction] + ): SortedMap[Address, Balance] = { + val txApplied = txs.foldLeft(balances.view.mapValues(_.value.toLong).toMap) { case (acc, tx) => + acc + .updatedWith(tx.source)(existing => (existing.getOrElse(0L) - tx.amount.value).some) + .updatedWith(tx.destination)(existing => (existing.getOrElse(0L) + tx.amount.value).some) + } + + val rewardsApplied = rewards.foldLeft(txApplied) { case (acc, tx) => + acc + .updatedWith(tx.destination)(existing => (existing.getOrElse(0L) + tx.amount.value).some) + } + + val nonEmpty = rewardsApplied.filterNot { case (_, balance) => balance === 0L } + + nonEmpty.view.mapValues(v => Balance(NonNegLong.unsafeFrom(v))).toMap.toSortedMap + } + def createRewards(addresses: Address*) = addresses.map(address => RewardTransaction(address, TransactionAmount(1000L))).toSortedSet @@ -143,7 +164,8 @@ object data { def createTxn[F[_]: Async: KryoSerializer: HasherSelector: SecurityProvider]( src: Address, srcKey: KeyPair, - dst: Address + dst: Address, + amount: TransactionAmount = TransactionAmount(1L) ): F[Signed[Transaction]] = { implicit val hasher: Hasher[F] = HasherSelector[F].getCurrent @@ -151,7 +173,7 @@ object data { Transaction( src, dst, - TransactionAmount(1L), + amount, TransactionFee.zero, TransactionReference.empty, TransactionSalt(0L)