Skip to content

Commit

Permalink
Add FeeTransaction handling for balances
Browse files Browse the repository at this point in the history
  • Loading branch information
marcinwadon authored and kpudlik committed Jun 12, 2024
1 parent 9961e74 commit 436fa02
Show file tree
Hide file tree
Showing 17 changed files with 802 additions and 295 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object Dependencies {
val logstash = "7.2"
val organizeImports = "0.6.0"
val refined = "0.10.1"
val tessellation = "2.5.0"
val tessellation = "2.7.0"
val weaver = "0.8.1"
}

Expand Down
23 changes: 13 additions & 10 deletions src/main/scala/org/constellation/snapshotstreaming/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package org.constellation.snapshotstreaming
import cats.effect._
import cats.effect.std.Random
import cats.syntax.all._

import org.tessellation._
import org.tessellation.ext.cats.effect._
import org.tessellation.json.JsonSerializer
Expand All @@ -12,7 +11,6 @@ import org.tessellation.node.shared.config.types.SharedConfigReader
import org.tessellation.node.shared.ext.pureconfig._
import org.tessellation.schema.SnapshotOrdinal
import org.tessellation.security._

import eu.timepit.refined.pureconfig._
import org.typelevel.log4cats.slf4j.Slf4jLogger
import pureconfig.ConfigSource
Expand All @@ -24,12 +22,13 @@ object App extends IOApp {
private val logger = Slf4jLogger.getLogger[IO]

def run(args: List[String]): IO[ExitCode] =
ConfigSource.default.loadF[IO, SharedConfigReader]().flatMap { sharedCfg =>
IO(new Configuration).flatMap { configuration =>
ConfigSource.default
.loadF[IO, SharedConfigReader]()
.map(new Configuration(_))
.flatMap { configuration =>
val hashSelect = new HashSelect {
def select(ordinal: SnapshotOrdinal): HashLogic =
if (ordinal <= sharedCfg.lastKryoHashOrdinal.getOrElse(configuration.environment, SnapshotOrdinal.MinValue)) KryoHash
else JsonHash
if (ordinal <= configuration.lastKryoHashOrdinal) KryoHash else JsonHash
}

Random
Expand All @@ -38,11 +37,16 @@ object App extends IOApp {
.use { implicit random =>
KryoSerializer.forAsync[IO](shared.sharedKryoRegistrar).use { implicit ks =>
JsonSerializer.forSync[IO].asResource.use { implicit jsonSerializer =>
implicit val hasher = Hasher.forSync[IO](hashSelect)
implicit val hasherSelector =
HasherSelector.forSync[IO](Hasher.forJson[IO], Hasher.forKryo[IO], hashSelect)
val txHasher = Hasher.forKryo[IO]

SecurityProvider.forAsync[IO].use { implicit sp =>
SnapshotProcessor
.make[IO](configuration, hashSelect, sharedCfg.snapshot.size)
.make[IO](
configuration,
txHasher
)
.use { snapshotProcessor =>
snapshotProcessor.runtime.compile.drain
.flatTap(_ => logger.info("Done!"))
Expand All @@ -52,8 +56,7 @@ object App extends IOApp {
}
}
}
.handleErrorWith(e => logger.error(e)(e.getMessage).as(ExitCode.Error))
}
.handleErrorWith(e => logger.error(e)(e.getMessage).as(ExitCode.Error))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,27 @@ package org.constellation.snapshotstreaming
import cats.data.NonEmptyMap

import scala.collection.immutable.SortedMap
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
import scala.util.Try

import org.tessellation.env.AppEnvironment
import org.tessellation.schema.SnapshotOrdinal
import org.tessellation.schema.balance.Amount
import org.tessellation.schema.peer.{L0Peer, PeerId}

import com.typesafe.config.{Config, ConfigFactory}
import eu.timepit.refined.types.numeric.{NonNegLong, PosLong}
import org.tessellation.schema.peer.L0Peer
import org.tessellation.schema.peer.PeerId
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import eu.timepit.refined.types.numeric.NonNegLong
import eu.timepit.refined.types.numeric.PosLong
import fs2.io.file.Path
import io.circe.parser.decode
import org.http4s.Uri
import org.tessellation.node.shared.config.types
import org.tessellation.node.shared.config.types.SharedConfigReader
import org.tessellation.node.shared.domain.statechannel.FeeCalculatorConfig

class Configuration {
class Configuration(sharedConfigReader: SharedConfigReader) {
private val config: Config = ConfigFactory.load().resolve()

private val httpClient = config.getConfig("snapshotStreaming.httpClient")
Expand All @@ -29,7 +34,18 @@ class Configuration {
val lastFullSnapshotPath: Path = Path(config.getString("snapshotStreaming.lastSnapshotPath"))
val lastIncrementalSnapshotPath: Path = Path(config.getString("snapshotStreaming.lastIncrementalSnapshotPath"))
val collateral: Amount = Amount(NonNegLong.unsafeFrom(config.getLong("snapshotStreaming.collateral")))
val environment: AppEnvironment = AppEnvironment.withNameInsensitive(config.getString("snapshotStreaming.environment"))

val environment: AppEnvironment =
AppEnvironment.withNameInsensitive(config.getString("snapshotStreaming.environment"))

val lastKryoHashOrdinal: SnapshotOrdinal =
sharedConfigReader.lastKryoHashOrdinal.getOrElse(environment, SnapshotOrdinal.MinValue)

val snapshotSize: types.SnapshotSizeConfig = sharedConfigReader.snapshot.size

val feeConfigs: SortedMap[SnapshotOrdinal, FeeCalculatorConfig] = sharedConfigReader.feeConfigs.get(environment)
.map(configs => SortedMap.from(configs))
.getOrElse(SortedMap.empty[SnapshotOrdinal, FeeCalculatorConfig])

val l0Peers: NonEmptyMap[PeerId, L0Peer] = NonEmptyMap.fromMapUnsafe(
SortedMap.from(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import org.tessellation.node.shared.domain.snapshot.Validator.isNextSnapshot
import org.tessellation.node.shared.domain.snapshot.storage.LastSnapshotStorage
import org.tessellation.schema._
import org.tessellation.schema.height.Height
import org.tessellation.security.{HashSelect, Hashed, Hasher}
import org.tessellation.security._

import fs2.io.file._
import fs2.{Stream, text}
Expand All @@ -29,17 +29,15 @@ import io.circe.syntax._

object FileBasedLastIncrementalGlobalSnapshotStorage {

def make[F[_]: Async: Files: KryoSerializer: Hasher](
path: Path,
hashSelect: HashSelect
def make[F[_]: Async: Files: KryoSerializer: HasherSelector](
path: Path
): F[LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo]] =
Semaphore[F](1)
.map(make(path, _, hashSelect))
.map(make(path, _))

private def make[F[_]: Async: Files: KryoSerializer: Hasher](
private def make[F[_]: Async: Files: KryoSerializer: HasherSelector](
path: Path,
semaphore: Semaphore[F],
hashSelect: HashSelect
semaphore: Semaphore[F]
): LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo] =
new LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo] {

Expand Down Expand Up @@ -84,18 +82,22 @@ object FileBasedLastIncrementalGlobalSnapshotStorage {
}

private def validateStateProof(snapshot: Hashed[GlobalIncrementalSnapshot], state: GlobalSnapshotInfo) =
StateProofValidator
.validate(snapshot, state, hashSelect)
.map(_.isValid)
.flatMap(new Throwable("State proof doesn't match!").raiseError[F, Unit].unlessA)
HasherSelector[F].forOrdinal(snapshot.ordinal) { implicit hasher =>
(hasher.getLogic(snapshot.ordinal) match {
case JsonHash => StateProofValidator.validate(snapshot, state)
case KryoHash => StateProofValidator.validate(snapshot, GlobalSnapshotInfoV2.fromGlobalSnapshotInfo(state))
})
.map(_.isValid)
.flatMap(new Throwable("State proof doesn't match!").raiseError[F, Unit].unlessA)
}

def get: F[Option[Hashed[GlobalIncrementalSnapshot]]] =
getSnasphotWithState(_.snapshot)

def getCombined: F[Option[(Hashed[GlobalIncrementalSnapshot], GlobalSnapshotInfo)]] =
getSnasphotWithState(sws => (sws.snapshot, sws.state))

def getCombinedStream: Stream[F, Option[(Hashed[GlobalIncrementalSnapshot], GlobalSnapshotInfo)]] =
def getCombinedStream: Stream[F, Option[(Hashed[GlobalIncrementalSnapshot], GlobalSnapshotInfo)]] =
???

private def getSnasphotWithState[A](extract: SnapshotWithState => A): F[Option[A]] = Files[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,56 @@ import cats.syntax.traverse._

import org.tessellation.currency.schema.currency.{CurrencyIncrementalSnapshot, CurrencySnapshot, CurrencySnapshotInfo}
import org.tessellation.kryo.KryoSerializer
import org.tessellation.node.shared.infrastructure.snapshot.{GlobalSnapshotContextFunctions, GlobalSnapshotStateChannelEventsProcessor}
import org.tessellation.node.shared.infrastructure.snapshot.{
GlobalSnapshotContextFunctions,
GlobalSnapshotStateChannelEventsProcessor
}
import org.tessellation.schema.{GlobalIncrementalSnapshot, GlobalSnapshotInfo}
import org.tessellation.security.signature.Signed
import org.tessellation.security.{Hashed, Hasher}
import org.tessellation.security.{Hashed, Hasher, HasherSelector}

import org.constellation.snapshotstreaming.SnapshotProcessor.GlobalSnapshotWithState

trait GlobalSnapshotContextService[F[_]] {

def createContext(context: GlobalSnapshotInfo, lastArtifact: Signed[GlobalIncrementalSnapshot], artifact: Hashed[GlobalIncrementalSnapshot]): F[GlobalSnapshotWithState]
def createContext(
context: GlobalSnapshotInfo,
lastArtifact: Signed[GlobalIncrementalSnapshot],
artifact: Hashed[GlobalIncrementalSnapshot]
): F[GlobalSnapshotWithState]

}

object GlobalSnapshotContextService {

def make[F[_]: Async: KryoSerializer: Hasher](
def make[F[_]: Async: KryoSerializer: HasherSelector](
globalSnapshotStateChannelEventsProcessor: GlobalSnapshotStateChannelEventsProcessor[F],
globalSnapshotContextFns: GlobalSnapshotContextFunctions[F]
): GlobalSnapshotContextService[F] =
new GlobalSnapshotContextService[F] {
def createContext(context: GlobalSnapshotInfo, lastArtifact: Signed[GlobalIncrementalSnapshot], artifact: Hashed[GlobalIncrementalSnapshot]): F[GlobalSnapshotWithState] =
globalSnapshotContextFns.createContext(context, lastArtifact, artifact.signed)

def createContext(
context: GlobalSnapshotInfo,
lastArtifact: Signed[GlobalIncrementalSnapshot],
artifact: Hashed[GlobalIncrementalSnapshot]
): F[GlobalSnapshotWithState] =
HasherSelector[F]
.forOrdinal(artifact.ordinal) { implicit hasher =>
globalSnapshotContextFns.createContext(context, lastArtifact, artifact.signed)
}
.flatMap { newContext =>
globalSnapshotStateChannelEventsProcessor
.processCurrencySnapshots(context, artifact.signed.value.stateChannelSnapshots)
.flatMap(_.traverse(_.traverse {
case Left(full) => full.toHashed.map(_.asLeft[(Hashed[CurrencyIncrementalSnapshot], CurrencySnapshotInfo)])
case Right((inc, info)) => inc.toHashed.map((_, info).asRight[Hashed[CurrencySnapshot]])
}))
.map(GlobalSnapshotWithState(artifact, newContext, _))
HasherSelector[F].forOrdinal(artifact.ordinal) { implicit hasher =>
globalSnapshotStateChannelEventsProcessor
.processCurrencySnapshots(context, artifact.signed.value.stateChannelSnapshots)
.flatMap(_.traverse(_.traverse {
case Left(full) =>
full.toHashed.map(_.asLeft[(Hashed[CurrencyIncrementalSnapshot], CurrencySnapshotInfo)])
case Right((inc, info)) => inc.toHashed.map((_, info).asRight[Hashed[CurrencySnapshot]])
}))
.map(GlobalSnapshotWithState(artifact, newContext, _))
}
}

}

}
Loading

0 comments on commit 436fa02

Please sign in to comment.