Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Use COPY to speed up ClaimableBalanceChangeProcessor #5104

Merged
83 changes: 38 additions & 45 deletions services/horizon/internal/actions/claimable_balance_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actions

import (
"database/sql"
"net/http/httptest"
"testing"

Expand All @@ -18,7 +19,12 @@ func TestGetClaimableBalanceByID(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}
q := &history.Q{SessionInterface: tt.HorizonSession()}

tt.Assert.NoError(q.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.Rollback()
}()

accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"
asset := xdr.MustNewCreditAsset("USD", accountID)
Expand All @@ -43,20 +49,22 @@ func TestGetClaimableBalanceByID(t *testing.T) {
LastModifiedLedger: 123,
}

err = q.UpsertClaimableBalances(tt.Ctx, []history.ClaimableBalance{cBalance})
tt.Assert.NoError(err)
balanceInsertBuilder := q.NewClaimableBalanceBatchInsertBuilder()
tt.Assert.NoError(balanceInsertBuilder.Add(cBalance))

claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10)
claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder()
for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
tt.Assert.NoError(claimantsInsertBuilder.Add(claimant))
}

tt.Assert.NoError(balanceInsertBuilder.Exec(tt.Ctx))
tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx))

handler := GetClaimableBalanceByIDHandler{}
response, err := handler.GetResource(httptest.NewRecorder(), makeRequest(
t,
Expand Down Expand Up @@ -148,6 +156,11 @@ func TestGetClaimableBalances(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}

tt.Assert.NoError(q.BeginTx(tt.Ctx, &sql.TxOptions{}))
defer func() {
_ = q.Rollback()
}()

entriesMeta := []struct {
id xdr.Hash
accountID string
Expand Down Expand Up @@ -187,25 +200,25 @@ func TestGetClaimableBalances(t *testing.T) {
hCBs = append(hCBs, cb)
}

err := q.UpsertClaimableBalances(tt.Ctx, hCBs)
tt.Assert.NoError(err)
balanceInsertbuilder := q.NewClaimableBalanceBatchInsertBuilder()

claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10)
claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder()

for _, cBalance := range hCBs {
tt.Assert.NoError(balanceInsertbuilder.Add(cBalance))

for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
tt.Assert.NoError(claimantsInsertBuilder.Add(claimant))
}
}

err = claimantsInsertBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)
tt.Assert.NoError(balanceInsertbuilder.Exec(tt.Ctx))
tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx))

handler := GetClaimableBalancesHandler{}
response, err := handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
Expand Down Expand Up @@ -284,11 +297,9 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(response, 0)

// new claimable balances are ingest and one of them updated, they should appear in the next pages
cbToBeUpdated := hCBs[3]
cbToBeUpdated.Sponsor = null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML")
cbToBeUpdated.LastModifiedLedger = 1238
q.UpsertClaimableBalances(tt.Ctx, []history.ClaimableBalance{cbToBeUpdated})
// new claimable balances are ingested, they should appear in the next pages
balanceInsertbuilder = q.NewClaimableBalanceBatchInsertBuilder()
claimantsInsertBuilder = q.NewClaimableBalanceClaimantBatchInsertBuilder()

entriesMeta = []struct {
id xdr.Hash
Expand All @@ -310,29 +321,26 @@ func TestGetClaimableBalances(t *testing.T) {
},
}

hCBs = nil
for _, e := range entriesMeta {
entry := buildClaimableBalance(tt, e.id, e.accountID, e.ledger, e.asset)
hCBs = append(hCBs, entry)
}

err = q.UpsertClaimableBalances(tt.Ctx, hCBs)
tt.Assert.NoError(err)
for _, cBalance := range hCBs[4:] {
tt.Assert.NoError(balanceInsertbuilder.Add(cBalance))

for _, cBalance := range hCBs {
for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
tt.Assert.NoError(claimantsInsertBuilder.Add(claimant))
}
}

err = claimantsInsertBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)
tt.Assert.NoError(balanceInsertbuilder.Exec(tt.Ctx))
tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx))

response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
Expand All @@ -358,7 +366,7 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.Len(response, 2)

// response should be the first 2 elements of entries
for i, entry := range hCBs {
for i, entry := range hCBs[4:] {
tt.Assert.Equal(entry.BalanceID, response[i].(protocol.ClaimableBalance).BalanceID)
}

Expand All @@ -372,21 +380,6 @@ func TestGetClaimableBalances(t *testing.T) {
q,
))

tt.Assert.NoError(err)
tt.Assert.Len(response, 1)

tt.Assert.Equal(cbToBeUpdated.BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)

response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
map[string]string{
"limit": "2",
"cursor": response[0].(protocol.ClaimableBalance).PagingToken(),
},
map[string]string{},
q,
))

tt.Assert.NoError(err)
tt.Assert.Len(response, 0)

Expand All @@ -404,9 +397,9 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(response, 2)

tt.Assert.Equal(cbToBeUpdated.BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)
tt.Assert.Equal(hCBs[5].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)

tt.Assert.Equal(hCBs[1].BalanceID, response[1].(protocol.ClaimableBalance).BalanceID)
tt.Assert.Equal(hCBs[4].BalanceID, response[1].(protocol.ClaimableBalance).BalanceID)

response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
Expand Down Expand Up @@ -492,7 +485,7 @@ func TestGetClaimableBalances(t *testing.T) {
))

tt.Assert.NoError(err)
tt.Assert.Len(response, 3)
tt.Assert.Len(response, 2)
for _, resource := range response {
tt.Assert.Equal(
"GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package history

Check failure on line 1 in services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go

View workflow job for this annotation

GitHub Actions / golangci

1-40 lines are duplicate of `services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go:1-40` (dupl)

import (
"context"

"github.com/stellar/go/support/db"
)

// ClaimableBalanceBatchInsertBuilder is used to insert claimable balance into the
// claimable_balances table
type ClaimableBalanceBatchInsertBuilder interface {
Add(claimableBalance ClaimableBalance) error
Exec(ctx context.Context) error
}

// ClaimableBalanceBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type claimableBalanceBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

// NewClaimableBalanceBatchInsertBuilder constructs a new ClaimableBalanceBatchInsertBuilder instance
func (q *Q) NewClaimableBalanceBatchInsertBuilder() ClaimableBalanceBatchInsertBuilder {
return &claimableBalanceBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "claimable_balances",
}
}

// Add adds a new claimable balance to the batch
func (i *claimableBalanceBatchInsertBuilder) Add(claimableBalance ClaimableBalance) error {
return i.builder.RowStruct(claimableBalance)
}

// Exec writes the batch of claimable balances to the database.
func (i *claimableBalanceBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,37 @@ import (
"context"

"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"
)

// ClaimableBalanceClaimantBatchInsertBuilder is used to insert transactions into the
// history_transactions table
// ClaimableBalanceClaimantBatchInsertBuilder is used to insert claimants into the
// claimable_balance_claimants table
type ClaimableBalanceClaimantBatchInsertBuilder interface {
Add(ctx context.Context, claimableBalanceClaimant ClaimableBalanceClaimant) error
Add(claimableBalanceClaimant ClaimableBalanceClaimant) error
Exec(ctx context.Context) error
}

// ClaimableBalanceClaimantBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
// ClaimableBalanceClaimantBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type claimableBalanceClaimantBatchInsertBuilder struct {
encodingBuffer *xdr.EncodingBuffer
builder db.BatchInsertBuilder
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

// NewClaimableBalanceClaimantBatchInsertBuilder constructs a new ClaimableBalanceClaimantBatchInsertBuilder instance
func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder {
func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder() ClaimableBalanceClaimantBatchInsertBuilder {
return &claimableBalanceClaimantBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
builder: db.BatchInsertBuilder{
Table: q.GetTable("claimable_balance_claimants"),
MaxBatchSize: maxBatchSize,
Suffix: "ON CONFLICT (id, destination) DO UPDATE SET last_modified_ledger=EXCLUDED.last_modified_ledger",
},
session: q,
builder: db.FastBatchInsertBuilder{},
table: "claimable_balance_claimants",
}
}

// Add adds a new transaction to the batch
func (i *claimableBalanceClaimantBatchInsertBuilder) Add(ctx context.Context, claimableBalanceClaimant ClaimableBalanceClaimant) error {
return i.builder.RowStruct(ctx, claimableBalanceClaimant)
// Add adds a new claimant to the batch
func (i *claimableBalanceClaimantBatchInsertBuilder) Add(claimableBalanceClaimant ClaimableBalanceClaimant) error {
return i.builder.RowStruct(claimableBalanceClaimant)
}

// Exec writes the batch of claimants to the database.
func (i *claimableBalanceClaimantBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx)
return i.builder.Exec(ctx, i.session, i.table)
}
40 changes: 7 additions & 33 deletions services/horizon/internal/db2/history/claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ type ClaimableBalance struct {
type Claimants []Claimant

func (c Claimants) Value() (driver.Value, error) {
return json.Marshal(c)
// Convert the byte array into a string as a workaround to bypass buggy encoding in the pq driver
// (More info about this bug here https://github.com/stellar/go/issues/5086#issuecomment-1773215436).
// By doing so, the data will be written as a string rather than hex encoded bytes.
val, err := json.Marshal(c)
return string(val), err
}

func (c *Claimants) Scan(value interface{}) error {
Expand All @@ -126,12 +130,12 @@ type Claimant struct {

// QClaimableBalances defines claimable-balance-related related queries.
type QClaimableBalances interface {
UpsertClaimableBalances(ctx context.Context, cb []ClaimableBalance) error
RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error)
RemoveClaimableBalanceClaimants(ctx context.Context, ids []string) (int64, error)
GetClaimableBalancesByID(ctx context.Context, ids []string) ([]ClaimableBalance, error)
CountClaimableBalances(ctx context.Context) (int, error)
NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder
NewClaimableBalanceClaimantBatchInsertBuilder() ClaimableBalanceClaimantBatchInsertBuilder
NewClaimableBalanceBatchInsertBuilder() ClaimableBalanceBatchInsertBuilder
GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]ClaimableBalanceClaimant, error)
}

Expand Down Expand Up @@ -171,36 +175,6 @@ func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (
return claimantsMap, err
}

// UpsertClaimableBalances upserts a batch of claimable balances in the claimable_balances table.
// There's currently no limit of the number of offers this method can
// accept other than 2GB limit of the query string length what should be enough
// for each ledger with the current limits.
func (q *Q) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error {
var id, claimants, asset, amount, sponsor, lastModifiedLedger, flags []interface{}

for _, cb := range cbs {
id = append(id, cb.BalanceID)
claimants = append(claimants, cb.Claimants)
asset = append(asset, cb.Asset)
amount = append(amount, cb.Amount)
sponsor = append(sponsor, cb.Sponsor)
lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger)
flags = append(flags, cb.Flags)
}

upsertFields := []upsertField{
{"id", "text", id},
{"claimants", "jsonb", claimants},
{"asset", "text", asset},
{"amount", "bigint", amount},
{"sponsor", "text", sponsor},
{"last_modified_ledger", "integer", lastModifiedLedger},
{"flags", "int", flags},
}

return q.upsertRows(ctx, "claimable_balances", "id", upsertFields)
}

// RemoveClaimableBalances deletes claimable balances table.
// Returns number of rows affected and error.
func (q *Q) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) {
Expand Down
Loading
Loading