Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update SignersProcessor to use FastBatchInsertBuilder #5105

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions services/horizon/internal/db2/history/account_signers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func (q *Q) AccountsForSigner(ctx context.Context, signer string, page db2.PageQ
// CreateAccountSigner creates a row in the accounts_signers table.
// Returns number of rows affected and error.
func (q *Q) CreateAccountSigner(ctx context.Context, account, signer string, weight int32, sponsor *string) (int64, error) {
// This function is not used in the ingestion code path. SignersProcessor now uses bulk insertion with
urvisavla marked this conversation as resolved.
Show resolved Hide resolved
// FastBatchInsertBuilder, but this function is used in unit tests when only a single row needs to be inserted.

sql := sq.Insert("accounts_signers").
Columns("account_id", "signer", "weight", "sponsor").
Values(account, signer, weight, sponsor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
)

func (i *accountSignersBatchInsertBuilder) Add(ctx context.Context, signer AccountSigner) error {
return i.builder.Row(ctx, map[string]interface{}{
func (i *accountSignersBatchInsertBuilder) Add(signer AccountSigner) error {
return i.builder.Row(map[string]interface{}{
"account_id": signer.Account,
"signer": signer.Signer,
"weight": signer.Weight,
Expand All @@ -14,5 +14,5 @@ func (i *accountSignersBatchInsertBuilder) Add(ctx context.Context, signer Accou
}

func (i *accountSignersBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx)
return i.builder.Exec(ctx, i.session, i.table)
}
17 changes: 9 additions & 8 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,15 @@ type AccountSigner struct {
}

type AccountSignersBatchInsertBuilder interface {
Add(ctx context.Context, signer AccountSigner) error
Add(signer AccountSigner) error
Exec(ctx context.Context) error
}

// accountSignersBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
type accountSignersBatchInsertBuilder struct {
builder db.BatchInsertBuilder
builder db.FastBatchInsertBuilder
session db.SessionInterface
table string
}

// Data is a row of data from the `account_data` table
Expand Down Expand Up @@ -742,7 +744,7 @@ type QSigners interface {
GetLastLedgerIngest(ctx context.Context) (uint32, error)
UpdateLastLedgerIngest(ctx context.Context, ledgerSequence uint32) error
AccountsForSigner(ctx context.Context, signer string, page db2.PageQuery) ([]AccountSigner, error)
NewAccountSignersBatchInsertBuilder(maxBatchSize int) AccountSignersBatchInsertBuilder
NewAccountSignersBatchInsertBuilder() AccountSignersBatchInsertBuilder
CreateAccountSigner(ctx context.Context, account, signer string, weight int32, sponsor *string) (int64, error)
RemoveAccountSigner(ctx context.Context, account, signer string) (int64, error)
SignersForAccounts(ctx context.Context, accounts []string) ([]AccountSigner, error)
Expand Down Expand Up @@ -846,12 +848,11 @@ type QTrustLines interface {
RemoveTrustLines(ctx context.Context, ledgerKeys []string) (int64, error)
}

func (q *Q) NewAccountSignersBatchInsertBuilder(maxBatchSize int) AccountSignersBatchInsertBuilder {
func (q *Q) NewAccountSignersBatchInsertBuilder() AccountSignersBatchInsertBuilder {
return &accountSignersBatchInsertBuilder{
builder: db.BatchInsertBuilder{
Table: q.GetTable("accounts_signers"),
MaxBatchSize: maxBatchSize,
},
session: q,
builder: db.FastBatchInsertBuilder{},
table: "accounts_signers",
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ type MockAccountSignersBatchInsertBuilder struct {
mock.Mock
}

func (m *MockAccountSignersBatchInsertBuilder) Add(ctx context.Context, signer AccountSigner) error {
a := m.Called(ctx, signer)
func (m *MockAccountSignersBatchInsertBuilder) Add(signer AccountSigner) error {
a := m.Called(signer)
return a.Error(0)
}

Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/db2/history/mock_q_signers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func (m *MockQSigners) AccountsForSigner(ctx context.Context, signer string, pag
return a.Get(0).([]AccountSigner), a.Error(1)
}

func (m *MockQSigners) NewAccountSignersBatchInsertBuilder(maxBatchSize int) AccountSignersBatchInsertBuilder {
a := m.Called(maxBatchSize)
func (m *MockQSigners) NewAccountSignersBatchInsertBuilder() AccountSignersBatchInsertBuilder {
a := m.Called()
return a.Get(0).(AccountSignersBatchInsertBuilder)
}

Expand Down
29 changes: 13 additions & 16 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {
ctx := context.Background()
maxBatchSize := 100000

mockSession := &db.MockSession{}

Expand All @@ -41,15 +40,15 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {

mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder)
mockAccountSignersBatchInsertBuilder.On("Add", ctx, history.AccountSigner{
mockAccountSignersBatchInsertBuilder.On("Add", history.AccountSigner{
Account: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Signer: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Weight: 1,
Sponsor: null.String{},
}).Return(nil).Once()
mockAccountSignersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
Return(mockAccountSignersBatchInsertBuilder).Once()
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder").
Return(mockAccountSignersBatchInsertBuilder).Twice()

mockClaimableBalanceBatchInsertBuilder := &history.MockClaimableBalanceBatchInsertBuilder{}
q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder").
Expand Down Expand Up @@ -81,7 +80,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {

func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
ctx := context.Background()
maxBatchSize := 100000

config := Config{
NetworkPassphrase: network.PublicNetworkPassphrase,
Expand Down Expand Up @@ -120,14 +118,14 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {

mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder)
mockAccountSignersBatchInsertBuilder.On("Add", ctx, history.AccountSigner{
mockAccountSignersBatchInsertBuilder.On("Add", history.AccountSigner{
Account: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Signer: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Weight: 1,
}).Return(nil).Once()
mockAccountSignersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
Return(mockAccountSignersBatchInsertBuilder).Once()
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder").
Return(mockAccountSignersBatchInsertBuilder).Twice()

mockClaimableBalanceBatchInsertBuilder := &history.MockClaimableBalanceBatchInsertBuilder{}
q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder").
Expand Down Expand Up @@ -158,7 +156,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {

func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t *testing.T) {
ctx := context.Background()
maxBatchSize := 100000

config := Config{
NetworkPassphrase: network.PublicNetworkPassphrase,
Expand All @@ -174,7 +171,7 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t

mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder)
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder").
Return(mockAccountSignersBatchInsertBuilder).Once()

mockClaimableBalanceBatchInsertBuilder := &history.MockClaimableBalanceBatchInsertBuilder{}
Expand Down Expand Up @@ -211,13 +208,12 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t

func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
ctx := context.Background()
maxBatchSize := 100000

q := &mockDBQ{}
defer mock.AssertExpectationsForObjects(t, q)

// Twice = checking ledgerSource and historyArchiveSource
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder").
Return(&history.MockAccountSignersBatchInsertBuilder{}).Twice()

mockClaimableBalanceBatchInsertBuilder := &history.MockClaimableBalanceBatchInsertBuilder{}
Expand Down Expand Up @@ -467,8 +463,8 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t
Return(mockTransactionsBatchInsertBuilder).Twice()

mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{}
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
Return(mockAccountSignersBatchInsertBuilder).Once()
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder").
Return(mockAccountSignersBatchInsertBuilder).Twice()

mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{}
q.MockQOperations.On("NewOperationBatchInsertBuilder").
Expand Down Expand Up @@ -501,8 +497,9 @@ func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Cont
Return(mockTransactionsBatchInsertBuilder)

mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{}
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
Return(mockAccountSignersBatchInsertBuilder).Once()
mockAccountSignersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder").
Return(mockAccountSignersBatchInsertBuilder).Twice()

mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{}
mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once()
Expand Down
Loading
Loading