Skip to content

Commit

Permalink
Update.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 29, 2024
1 parent a8ad83f commit 906c223
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 4 deletions.
3 changes: 1 addition & 2 deletions clients/databricks/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/ext"
)

type DatabricksDialect struct{}
Expand Down
5 changes: 3 additions & 2 deletions clients/databricks/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package databricks

import (
"github.com/artie-labs/transfer/clients/databricks/dialect"
"github.com/artie-labs/transfer/clients/shared"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/db"
Expand All @@ -24,11 +25,11 @@ func (s Store) Append(tableData *optimization.TableData, useTempTable bool) erro
}

func (s Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sql.TableIdentifier {
//return NewTableIdentifier(topicConfig.Schema, table)
return NewTableIdentifier(topicConfig.Database, topicConfig.Schema, table)
}

func (s Store) Dialect() sql.Dialect {
//return dialect.DatabricksDialect{}
return dialect.DatabricksDialect{}
}

func (s Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, includeArtieUpdatedAt bool) error {
Expand Down
48 changes: 48 additions & 0 deletions clients/databricks/tableid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package databricks

import (
"fmt"

"github.com/artie-labs/transfer/clients/databricks/dialect"
"github.com/artie-labs/transfer/lib/sql"
)

var _dialect = dialect.DatabricksDialect{}

type TableIdentifier struct {
database string
schema string
table string
}

func NewTableIdentifier(database, schema, table string) TableIdentifier {
return TableIdentifier{
database: database,
schema: schema,
table: table,
}
}

func (ti TableIdentifier) Database() string {
return ti.database
}

func (ti TableIdentifier) Schema() string {
return ti.schema
}

func (ti TableIdentifier) EscapedTable() string {
return _dialect.QuoteIdentifier(ti.table)
}

func (ti TableIdentifier) Table() string {
return ti.table
}

func (ti TableIdentifier) WithTable(table string) sql.TableIdentifier {
return NewTableIdentifier(ti.database, ti.schema, table)
}

func (ti TableIdentifier) FullyQualifiedName() string {
return fmt.Sprintf("%s.%s.%s", ti.database, ti.schema, ti.EscapedTable())
}

0 comments on commit 906c223

Please sign in to comment.