Skip to content

Commit

Permalink
chore(flink): clean up memtable support to use hooks and finalization
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Sep 23, 2024
1 parent c899267 commit 1e7904c
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,19 +369,22 @@ def compile(
expr, params=params, pretty=pretty
) # Discard `limit` and other kwargs.

def execute(self, expr: ir.Expr, **kwargs: Any) -> Any:
"""Execute an expression."""
self._register_udfs(expr)

table_expr = expr.as_table()

if null_columns := table_expr.schema().null_fields:
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
if null_columns := op.schema.null_fields:
raise exc.IbisTypeError(

Check warning on line 374 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L374

Added line #L374 was not covered by tests
f"{self.name} cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
)
self.create_view(op.name, op.data.to_frame(), temp=True)

Check warning on line 378 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L378

Added line #L378 was not covered by tests

def _finalize_memtable(self, name: str) -> None:
self.drop_view(name, temp=True, force=True)

Check warning on line 381 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L381

Added line #L381 was not covered by tests

def execute(self, expr: ir.Expr, **kwargs: Any) -> Any:
"""Execute an expression."""
self._run_pre_execute_hooks(expr)

Check warning on line 385 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L385

Added line #L385 was not covered by tests

sql = self.compile(table_expr, **kwargs)
sql = self.compile(expr.as_table(), **kwargs)

Check warning on line 387 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L387

Added line #L387 was not covered by tests
df = self._table_env.sql_query(sql).to_pandas()

return expr.__pandas_result__(df)
Expand Down

0 comments on commit 1e7904c

Please sign in to comment.