[FLINK-37133][table] Support Submitting Refresh Job of Materialized Table to Yarn/K8s #25988
@hackergin Thanks for your contribution, I left some comments.
Because the e2e test covers only one case, I think we need to test YARN and K8s manually. We should cover the following cases:
- yarn-application: continuous mode and full mode, create & suspend & resume & drop action
- yarn-session: continuous mode and full mode, create & suspend & resume & drop action
- k8s-application: continuous mode and full mode, create & suspend & resume & drop action
- k8s-session: continuous mode and full mode, create & suspend & resume & drop action
flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh
# 2. suspend & resume materialized table in continuous mode
execute_statement $session_handle "alter materialized table my_materialized_table_in_continuous_mode suspend"

kubectl delete deployment $APPLICATION_CLUSTER_ID
Why do we need to manually delete clusters?
In my test, I found that after "stop with Savepoint", the deployment does not end. I need to spend some time to verify it again. In theory, it should exit automatically.
After further testing, it seems the deployment fails to exit normally because of a bug in SqlDriver when it splits SQL. When the last SQL statement has no trailing semicolon, the splitter enters an infinite loop and keeps submitting new jobs. Since fixing this bug in SqlDriver will take time, for now we can append a semicolon by default when generating the statement of the refresh job to avoid the problem.
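The workaround described above could be sketched roughly as follows. This is only an illustration, not the code in this PR: the class name `RefreshStatements` and the method `ensureTrailingSemicolon` are hypothetical.

```java
/** Hypothetical helper, shown only to illustrate the workaround. */
final class RefreshStatements {
    private RefreshStatements() {}

    /**
     * Returns the statement with surrounding whitespace removed and a
     * semicolon appended if it does not already end with one.
     */
    static String ensureTrailingSemicolon(String statement) {
        String trimmed = statement.trim();
        return trimmed.endsWith(";") ? trimmed : trimmed + ";";
    }
}
```

With such a helper, the generated refresh statement always ends with a statement terminator, so the splitter never sees an unterminated tail.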
It seems there are some issues with executing DML statements, so I will switch to executing a simple DDL statement instead.
We can make these test cases part of the cross-team testing, but we should try our best to start cross-team testing as soon as possible.
Reviewed by Chi on 16/01/2025. Going back to the submitter with review comments.
.../java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
Configuration executionConfig,
OperationExecutor operationExecutor,
OperationHandle operationHandle) {
if (isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
For minicluster, is it not remote?
...link-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java
 public Optional<String> getRestorePath() {
     return Optional.ofNullable(restorePath);
 }

 @Override
 public String asSummaryString() {
     return String.format(
-            "{\njobId=%s,\n executionTarget=%s%s\n}",
+            "{\n jobId=%s,\n executionTarget=%s,\n clusterId=%s%s\n}",
- "{\n jobId=%s,\n executionTarget=%s,\n clusterId=%s%s\n}",
+ "{\n executionTarget=%s,\n clusterId=%s,\n jobId=%s%s\n}",
}
}

private static @Nullable String getClusterIdKeyName(String targetName) {
I think we could use Optional<String> as the return type; that would be better than returning a nullable String.
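A minimal sketch of what that suggestion might look like is given here. The mapping from target name to key is an assumption made for illustration (the keys `yarn.application.id` and `kubernetes.cluster-id` are existing Flink options, but the method body in the PR is not shown), and the enclosing class name `ClusterIdKeys` is hypothetical.

```java
import java.util.Optional;

/** Hypothetical holder class; only the return type is the point of the sketch. */
final class ClusterIdKeys {
    private ClusterIdKeys() {}

    /**
     * Returns the configuration key that carries the cluster id for the given
     * execution target, or an empty Optional when the target has none.
     * The prefix-based mapping below is an illustrative assumption.
     */
    static Optional<String> getClusterIdKeyName(String targetName) {
        if (targetName == null) {
            return Optional.empty();
        }
        if (targetName.startsWith("yarn")) {
            return Optional.of("yarn.application.id");
        }
        if (targetName.startsWith("kubernetes")) {
            return Optional.of("kubernetes.cluster-id");
        }
        return Optional.empty();
    }
}
```

Callers can then write `getClusterIdKeyName(target).ifPresent(...)` instead of a null check, which makes the "no cluster id for this target" case explicit in the signature.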
…hout trailing semicolon
@@ -194,6 +194,7 @@ public boolean hasNext() {
                 return true;
             }
         }
+        position = script.length();
Because I'm not familiar with related code logic, I'm curious why adding this line of code can fix the issue.
The position keeps track of the current location in the SQL text being parsed. Previously, the logic only updated the position when a semicolon was encountered. However, if there is no semicolon at the end of the text (meaning we have finished processing all the text without updating the position), the next attempt to split starts from the previous position again.
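The behaviour described in this reply can be reproduced with a much simplified splitter. The sketch below is not Flink's actual implementation: the class `StatementSplitter` is hypothetical, and it ignores quotes and comments, which a real SQL splitter must handle. It only shows why the position has to be advanced to the end of the script when no semicolon is found.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Simplified, hypothetical splitter; quotes and comments are not handled. */
final class StatementSplitter implements Iterator<String> {
    private final String script;
    private int position = 0; // index of the first character not yet consumed
    private String next;      // statement found by hasNext() but not yet returned

    StatementSplitter(String script) {
        this.script = script;
    }

    @Override
    public boolean hasNext() {
        if (next != null) {
            return true;
        }
        StringBuilder current = new StringBuilder();
        for (int i = position; i < script.length(); i++) {
            char c = script.charAt(i);
            current.append(c);
            if (c == ';') {
                position = i + 1; // consume up to and including the semicolon
                next = current.toString().trim();
                return true;
            }
        }
        // No semicolon before the end of the script. Without this assignment the
        // unterminated tail would be scanned and returned again on every call.
        position = script.length();
        String tail = current.toString().trim();
        if (!tail.isEmpty()) {
            next = tail;
            return true;
        }
        return false;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String result = next;
        next = null;
        return result;
    }

    /** Convenience method that collects all statements of a script. */
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StatementSplitter splitter = new StatementSplitter(script);
        while (splitter.hasNext()) {
            statements.add(splitter.next());
        }
        return statements;
    }
}
```

If the `position = script.length();` line is removed from this sketch, a script whose last statement lacks a semicolon keeps yielding that statement, which mirrors the endless job submission reported earlier in the thread.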
'connector' = 'datagen'
); -- This is another comment

DESCRIBE src
Why change the SELECT statement to a DESCRIBE statement?
The SELECT statement still requires a MiniCluster for execution. Therefore, I referred to other tests in this unit test and only verified DDL-related operations without relying on the MiniCluster.
…ion when constructing table paths
…duler when closing without open
@flinkbot run azure
The test_ci connect stage failed because of https://issues.apache.org/jira/browse/FLINK-36290
@flinkbot run azure
@flinkbot run azure
The GitHub CI has passed: https://github.com/hackergin/flink/actions/runs/12860642915
@flinkbot run azure
LGTM
The related tests have been verified manually, so I have merged it.
What is the purpose of the change
Support submitting refresh task of materialized table to yarn/k8s
Brief change log
Support submitting refresh task of materialized table to yarn/k8s
Verifying this change
An e2e test case is added to verify this feature.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation