Adopt snapshot streaming to handle snapshot fee balances #65
Conversation
One question about zeroing out some balances, as I am not super familiar with the nuances of the implementation. Otherwise, seems straightforward enough. 🔥
@@ -135,7 +155,8 @@ object SnapshotProcessor {
       case Some(last) if Validator.isNextSnapshot(last, snapshot.signed.value) =>
         s3DAO.uploadSnapshot(snapshot) >>
           prepareAndExecuteBulkUpdate(state, hasher) >>
-          lastIncrementalGlobalSnapshotStorage.set(snapshot, snapshotInfo)
+          lastIncrementalGlobalSnapshotStorage.set(snapshot, snapshotInfo) >>
+          setLastCurrencySnapshots(globalSnapshotWithState.currencySnapshots, hasher)
could this skip updating the last currency storage if there is an error encountered during the global snapshot step (i.e. is this atomic)? Or is that not a concern in this case?
Good catch. I think it is safer to revert the order and set currency snapshots first 👍
I thought about this, but actually the order, as-is or reversed, does not matter. In case of failure, we would end up with partially indexed data either way. If we check the snapshot chain, it won't allow setting the same snapshot twice, so it won't be possible to reindex the missing data automatically. Am I right?
Not really @marcinwadon
If saving the global snapshot succeeds but saving the currencies fails, then the currencies are stuck forever: after the process restarts there will be a "gap" in the currency snapshot chain, until you manually fix the last currency snapshots or override the global one to the one that failed for currencies.
If we reverse the order, and saving currencies succeeds but the global fails, then after the restart we do not need to take any special actions. In the worst case the currencies will be indexed twice, but at least the chain will be fine.
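A minimal sketch of the reversed ordering, reusing the names from the diff hunk above (the surrounding method shape is assumed, not copied from the PR):

  case Some(last) if Validator.isNextSnapshot(last, snapshot.signed.value) =>
    // Persist currency snapshots first: if any later global step fails and
    // the process restarts, currencies may be indexed twice, but the
    // currency snapshot chain never ends up with a gap.
    setLastCurrencySnapshots(globalSnapshotWithState.currencySnapshots, hasher) >>
      s3DAO.uploadSnapshot(snapshot) >>
      prepareAndExecuteBulkUpdate(state, hasher) >>
      lastIncrementalGlobalSnapshotStorage.set(snapshot, snapshotInfo)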
I thought we check the chain validity for currency snapshots too 🤔 If not, then you're right
val explicitlyZeroed = {
  val srcTransactions = extractSnapshotReferredAddresses(snapshot).source
It is not obvious to me why these accounts should be zeroed, but I assume it is a detail of the currency snapshots. Maybe worth a comment sometime in the future.
The state (aka SnapshotInfo) is created by applying incremental snapshots onto the initial state. One of the optimizations is that if you have an account A which has, for example, 1000 DAG and there is a transaction A -> B for 1000 DAG, then the balance for A gets deleted from the Map instead of zeroed. In the case of the node it is fine, but here in block-explorer we want it to explicitly be 0.
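A hedged sketch of what this means in code, assuming the names from the snippet above (extractSnapshotReferredAddresses, info.balances) and the Address and Balance types from tessellation:

  // Addresses that appear as transaction sources in the snapshot but are
  // missing from the balance map were spent down to zero, so re-add them
  // with an explicit zero balance instead of leaving them absent.
  val explicitlyZeroed: Map[Address, Balance] = {
    val srcTransactions = extractSnapshotReferredAddresses(snapshot).source
    (srcTransactions -- info.balances.keys)
      .map(_ -> Balance.empty)
      .toMap
  }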
LGTM, I'm not 100% familiar with the snapshot-streaming code, but your PR seems pretty clear and functional. Just a nitpick: since 2.3.0 we don't have the sdk module explicitly; it was refactored to node-shared, so maybe you can change the dependency from sdk to node-shared in Dependencies.scala. Another point, not critical: maybe we can use tessellation 2.7.1 instead of 2.7.0.
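A sketch of what the suggested Dependencies.scala change might look like; the organization and artifact coordinates below are assumptions for illustration, not verified against the real build:

  object Dependencies {
    private val tessellationVersion = "2.7.1" // bumped from 2.7.0 as suggested

    // hypothetical coordinates: node-shared instead of the old sdk artifact
    val tessellationNodeShared =
      "org.constellation" %% "tessellation-node-shared" % tessellationVersion
  }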
val explicitlyZeroed = {
  val srcTransactions = extractSnapshotReferredAddresses(snapshot).source
  (srcTransactions -- info.balances.keys.toSet)
You don't need to construct a set:

- (srcTransactions -- info.balances.keys.toSet)
+ (srcTransactions -- info.balances.keys)

Also, you can avoid unnecessary object creation by replacing Balance(0L) below with Balance.empty.
        .drain
    case Some(last) =>
      MonadThrow[F].raiseError[Unit](
        new Throwable(
- new Throwable(
+ new IllegalStateException(
      )
    case None =>
      MonadThrow[F].raiseError[Unit](
        new Throwable("Previous snapshot not found when setting next global snapshot!")
new Throwable("Previous snapshot not found when setting next global snapshot!") | |
new IllegalStateException("Previous snapshot not found when setting next global snapshot!") |
def getCombinedStream: Stream[F, Option[(Hashed[S], SI)]] =
  ???
This doesn't look like it's used anywhere. Can it be removed?
Also, can the caller do this if they want to, by doing Stream.eval(getCombined)?
Yeah, it was like that before my refactorings. We reuse org.tessellation.node.shared.domain.snapshot.storage.LastSnapshotStorage on both snapshot-streaming and core/node. In core/node it is used and implemented, but here someone decided to make a shortcut by not implementing it, as it is not used at all. That being said, we can't remove it, but we could provide a dummy implementation like Stream.eval(getCombined) 👍
Well, I think Stream.eval(getCombined) is an invalid implementation, so it's better to provide nothing (???) rather than an invalid one. If someone tried to use it, they would clearly see a not-implemented error instead of figuring out what is wrong in the code. Otherwise, we would need to provide a correct implementation that streams elements and doesn't finish after the first emit like Stream.eval(getCombined) does, but I see no point in doing that here.
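For illustration, a correct streaming implementation would need the last value to be observable, e.g. held in fs2's SignallingRef; this is a hypothetical sketch (the state field and the S/SI type parameters are assumed from the surrounding trait), not what the codebase does:

  import fs2.Stream
  import fs2.concurrent.SignallingRef

  // .discrete emits the current value and then every subsequent update,
  // whereas Stream.eval(getCombined) emits once and terminates.
  def getCombinedStream: Stream[F, Option[(Hashed[S], SI)]] =
    state.discrete // state: SignallingRef[F, Option[(Hashed[S], SI)]]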
  .map(_.isValid)
  .flatMap(new Throwable("State proof doesn't match!").raiseError[F, Unit].unlessA)
Can we simplify to this? The Invalid already has a similar message, and it also provides the ordinal and hash:

- .map(_.isValid)
- .flatMap(new Throwable("State proof doesn't match!").raiseError[F, Unit].unlessA)
+ .flatMap(Async[F].fromValidated)
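For context, cats' ApplicativeError.fromValidated lifts Valid(a) to pure(a) and Invalid(e) to raiseError(e), so the Invalid's own message is preserved. A hedged sketch, assuming the upstream value is an F[Validated[Throwable, Unit]]:

  import cats.data.Validated
  import cats.effect.Async
  import cats.syntax.all._

  // Raises the Invalid error (which already carries the ordinal and hash)
  // and discards the Valid unit, replacing the manual isValid check.
  def checkStateProof[F[_]: Async](result: F[Validated[Throwable, Unit]]): F[Unit] =
    result.flatMap(Async[F].fromValidated)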
object FileBasedLastIncrementalCurrencySnapshotStorage {

  def make[F[_]: Async: Hasher](path: Path): F[FileBasedLastIncrementalCurrencySnapshotStorage[F]] =
    Semaphore[F](1).map { semaphore =>
I'm a bit confused about the use of the semaphore:
- If it is hard-coded to 1, then why not use a mutex? Is the intent to allow multiple concurrent fibers some time in the future?
- Maybe I missed something, but it looks like make is called in a purely local context; doesn't that mean there can only be the one (current) fiber using it anyway?
Yeah, someone used a Semaphore with a single permit, which is technically a mutex. I think we can switch to Mutex.
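A sketch of that switch using cats-effect's Mutex (available since cats-effect 3.4); the constructor shape below is an assumption based on the snippet above:

  import cats.effect.Async
  import cats.effect.std.Mutex

  // Mutex[F] makes the single-permit intent explicit; critical sections are
  // then wrapped with mutex.lock.surround(...) instead of semaphore.permit.
  def make[F[_]: Async: Hasher](path: Path): F[FileBasedLastIncrementalCurrencySnapshotStorage[F]] =
    Mutex[F].map { mutex =>
      new FileBasedLastIncrementalCurrencySnapshotStorage[F](path, mutex)
    }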
@sdavidp Semaphore[F](1) is the creation of the semaphore. So yes, there is only one fiber that creates the semaphore, but the semaphore itself can be used by multiple threads.
@marcinwadon the semaphore(1) (or a mutex) can only be used by other fibers if it's exposed, which this one is not. It can have an effect if multiple fibers use the object returned by make, but I wanted to point out that in a couple of cases that is not happening.
I see it's being used by set or setInitial, so if those methods can be run from multiple places, it can be used from multiple threads, right?
@marcinwadon

Case 1: serializes access, only one fiber uses the store at a time:

  store <- StorageUsingMutex.make
  _     <- fiber_1_sets_storage(store)
  _     <- fiber_2_sets_storage(store)

Case 2: the mutex has no effect because each fiber has its own mutex, so no one else sees/uses it:

  _ <- fiber_1_sets_storage(StorageUsingMutex.make)
  _ <- fiber_2_sets_storage(StorageUsingMutex.make)

I was pointing out that Case 2 is what's happening in LastCurrencySnapshotStorage when using FileBasedLastIncrementalCurrencySnapshotStorage.
@sdavidp I have made the mutex a singleton for LastCurrencySnapshotStorage and I am just passing it to the temp storages. So only one currency snapshot can be processed at a time.
So only one currency snapshot can be processed at a time.

@kpudlik I'm not certain this is true. Isn't my Case 2 an example of what we're doing? If so, do you agree that the mutex has no effect for external calls to set? Or is the intention to serialize the stream inside set?
It is like that currently, so a little bit different from Case 2, because indeed Case 2 was incorrect (the mutex was not a singleton):

  // make method of the singleton storage
  singletonMutex <- Mutex[F]

  // set method of the non-singleton sub-storages
  _ <- fiber_1_sets_storage(StorageUsingMutex.make(singletonMutex))
  _ <- fiber_2_sets_storage(StorageUsingMutex.make(singletonMutex))

So even if we keep creating non-singleton storages, we reuse the mutex. Shouldn't that make sure that two or more storages are not calling set at the same time?
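A self-contained illustration of that shared-mutex wiring (the names mirror the pseudocode above and are illustrative only, not the real storage API):

  import cats.effect.{IO, IOApp}
  import cats.effect.std.Mutex

  object SharedMutexExample extends IOApp.Simple {

    // Each sub-storage is created per use, but all of them share one mutex,
    // so their set calls are mutually exclusive.
    final class StorageUsingMutex(mutex: Mutex[IO]) {
      def set(value: Int): IO[Unit] =
        mutex.lock.surround(IO.println(s"persisting $value")) // exclusive section
    }

    val run: IO[Unit] =
      for {
        singletonMutex <- Mutex[IO]
        _ <- new StorageUsingMutex(singletonMutex).set(1) // first sub-storage
        _ <- new StorageUsingMutex(singletonMutex).set(2) // second one, same mutex
      } yield ()
  }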
@kpudlik I did another pull and now I see your changes. Yes, for LastCurrencySnapshotStorage.set, since the mutex is now passed into FileBasedLastIncrementalCurrencySnapshotStorage.make, that will work. 👍
Just in case it's important: that's not happening for getLastBalances -- it's still using its own mutex, so no mutual exclusion there.
  .map(_.isValid)
  .flatMap(new Throwable("State proof doesn't match!").raiseError[F, Unit].unlessA)
Similar refactor suggestion as earlier: flatMap to fromValidated?
def make[F[_]: Async: Files: KryoSerializer: HasherSelector](
  path: Path
): F[LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo]] =
  Semaphore[F](1).map { semaphore =>
Same comment here as earlier WRT using a mutex; in this case I can see the storage is passed to another service.
FYI I had to use 2.7.1 so I could look at the code.
@IPadawans @sdavidp The dependency on sdk is intentional. Indeed, the old sdk has been refactored to node-shared, but we now have an sdk module that serves as the protocol's SDK. It should re-export all internal algorithms required to build metagraphs, etc. Hence, it should be the only module someone needs to add as a dependency. Relying on node-shared or any other internal module may be error-prone, as we won't maintain backward compatibility for our internal modules. The same holds for snapshot streaming. Since the sdk module doesn't yet re-export everything (a plan for the future), we can use the sdk dependency to access internal modules. However, we need to keep the above considerations in mind.