Skip to content

Commit

Permalink
Clean.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jan 2, 2025
1 parent b953ac2 commit fd69216
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package bigquery

import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/bigquery/storage/managedwriter"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
"context"
"fmt"
_ "github.com/viant/bigquery"
"google.golang.org/api/option"
"google.golang.org/protobuf/proto"
"log/slog"
"os"
"strconv"
"strings"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/bigquery/storage/managedwriter"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
_ "github.com/viant/bigquery"
"google.golang.org/api/option"
"google.golang.org/protobuf/proto"

"github.com/artie-labs/transfer/clients/bigquery/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/batch"
Expand Down Expand Up @@ -102,21 +103,21 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati
}

if s.auditRows {
return s.auditStagingTable(bqTempTableID, tableData)
return s.auditStagingTable(ctx, bqTempTableID, tableData)
}

return nil
}

func (s *Store) auditStagingTable(bqTempTableID dialect.TableIdentifier, tableData *optimization.TableData) error {
func (s *Store) auditStagingTable(ctx context.Context, bqTempTableID dialect.TableIdentifier, tableData *optimization.TableData) error {
var stagingTableRowsCount uint64
expectedRowCount := uint64(len(tableData.Rows()))
// The streaming metadata does not appear right away, we'll wait up to 5s for it to appear.
for i := 0; i < 10; i++ {
time.Sleep(500 * time.Millisecond)
resp, err := s.bqClient.Dataset(bqTempTableID.Dataset()).Table(bqTempTableID.Table()).Metadata(ctx)
if err != nil {
return fmt.Errorf("failed to get %q metadata: %w", tempTableID.FullyQualifiedName(), err)
return fmt.Errorf("failed to get %q metadata: %w", bqTempTableID.FullyQualifiedName(), err)
}

if stagingTableRowsCount == 0 {
Expand Down

0 comments on commit fd69216

Please sign in to comment.