Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jan 2, 2025
1 parent aa13148 commit 16bafea
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
const (
GooglePathToCredentialsEnvKey = "GOOGLE_APPLICATION_CREDENTIALS"
// Storage Write API is limited to 10 MiB, subtract 400 KiB to account for request overhead.
maxRequestByteSize = (10 * 1024 * 1024) - (400 * 1024)
maxRequestByteSize = (1 * 1024 * 1024) - (400 * 1024)
)

type Store struct {
Expand Down Expand Up @@ -103,7 +103,8 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
}

if s.auditRows {
var actualRowCounts uint64
var stagingTableRowsCount uint64
expectedRowCount := uint64(len(tableData.Rows()))
// The streaming metadata does not appear right away, we'll wait up to 5s for it to appear.
for i := 0; i < 10; i++ {
time.Sleep(500 * time.Millisecond)
Expand All @@ -112,23 +113,21 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
return fmt.Errorf("failed to get %q metadata: %w", tempTableID.FullyQualifiedName(), err)
}

actualRowCounts = resp.NumRows
if resp.StreamingBuffer != nil {
actualRowCounts += resp.StreamingBuffer.EstimatedRows
if stagingTableRowsCount == 0 {
stagingTableRowsCount = resp.NumRows
}

if actualRowCounts > 0 {
break
if resp.StreamingBuffer != nil {
stagingTableRowsCount += resp.StreamingBuffer.EstimatedRows
}
}

expectedRowCount := uint64(len(tableData.Rows()))
// [resp.NumRows] could be higher since AppendRows is at least once delivery.
if actualRowCounts >= expectedRowCount {
return nil
// [stagingTableRowsCount] could be higher since AppendRows is at least once delivery.
if stagingTableRowsCount >= expectedRowCount {
return nil
}
}

return fmt.Errorf("temporary table row count mismatch, expected: %d, got: %d", expectedRowCount, actualRowCounts)
return fmt.Errorf("temporary table row count mismatch, expected: %d, got: %d", expectedRowCount, stagingTableRowsCount)
}

return nil
Expand Down Expand Up @@ -216,7 +215,10 @@ func (s *Store) putTable(ctx context.Context, bqTableID dialect.TableIdentifier,
return bytes, nil
}

var count int
return batch.BySize(tableData.Rows(), maxRequestByteSize, false, encoder, func(chunk [][]byte) error {
count += 1
fmt.Println(fmt.Sprintf("Count %d", count))

Check failure on line 221 in clients/bigquery/bigquery.go

View workflow job for this annotation

GitHub Actions / test

should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (S1038)
result, err := managedStream.AppendRows(ctx, chunk)
if err != nil {
return fmt.Errorf("failed to append rows: %w", err)
Expand Down

0 comments on commit 16bafea

Please sign in to comment.