Skip to content

Commit

Permalink
Refactor the code to reduce duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla committed Nov 9, 2023
1 parent 1ad4718 commit cf80c34
Showing 1 changed file with 67 additions and 76 deletions.
143 changes: 67 additions & 76 deletions services/horizon/internal/ingest/processors/signers_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,13 @@ func (p *SignersProcessor) ProcessChange(ctx context.Context, change ingest.Chan
return nil
}

if !(change.Pre == nil && change.Post != nil) {
return errors.New("AssetStatsProSignersProcessorcessor is in insert only mode")
}

accountEntry := change.Post.Data.MustAccount()
account := accountEntry.AccountId.Address()

sponsors := accountEntry.SponsorPerSigner()
for signer, weight := range accountEntry.SignerSummary() {
var sponsor null.String
if sponsorDesc, isSponsored := sponsors[signer]; isSponsored {
sponsor = null.StringFrom(sponsorDesc.Address())
}

err := p.batchInsertBuilder.Add(history.AccountSigner{
Account: account,
Signer: signer,
Weight: weight,
Sponsor: sponsor,
})
if err != nil {
return errors.Wrap(err, "Error adding row to accountSignerBatch")
if change.Pre == nil && change.Post != nil {
postAccountEntry := change.Post.Data.MustAccount()
if err := p.addSigners(postAccountEntry); err != nil {
return err
}
} else {
return errors.New("AssetStatsProSignersProcessor is in insert only mode")
}

return nil
Expand All @@ -87,64 +71,26 @@ func (p *SignersProcessor) ProcessChange(ctx context.Context, change ingest.Chan
func (p *SignersProcessor) Commit(ctx context.Context) error {
defer p.reset()

if !p.useLedgerEntryCache {
err := p.batchInsertBuilder.Exec(ctx)
if err != nil {
return errors.Wrap(err, "error executing AccountSignersBatchInsertBuilder")
}
return nil
}

changes := p.cache.GetChanges()
for _, change := range changes {
if !change.AccountSignersChanged() {
continue
}

// The code below removes all Pre signers adds Post signers but
// can be improved by finding a diff (check performance first).
if change.Pre != nil {
preAccountEntry := change.Pre.Data.MustAccount()
for signer := range preAccountEntry.SignerSummary() {
rowsAffected, err := p.signersQ.RemoveAccountSigner(ctx, preAccountEntry.AccountId.Address(), signer)
if err != nil {
return errors.Wrap(err, "Error removing a signer")
}

if rowsAffected != 1 {
return ingest.NewStateError(errors.Errorf(
"Expected account=%s signer=%s in database but not found when removing (rows affected = %d)",
preAccountEntry.AccountId.Address(),
signer,
rowsAffected,
))
}
if p.useLedgerEntryCache {
changes := p.cache.GetChanges()
for _, change := range changes {
if !change.AccountSignersChanged() {
continue
}
}

if change.Post != nil {
postAccountEntry := change.Post.Data.MustAccount()
sponsorsPerSigner := postAccountEntry.SponsorPerSigner()
for signer, weight := range postAccountEntry.SignerSummary() {

// Ignore master key
var sponsor *string
if signer != postAccountEntry.AccountId.Address() {
if s, ok := sponsorsPerSigner[signer]; ok {
a := s.Address()
sponsor = &a
}
// The code below removes all Pre signers adds Post signers but
// can be improved by finding a diff (check performance first).
if change.Pre != nil {
preAccountEntry := change.Pre.Data.MustAccount()
if err := p.removeSigners(ctx, preAccountEntry); err != nil {
return err
}
}

err := p.batchInsertBuilder.Add(history.AccountSigner{
Account: postAccountEntry.AccountId.Address(),
Signer: signer,
Weight: weight,
Sponsor: null.StringFromPtr(sponsor),
})
if err != nil {
return errors.Wrapf(err, "Error adding signer (%s) to AccountSignersBatchInsertBuilder",
signer)
if change.Post != nil {
postAccountEntry := change.Post.Data.MustAccount()
if err := p.addSigners(postAccountEntry); err != nil {
return err
}
}
}
Expand All @@ -154,5 +100,50 @@ func (p *SignersProcessor) Commit(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "error executing AccountSignersBatchInsertBuilder")
}

return nil
}

func (p *SignersProcessor) removeSigners(ctx context.Context, preAccountEntry xdr.AccountEntry) error {
for signer := range preAccountEntry.SignerSummary() {
rowsAffected, err := p.signersQ.RemoveAccountSigner(ctx, preAccountEntry.AccountId.Address(), signer)
if err != nil {
return errors.Wrap(err, "Error removing a signer")
}

if rowsAffected != 1 {
return ingest.NewStateError(errors.Errorf(
"Expected account=%s signer=%s in database but not found when removing (rows affected = %d)",
preAccountEntry.AccountId.Address(),
signer,
rowsAffected,
))
}
}
return nil
}

func (p *SignersProcessor) addSigners(accountEntry xdr.AccountEntry) error {
sponsorsPerSigner := accountEntry.SponsorPerSigner()
for signer, weight := range accountEntry.SignerSummary() {
// Ignore master key
var sponsor null.String
if signer != accountEntry.AccountId.Address() {
if sponsorDesc, isSponsored := sponsorsPerSigner[signer]; isSponsored {
sponsor = null.StringFrom(sponsorDesc.Address())
}
}

err := p.batchInsertBuilder.Add(history.AccountSigner{
Account: accountEntry.AccountId.Address(),
Signer: signer,
Weight: weight,
Sponsor: sponsor,
})
if err != nil {
return errors.Wrapf(err, "Error adding signer (%s) to AccountSignersBatchInsertBuilder",
signer)
}
}
return nil
}

0 comments on commit cf80c34

Please sign in to comment.