Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](hive) sync DDL command to other FE #46326

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.CleanQueryStatsInfo;
import org.apache.doris.persist.CreateDbInfo;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.DropDbInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.GlobalVarPersistInfo;
Expand Down Expand Up @@ -3256,6 +3259,17 @@ public void replayCreateDb(Database db) {
getInternalCatalog().replayCreateDb(db, "");
}

public void replayNewCreateDb(CreateDbInfo info) {
if (info.getCtlName().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
getInternalCatalog().replayCreateDb(info.getInternalDb(), "");
} else {
ExternalCatalog externalCatalog = (ExternalCatalog) catalogMgr.getCatalog(info.getCtlName());
if (externalCatalog != null) {
externalCatalog.replayCreateDb(info.getDbName());
}
}
}

public void dropDb(DropDbStmt stmt) throws DdlException {
CatalogIf<?> catalogIf;
if (StringUtils.isEmpty(stmt.getCtlName())) {
Expand All @@ -3266,8 +3280,15 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
catalogIf.dropDb(stmt);
}

public void replayDropDb(String dbName, boolean isForceDrop, Long recycleTime) throws DdlException {
getInternalCatalog().replayDropDb(dbName, isForceDrop, recycleTime);
public void replayDropDb(DropDbInfo info) throws DdlException {
if (Strings.isNullOrEmpty(info.getCtlName())) {
getInternalCatalog().replayDropDb(info.getDbName(), info.isForceDrop(), info.getRecycleTime());
} else {
ExternalCatalog externalCatalog = (ExternalCatalog) catalogMgr.getCatalog(info.getCtlName());
if (externalCatalog != null) {
externalCatalog.replayDropDb(info.getDbName());
}
}
}

public void recoverDatabase(RecoverDbStmt recoverStmt) throws DdlException {
Expand Down Expand Up @@ -4146,8 +4167,15 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, TableIf table, Lis
}
}

public void replayCreateTable(String dbName, Table table) throws MetaNotFoundException {
getInternalCatalog().replayCreateTable(dbName, table);
public void replayCreateTable(CreateTableInfo info) throws MetaNotFoundException {
if (Strings.isNullOrEmpty(info.getCtlName())) {
getInternalCatalog().replayCreateTable(info.getDbName(), info.getTable());
} else {
ExternalCatalog externalCatalog = (ExternalCatalog) catalogMgr.getCatalog(info.getCtlName());
if (externalCatalog != null) {
externalCatalog.replayCreateTable(info.getDbName(), info.getTblName());
}
}
}

public void replayAlterExternalTableSchema(String dbName, String tableName, List<Column> newSchema)
Expand Down Expand Up @@ -5851,7 +5879,16 @@ public void truncateTable(TruncateTableStmt stmt) throws DdlException {
}

public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundException {
getInternalCatalog().replayTruncateTable(info);
if (Strings.isNullOrEmpty(info.getCtl())) {
// In previous versions(before 2.1.8), there is no catalog info in TruncateTableInfo,
// So if the catalog info is empty, we assume it's internal table.
getInternalCatalog().replayTruncateTable(info);
} else {
ExternalCatalog ctl = (ExternalCatalog) catalogMgr.getCatalog(info.getCtl());
if (ctl != null) {
ctl.replayTruncateTable(info);
}
}
}

public void createFunction(CreateFunctionStmt stmt) throws UserException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
import org.apache.doris.datasource.test.TestExternalDatabase;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalDatabase;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.persist.CreateDbInfo;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.DropDbInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -926,12 +931,20 @@ public void createDb(CreateDbStmt stmt) throws DdlException {
}
try {
metadataOps.createDb(stmt);
CreateDbInfo info = new CreateDbInfo(getName(), stmt.getFullDbName(), null);
Env.getCurrentEnv().getEditLog().logNewCreateDb(info);
} catch (Exception e) {
LOG.warn("Failed to create a database.", e);
LOG.warn("Failed to create database {} in catalog {}.", stmt.getFullDbName(), getName(), e);
throw e;
}
}

public void replayCreateDb(String dbName) {
if (metadataOps != null) {
metadataOps.afterCreateDb(dbName);
}
}

@Override
public void dropDb(DropDbStmt stmt) throws DdlException {
makeSureInitialized();
Expand All @@ -941,12 +954,20 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
}
try {
metadataOps.dropDb(stmt);
DropDbInfo info = new DropDbInfo(getName(), stmt.getDbName());
Env.getCurrentEnv().getEditLog().logDropDb(info);
} catch (Exception e) {
LOG.warn("Failed to drop a database.", e);
LOG.warn("Failed to drop database {} in catalog {}", stmt.getDbName(), getName(), e);
throw e;
}
}

public void replayDropDb(String dbName) {
if (metadataOps != null) {
metadataOps.afterDropDb(dbName);
}
}

@Override
public boolean createTable(CreateTableStmt stmt) throws UserException {
makeSureInitialized();
Expand All @@ -955,13 +976,25 @@ public boolean createTable(CreateTableStmt stmt) throws UserException {
return false;
}
try {
return metadataOps.createTable(stmt);
boolean res = metadataOps.createTable(stmt);
if (!res) {
// res == false means the table does not exist before, and we create it.
CreateTableInfo info = new CreateTableInfo(getName(), stmt.getDbName(), stmt.getTableName());
Env.getCurrentEnv().getEditLog().logCreateTable(info);
}
return res;
} catch (Exception e) {
LOG.warn("Failed to create a table.", e);
throw e;
}
}

public void replayCreateTable(String dbName, String tblName) {
if (metadataOps != null) {
metadataOps.afterCreateTable(dbName, tblName);
}
}

@Override
public void dropTable(DropTableStmt stmt) throws DdlException {
makeSureInitialized();
Expand All @@ -971,12 +1004,20 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
}
try {
metadataOps.dropTable(stmt);
DropInfo info = new DropInfo(getName(), stmt.getDbName(), stmt.getTableName());
Env.getCurrentEnv().getEditLog().logDropTable(info);
} catch (Exception e) {
LOG.warn("Failed to drop a table", e);
throw e;
}
}

public void replayDropTable(String dbName, String tblName) {
if (metadataOps != null) {
metadataOps.afterDropTable(dbName, tblName);
}
}

public void unregisterDatabase(String dbName) {
throw new NotImplementedException("unregisterDatabase not implemented");
}
Expand Down Expand Up @@ -1076,11 +1117,23 @@ public void truncateTable(TruncateTableStmt stmt) throws DdlException {
}
metadataOps.truncateTable(tableName.getDb(), tableName.getTbl(), partitions);
} catch (Exception e) {
LOG.warn("Failed to drop a table", e);
LOG.warn("Failed to truncate table {}.{} in catalog {}", stmt.getTblRef().getName().getDb(),
stmt.getTblRef().getName().getTbl(), getName(), e);
throw e;
}
}

public void replayTruncateTable(TruncateTableInfo info) {
if (metadataOps != null) {
try {
metadataOps.truncateTable(info.getDb(), info.getTable(), info.getExtPartNames());
} catch (DdlException e) {
LOG.warn("Failed to replay truncate table {}.{} in catalog {}", info.getDb(), info.getTable(),
getName(), e);
}
}
}

public String getQualifiedName(String dbName) {
return String.join(".", name, dbName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.qe.ConnectContext;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -106,7 +107,7 @@ private static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftC
}

@Override
public void createDb(CreateDbStmt stmt) throws DdlException {
public void createDbImpl(CreateDbStmt stmt) throws DdlException {
String fullDbName = stmt.getFullDbName();
Map<String, String> properties = stmt.getProperties();
long dbId = Env.getCurrentEnv().getNextId();
Expand All @@ -129,15 +130,19 @@ public void createDb(CreateDbStmt stmt) throws DdlException {
catalogDatabase.setProperties(properties);
catalogDatabase.setComment(properties.getOrDefault("comment", ""));
client.createDatabase(catalogDatabase);
catalog.onRefreshCache(true);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
LOG.info("createDb dbName = " + fullDbName + ", id = " + dbId);
}

@Override
public void dropDb(DropDbStmt stmt) throws DdlException {
public void afterCreateDb(String dbName) {
catalog.onRefreshCache(true);
}

@Override
public void dropDbImpl(DropDbStmt stmt) throws DdlException {
String dbName = stmt.getDbName();
if (!databaseExist(dbName)) {
if (stmt.isSetIfExists()) {
Expand All @@ -149,14 +154,18 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
}
try {
client.dropDatabase(dbName);
catalog.onRefreshCache(true);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

@Override
public boolean createTable(CreateTableStmt stmt) throws UserException {
public void afterDropDb(String dbName) {
catalog.onRefreshCache(true);
}

@Override
public boolean createTableImpl(CreateTableStmt stmt) throws UserException {
String dbName = stmt.getDbName();
String tblName = stmt.getTableName();
ExternalDatabase<?> db = catalog.getDbNullable(dbName);
Expand Down Expand Up @@ -245,15 +254,22 @@ public boolean createTable(CreateTableStmt stmt) throws UserException {
comment);
}
client.createTable(hiveTableMeta, stmt.isSetIfNotExists());
db.setUnInitialized(true);
} catch (Exception e) {
throw new UserException(e.getMessage(), e);
}
return false;
}

@Override
public void dropTable(DropTableStmt stmt) throws DdlException {
public void afterCreateTable(String dbName, String tblName) {
ExternalDatabase<?> db = catalog.getDbNullable(dbName);
if (db != null) {
db.setUnInitialized(true);
}
}

@Override
public void dropTableImpl(DropTableStmt stmt) throws DdlException {
String dbName = stmt.getDbName();
String tblName = stmt.getTableName();
ExternalDatabase<?> db = catalog.getDbNullable(stmt.getDbName());
Expand All @@ -275,14 +291,22 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
}
try {
client.dropTable(dbName, tblName);
db.setUnInitialized(true);
} catch (Exception e) {
throw new DdlException(e.getMessage(), e);
}
}

@Override
public void truncateTable(String dbName, String tblName, List<String> partitions) throws DdlException {
public void afterDropTable(String dbName, String tblName) {
ExternalDatabase<?> db = catalog.getDbNullable(dbName);
if (db != null) {
db.setUnInitialized(true);
}
}

@Override
public void truncateTableImpl(String dbName, String tblName, List<String> partitions)
throws DdlException {
ExternalDatabase<?> db = catalog.getDbNullable(dbName);
if (db == null) {
throw new DdlException("Failed to get database: '" + dbName + "' in catalog: " + catalog.getName());
Expand All @@ -292,9 +316,19 @@ public void truncateTable(String dbName, String tblName, List<String> partitions
} catch (Exception e) {
throw new DdlException(e.getMessage(), e);
}
TruncateTableInfo info = new TruncateTableInfo(catalog.getName(), dbName, tblName, partitions);
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
}

@Override
public void afterTruncateTable(String dbName, String tblName) {
// Invalidate cache.
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tblName);
db.setLastUpdateTime(System.currentTimeMillis());
db.setUnInitialized(true);
ExternalDatabase<?> db = catalog.getDbNullable(dbName);
if (db != null) {
db.setLastUpdateTime(System.currentTimeMillis());
db.setUnInitialized(true);
}
}

@Override
Expand Down
Loading
Loading