Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fee transactions support #69

Merged
merged 2 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class Configuration(sharedConfigReader: SharedConfigReader) {
val currencySnapshotsIndex: String = opensearch.getString("indexes.currency.snapshots")
val currencyBlocksIndex: String = opensearch.getString("indexes.currency.blocks")
val currencyTransactionsIndex: String = opensearch.getString("indexes.currency.transactions")
val currencyFeeTransactionsIndex: String = opensearch.getString("indexes.currency.fee-transactions")
val currencyBalancesIndex: String = opensearch.getString("indexes.currency.balances")
val bulkSize: Int = opensearch.getInt("bulkSize")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ object UpdateRequestBuilder {
txHasher,
hasher
)
(currSnapshot, currIncrementalSnapshots, currBlocks, currTransactions, currBalances) =
(currSnapshot, currIncrementalSnapshots, currBlocks, currTransactions, currFeeTransactions, currBalances) =
mappedCurrencyData

} yield updateRequests(
Expand All @@ -74,6 +74,7 @@ object UpdateRequestBuilder {
currIncrementalSnapshots,
currBlocks,
currTransactions,
currFeeTransactions,
currBalances
).grouped(config.bulkSize).toSeq

Expand All @@ -86,8 +87,10 @@ object UpdateRequestBuilder {
currencyIncrementalSnapshots: Seq[CurrencyData[CurrencySnapshot]],
currencyBlocks: Seq[CurrencyData[Block]],
currencyTransactions: Seq[CurrencyData[Transaction]],
currencyFeeTransactions: Seq[CurrencyData[FeeTransaction]],
currencyBalances: Seq[CurrencyData[AddressBalance]]
): Seq[UpdateRequest] =
): Seq[UpdateRequest] = {

Seq(updateById(config.snapshotsIndex, snapshot.hash).docAsUpsert(snapshot)) ++
blocks.map(block => updateById(config.blocksIndex, block.hash).docAsUpsert(block)) ++
transactions.map(transaction =>
Expand All @@ -110,11 +113,15 @@ object UpdateRequestBuilder {
val id = s"$identifier${data.hash}"
updateById(config.currencyTransactionsIndex, id).docAsUpsert(cd)
} ++
currencyFeeTransactions.map { case cd @ CurrencyData(identifier, data) =>
val id = s"$identifier${data.hash}"
updateById(config.currencyFeeTransactionsIndex, id).docAsUpsert(cd)
} ++
currencyBalances.map { case cd @ CurrencyData(identifier, data) =>
val id = s"$identifier${data.docId}"
updateById(config.currencyBalancesIndex, id).docAsUpsert(cd)
}

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import cats.syntax.all._
import eu.timepit.refined.auto._
import io.estatico.newtype.ops._
import org.constellation.snapshotstreaming.opensearch.schema.CurrencySnapshot
import org.constellation.snapshotstreaming.opensearch.schema.FeeTransaction
import org.constellation.snapshotstreaming.opensearch.schema.FeeTransactionReference
import org.constellation.snapshotstreaming.opensearch.schema.RewardTransaction
import org.tessellation.syntax.sortedCollection._

import scala.collection.immutable.SortedSet
import org.tessellation.currency.schema.currency.CurrencyIncrementalSnapshot
import org.tessellation.currency.schema.currency.CurrencySnapshotInfo
import org.tessellation.currency.schema.feeTransaction.{FeeTransaction => OriginalFeeTransaction}
import org.tessellation.currency.schema.feeTransaction.{FeeTransactionReference => OriginalFeeTransactionReference}
import org.tessellation.json.JsonSerializer
import org.tessellation.json.SizeCalculator
import org.tessellation.schema.currencyMessage.MessageType
Expand All @@ -32,6 +37,12 @@ abstract class CurrencyIncrementalSnapshotMapper[F[_]: Async: JsonSerializer]
hasher: Hasher[F]
): F[CurrencySnapshot]

def mapFeeTransactions(
snapshot: Hashed[CurrencyIncrementalSnapshot],
timestamp: Date,
hasher: Hasher[F]
): F[List[FeeTransaction]]

}

object CurrencyIncrementalSnapshotMapper {
Expand All @@ -44,11 +55,23 @@ object CurrencyIncrementalSnapshotMapper {

def extractSnapshotReferredAddresses(snapshot: CurrencyIncrementalSnapshot): SnapshotReferredAddresses = {
val transactions = snapshot.blocks.flatMap(_.block.transactions.toSortedSet)
val source = transactions.map(_.source)
val destination = transactions.map(_.destination)
val feeTransactions = snapshot.feeTransactions.getOrElse(SortedSet.empty[Signed[OriginalFeeTransaction]])
val source = transactions.map(_.source) ++ feeTransactions.map(_.source)
val destination = transactions.map(_.destination) ++ feeTransactions.map(_.destination)
SnapshotReferredAddresses(source, destination)
}

def mapFeeTransactions(
snapshot: Hashed[CurrencyIncrementalSnapshot],
timestamp: Date,
hasher: Hasher[F]
): F[List[FeeTransaction]] = {
implicit val hs: Hasher[F] = hasher
snapshot.feeTransactions.toList.flatTraverse(
_.toList.traverse(mapFeeTransaction(snapshot.hash.value, snapshot.ordinal.value, timestamp))
)
}

def mapSnapshot(
snapshot: Hashed[CurrencyIncrementalSnapshot],
binary: Signed[StateChannelSnapshotBinary],
Expand Down Expand Up @@ -78,6 +101,26 @@ object CurrencyIncrementalSnapshotMapper {
sizeInKB = sizeInKb.toLong
)

private def mapFeeTransaction(snapshotHash: String, snapshotOrdinal: Long, timestamp: Date)(
feeTransaction: Signed[OriginalFeeTransaction]
)(implicit hasher: Hasher[F]): F[FeeTransaction] =
feeTransaction.toHashed.map { feeTx =>
FeeTransaction(
feeTx.hash.value,
feeTx.amount.value,
feeTx.source.value,
feeTx.destination.value,
mapFeeTransactionRef(feeTx.parent),
feeTx.salt.value,
snapshotHash,
snapshotOrdinal,
timestamp
)
}

private def mapFeeTransactionRef(ref: OriginalFeeTransactionReference): FeeTransactionReference =
FeeTransactionReference(ref.hash.value, ref.ordinal.value)

private def getMessageAddress(messageType: MessageType, info: CurrencySnapshotInfo): Option[String] =
info.lastMessages.flatMap(_.get(messageType)).map(_.address.value.value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ trait CurrencySnapshotMapper[F[_]] {
Seq[CurrencyData[OSCurrencySnapshot]],
Seq[CurrencyData[Block]],
Seq[CurrencyData[Transaction]],
Seq[CurrencyData[FeeTransaction]],
Seq[CurrencyData[AddressBalance]]
)
]
Expand All @@ -58,6 +59,25 @@ object CurrencySnapshotMapper {
): CurrencySnapshotMapper[F] =
new CurrencySnapshotMapper[F] {

type Acc = (
Seq[CurrencyData[Snapshot]],
Seq[CurrencyData[OSCurrencySnapshot]],
Seq[CurrencyData[Block]],
Seq[CurrencyData[Transaction]],
Seq[CurrencyData[FeeTransaction]],
Seq[CurrencyData[AddressBalance]],
Map[Address, SortedMap[Address, Balance]]
)

type CurrencySnapshotMapperResult = (
Seq[CurrencyData[Snapshot]],
Seq[CurrencyData[OSCurrencySnapshot]],
Seq[CurrencyData[Block]],
Seq[CurrencyData[Transaction]],
Seq[CurrencyData[FeeTransaction]],
Seq[CurrencyData[AddressBalance]]
)

def mapCurrencySnapshots(
snapshots: Map[Address, NonEmptyList[Either[Hashed[
CurrencySnapshot
Expand All @@ -68,15 +88,7 @@ object CurrencySnapshotMapper {
timestamp: Date,
txHasher: Hasher[F],
hasher: Hasher[F]
): F[
(
Seq[CurrencyData[Snapshot]],
Seq[CurrencyData[OSCurrencySnapshot]],
Seq[CurrencyData[Block]],
Seq[CurrencyData[Transaction]],
Seq[CurrencyData[AddressBalance]]
)
] = {
): F[CurrencySnapshotMapperResult] = {

val initialAccBalances = maybeLastSnapshots.map {
_.map {
Expand All @@ -85,19 +97,20 @@ object CurrencySnapshotMapper {
}
}.getOrElse(SortedMap.empty[Address, SortedMap[Address, Balance]])

val initialAcc: Acc = (
Seq.empty[CurrencyData[Snapshot]],
Seq.empty[CurrencyData[OSCurrencySnapshot]],
Seq.empty[CurrencyData[Block]],
Seq.empty[CurrencyData[Transaction]],
Seq.empty[CurrencyData[FeeTransaction]],
Seq.empty[CurrencyData[AddressBalance]],
initialAccBalances
)

snapshots.toList.flatMap { case (i, s) => s.toList.map((i, _)) }
.foldLeftM(
(
Seq.empty[CurrencyData[Snapshot]],
Seq.empty[CurrencyData[OSCurrencySnapshot]],
Seq.empty[CurrencyData[Block]],
Seq.empty[CurrencyData[Transaction]],
Seq.empty[CurrencyData[AddressBalance]],
initialAccBalances
)
) {
.foldLeftM[F, Acc](initialAcc) {
case (
(aggSnap, aggCurrencyIncrementalSnap, aggBlocks, aggTxs, aggBalances, aggLastBalances),
(aggSnap, aggCurrencyIncrementalSnap, aggBlocks, aggTxs, aggFeeTxs, aggBalances, aggLastBalances),
(identifier, fullOrIncremental)
) =>
val identifierStr = identifier.value.value
Expand All @@ -122,6 +135,7 @@ object CurrencySnapshotMapper {
aggCurrencyIncrementalSnap,
aggBlocks ++ blocks,
aggTxs ++ transactions,
aggFeeTxs,
aggBalances ++ balances,
aggLastBalances + (identifier -> full.info.balances)
)
Expand All @@ -137,9 +151,10 @@ object CurrencySnapshotMapper {
transactions <- incrementalMapper
.mapTransactions(incremental, timestamp, txHasher, hasher)
.map(_.map(CurrencyData(identifierStr, _)))
feeTransactions <- incrementalMapper
.mapFeeTransactions(incremental, timestamp, hasher)
.map(_.map(CurrencyData(identifierStr, _)))
prevBalances = aggLastBalances.get(identifier)
_ =
println(s"Balance diff between ${}")
filteredBalances = incrementalMapper.balanceDiff(
incremental,
prevBalances,
Expand All @@ -153,13 +168,14 @@ object CurrencySnapshotMapper {
aggCurrencyIncrementalSnap :+ snapshot,
aggBlocks ++ blocks,
aggTxs ++ transactions,
aggFeeTxs ++ feeTransactions,
aggBalances ++ balances,
aggLastBalances + (identifier -> filteredBalances)
)
}
}
.map { case (aggSnap, aggCurrencyIncrementalSnap, aggBlocks, aggTxs, aggBalances, _) =>
(aggSnap, aggCurrencyIncrementalSnap, aggBlocks, aggTxs, aggBalances)
.map { case (aggSnap, aggCurrencyIncrementalSnap, aggBlocks, aggTxs, aggFeeTxs, aggBalances, _) =>
(aggSnap, aggCurrencyIncrementalSnap, aggBlocks, aggTxs, aggFeeTxs, aggBalances)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package org.constellation.snapshotstreaming.opensearch.schema

import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder

case class CurrencyData[A](identifier: String, data: A)

object CurrencyData {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.constellation.snapshotstreaming.opensearch.schema

import java.util.Date
import io.circe.Encoder
import io.circe.generic.semiauto._
import schema._

final case class FeeTransaction(
hash: String,
amount: Long,
source: String,
destination: String,
parent: FeeTransactionReference,
salt: Long,
snapshotHash: String,
snapshotOrdinal: Long,
timestamp: Date
)

object FeeTransaction {
implicit def feeTransactionEncoder: Encoder[FeeTransaction] = deriveEncoder
}


case class FeeTransactionReference(hash: String, ordinal: Long)

object FeeTransactionReference {
implicit val feeTransactionReferenceEncoder: Encoder[FeeTransactionReference] = deriveEncoder
}
33 changes: 31 additions & 2 deletions src/test/scala/org/constellation/snapshotstreaming/data.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ import cats.effect.Async
import org.tessellation.currency.schema.currency.CurrencyIncrementalSnapshot
import org.tessellation.currency.schema.currency.CurrencySnapshotInfo
import org.tessellation.currency.schema.feeTransaction.FeeTransaction
import org.tessellation.currency.schema.feeTransaction.FeeTransactionReference
import org.tessellation.kryo.KryoSerializer
import org.tessellation.schema.address.Address
import org.tessellation.schema.balance.Amount
import org.tessellation.schema.balance.Balance
import org.tessellation.schema.transaction.Transaction
import org.tessellation.schema.transaction.TransactionAmount
import org.tessellation.schema.transaction.TransactionFee
import org.tessellation.schema.transaction.TransactionOrdinal
import org.tessellation.schema.transaction.TransactionReference
import org.tessellation.schema.transaction.TransactionSalt
import org.tessellation.security.HasherSelector
Expand Down Expand Up @@ -125,7 +128,8 @@ object data {
def applyTransactions(
balances: SortedMap[Address, Balance],
txs: List[Signed[Transaction]],
rewards: List[RewardTransaction]
rewards: List[RewardTransaction],
feeTxs: List[Signed[FeeTransaction]]
): SortedMap[Address, Balance] = {
val txApplied = txs.foldLeft(balances.view.mapValues(_.value.toLong).toMap) { case (acc, tx) =>
acc
Expand All @@ -138,7 +142,13 @@ object data {
.updatedWith(tx.destination)(existing => (existing.getOrElse(0L) + tx.amount.value).some)
}

val nonEmpty = rewardsApplied.filterNot { case (_, balance) => balance === 0L }
val feeTxsApplied = feeTxs.foldLeft(rewardsApplied) { case (acc, feeTx) =>
acc
.updatedWith(feeTx.source)(existing => (existing.getOrElse(0L) - feeTx.amount.value).some)
.updatedWith(feeTx.destination)(existing => (existing.getOrElse(0L) + feeTx.amount.value).some)
}

val nonEmpty = feeTxsApplied.filterNot { case (_, balance) => balance === 0L }

nonEmpty.view.mapValues(v => Balance(NonNegLong.unsafeFrom(v))).toMap.toSortedMap
}
Expand Down Expand Up @@ -182,6 +192,25 @@ object data {
)
}

def createFeeTxn[F[_]: Async: KryoSerializer: HasherSelector: SecurityProvider](
src: Address,
srcKey: KeyPair,
dst: Address
): F[Signed[FeeTransaction]] = {
implicit val hasher: Hasher[F] = HasherSelector[F].getCurrent

forAsyncHasher[F, FeeTransaction](
FeeTransaction(
src,
dst,
Amount(1L),
FeeTransactionReference(TransactionOrdinal(0L), Hash.empty),
TransactionSalt(0L)
),
srcKey
)
}

def incrementalCurrencySnapshot[F[_]: Sync: HasherSelector](
ordinal: NonNegLong,
height: NonNegLong,
Expand Down
Loading
Loading