Skip to content

Commit

Permalink
services/horizon: Use COPY to speed up ClaimableBalanceChangeProcessor (
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla authored Nov 7, 2023
1 parent f849153 commit b1a8868
Show file tree
Hide file tree
Showing 12 changed files with 362 additions and 407 deletions.
83 changes: 38 additions & 45 deletions services/horizon/internal/actions/claimable_balance_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actions

import (
"database/sql"
"net/http/httptest"
"testing"

Expand All @@ -18,7 +19,12 @@ func TestGetClaimableBalanceByID(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}
q := &history.Q{SessionInterface: tt.HorizonSession()}

tt.Assert.NoError(q.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.Rollback()
}()

accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"
asset := xdr.MustNewCreditAsset("USD", accountID)
Expand All @@ -43,20 +49,22 @@ func TestGetClaimableBalanceByID(t *testing.T) {
LastModifiedLedger: 123,
}

err = q.UpsertClaimableBalances(tt.Ctx, []history.ClaimableBalance{cBalance})
tt.Assert.NoError(err)
balanceInsertBuilder := q.NewClaimableBalanceBatchInsertBuilder()
tt.Assert.NoError(balanceInsertBuilder.Add(cBalance))

claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10)
claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder()
for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
tt.Assert.NoError(claimantsInsertBuilder.Add(claimant))
}

tt.Assert.NoError(balanceInsertBuilder.Exec(tt.Ctx))
tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx))

handler := GetClaimableBalanceByIDHandler{}
response, err := handler.GetResource(httptest.NewRecorder(), makeRequest(
t,
Expand Down Expand Up @@ -148,6 +156,11 @@ func TestGetClaimableBalances(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}

tt.Assert.NoError(q.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.Rollback()
}()

entriesMeta := []struct {
id xdr.Hash
accountID string
Expand Down Expand Up @@ -187,25 +200,25 @@ func TestGetClaimableBalances(t *testing.T) {
hCBs = append(hCBs, cb)
}

err := q.UpsertClaimableBalances(tt.Ctx, hCBs)
tt.Assert.NoError(err)
balanceInsertbuilder := q.NewClaimableBalanceBatchInsertBuilder()

claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10)
claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder()

for _, cBalance := range hCBs {
tt.Assert.NoError(balanceInsertbuilder.Add(cBalance))

for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
tt.Assert.NoError(claimantsInsertBuilder.Add(claimant))
}
}

err = claimantsInsertBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)
tt.Assert.NoError(balanceInsertbuilder.Exec(tt.Ctx))
tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx))

handler := GetClaimableBalancesHandler{}
response, err := handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
Expand Down Expand Up @@ -284,11 +297,9 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(response, 0)

// new claimable balances are ingest and one of them updated, they should appear in the next pages
cbToBeUpdated := hCBs[3]
cbToBeUpdated.Sponsor = null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML")
cbToBeUpdated.LastModifiedLedger = 1238
q.UpsertClaimableBalances(tt.Ctx, []history.ClaimableBalance{cbToBeUpdated})
// new claimable balances are ingested, they should appear in the next pages
balanceInsertbuilder = q.NewClaimableBalanceBatchInsertBuilder()
claimantsInsertBuilder = q.NewClaimableBalanceClaimantBatchInsertBuilder()

entriesMeta = []struct {
id xdr.Hash
Expand All @@ -310,29 +321,26 @@ func TestGetClaimableBalances(t *testing.T) {
},
}

hCBs = nil
for _, e := range entriesMeta {
entry := buildClaimableBalance(tt, e.id, e.accountID, e.ledger, e.asset)
hCBs = append(hCBs, entry)
}

err = q.UpsertClaimableBalances(tt.Ctx, hCBs)
tt.Assert.NoError(err)
for _, cBalance := range hCBs[4:] {
tt.Assert.NoError(balanceInsertbuilder.Add(cBalance))

for _, cBalance := range hCBs {
for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
tt.Assert.NoError(claimantsInsertBuilder.Add(claimant))
}
}

err = claimantsInsertBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)
tt.Assert.NoError(balanceInsertbuilder.Exec(tt.Ctx))
tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx))

response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
Expand All @@ -358,7 +366,7 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.Len(response, 2)

// response should be the first 2 elements of entries
for i, entry := range hCBs {
for i, entry := range hCBs[4:] {
tt.Assert.Equal(entry.BalanceID, response[i].(protocol.ClaimableBalance).BalanceID)
}

Expand All @@ -372,21 +380,6 @@ func TestGetClaimableBalances(t *testing.T) {
q,
))

tt.Assert.NoError(err)
tt.Assert.Len(response, 1)

tt.Assert.Equal(cbToBeUpdated.BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)

response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
map[string]string{
"limit": "2",
"cursor": response[0].(protocol.ClaimableBalance).PagingToken(),
},
map[string]string{},
q,
))

tt.Assert.NoError(err)
tt.Assert.Len(response, 0)

Expand All @@ -404,9 +397,9 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(response, 2)

tt.Assert.Equal(cbToBeUpdated.BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)
tt.Assert.Equal(hCBs[5].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)

tt.Assert.Equal(hCBs[1].BalanceID, response[1].(protocol.ClaimableBalance).BalanceID)
tt.Assert.Equal(hCBs[4].BalanceID, response[1].(protocol.ClaimableBalance).BalanceID)

response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
Expand Down Expand Up @@ -492,7 +485,7 @@ func TestGetClaimableBalances(t *testing.T) {
))

tt.Assert.NoError(err)
tt.Assert.Len(response, 3)
tt.Assert.Len(response, 2)
for _, resource := range response {
tt.Assert.Equal(
"GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package history

import (
"context"

"github.com/stellar/go/support/db"
)

// ClaimableBalanceBatchInsertBuilder is used to insert claimable balance into the
// claimable_balances table
type ClaimableBalanceBatchInsertBuilder interface {
Add(claimableBalance ClaimableBalance) error
Exec(ctx context.Context) error
}

// ClaimableBalanceBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type claimableBalanceBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

// NewClaimableBalanceBatchInsertBuilder constructs a new ClaimableBalanceBatchInsertBuilder instance
func (q *Q) NewClaimableBalanceBatchInsertBuilder() ClaimableBalanceBatchInsertBuilder {
return &claimableBalanceBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "claimable_balances",
}
}

// Add adds a new claimable balance to the batch
func (i *claimableBalanceBatchInsertBuilder) Add(claimableBalance ClaimableBalance) error {
return i.builder.RowStruct(claimableBalance)
}

// Exec writes the batch of claimable balances to the database.
func (i *claimableBalanceBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,37 @@ import (
"context"

"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"
)

// ClaimableBalanceClaimantBatchInsertBuilder is used to insert transactions into the
// history_transactions table
// ClaimableBalanceClaimantBatchInsertBuilder is used to insert claimants into the
// claimable_balance_claimants table
type ClaimableBalanceClaimantBatchInsertBuilder interface {
Add(ctx context.Context, claimableBalanceClaimant ClaimableBalanceClaimant) error
Add(claimableBalanceClaimant ClaimableBalanceClaimant) error
Exec(ctx context.Context) error
}

// ClaimableBalanceClaimantBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
// ClaimableBalanceClaimantBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type claimableBalanceClaimantBatchInsertBuilder struct {
encodingBuffer *xdr.EncodingBuffer
builder db.BatchInsertBuilder
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

// NewClaimableBalanceClaimantBatchInsertBuilder constructs a new ClaimableBalanceClaimantBatchInsertBuilder instance
func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder {
func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder() ClaimableBalanceClaimantBatchInsertBuilder {
return &claimableBalanceClaimantBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
builder: db.BatchInsertBuilder{
Table: q.GetTable("claimable_balance_claimants"),
MaxBatchSize: maxBatchSize,
Suffix: "ON CONFLICT (id, destination) DO UPDATE SET last_modified_ledger=EXCLUDED.last_modified_ledger",
},
session: q,
builder: db.FastBatchInsertBuilder{},
table: "claimable_balance_claimants",
}
}

// Add adds a new transaction to the batch
func (i *claimableBalanceClaimantBatchInsertBuilder) Add(ctx context.Context, claimableBalanceClaimant ClaimableBalanceClaimant) error {
return i.builder.RowStruct(ctx, claimableBalanceClaimant)
// Add adds a new claimant to the batch
func (i *claimableBalanceClaimantBatchInsertBuilder) Add(claimableBalanceClaimant ClaimableBalanceClaimant) error {
return i.builder.RowStruct(claimableBalanceClaimant)
}

// Exec writes the batch of claimants to the database.
func (i *claimableBalanceClaimantBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx)
return i.builder.Exec(ctx, i.session, i.table)
}
40 changes: 7 additions & 33 deletions services/horizon/internal/db2/history/claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ type ClaimableBalance struct {
type Claimants []Claimant

func (c Claimants) Value() (driver.Value, error) {
return json.Marshal(c)
// Convert the byte array into a string as a workaround to bypass buggy encoding in the pq driver
// (More info about this bug here https://github.com/stellar/go/issues/5086#issuecomment-1773215436).
// By doing so, the data will be written as a string rather than hex encoded bytes.
val, err := json.Marshal(c)
return string(val), err
}

func (c *Claimants) Scan(value interface{}) error {
Expand All @@ -126,12 +130,12 @@ type Claimant struct {

// QClaimableBalances defines claimable-balance-related related queries.
type QClaimableBalances interface {
UpsertClaimableBalances(ctx context.Context, cb []ClaimableBalance) error
RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error)
RemoveClaimableBalanceClaimants(ctx context.Context, ids []string) (int64, error)
GetClaimableBalancesByID(ctx context.Context, ids []string) ([]ClaimableBalance, error)
CountClaimableBalances(ctx context.Context) (int, error)
NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder
NewClaimableBalanceClaimantBatchInsertBuilder() ClaimableBalanceClaimantBatchInsertBuilder
NewClaimableBalanceBatchInsertBuilder() ClaimableBalanceBatchInsertBuilder
GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]ClaimableBalanceClaimant, error)
}

Expand Down Expand Up @@ -171,36 +175,6 @@ func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (
return claimantsMap, err
}

// UpsertClaimableBalances upserts a batch of claimable balances in the claimable_balances table.
// There's currently no limit of the number of offers this method can
// accept other than 2GB limit of the query string length what should be enough
// for each ledger with the current limits.
func (q *Q) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error {
var id, claimants, asset, amount, sponsor, lastModifiedLedger, flags []interface{}

for _, cb := range cbs {
id = append(id, cb.BalanceID)
claimants = append(claimants, cb.Claimants)
asset = append(asset, cb.Asset)
amount = append(amount, cb.Amount)
sponsor = append(sponsor, cb.Sponsor)
lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger)
flags = append(flags, cb.Flags)
}

upsertFields := []upsertField{
{"id", "text", id},
{"claimants", "jsonb", claimants},
{"asset", "text", asset},
{"amount", "bigint", amount},
{"sponsor", "text", sponsor},
{"last_modified_ledger", "integer", lastModifiedLedger},
{"flags", "int", flags},
}

return q.upsertRows(ctx, "claimable_balances", "id", upsertFields)
}

// RemoveClaimableBalances deletes claimable balances table.
// Returns number of rows affected and error.
func (q *Q) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) {
Expand Down
Loading

0 comments on commit b1a8868

Please sign in to comment.