From 1e2f4e93cf408b1718448fdba828a6a7962a7637 Mon Sep 17 00:00:00 2001 From: Narendra Rajput Date: Mon, 9 Dec 2024 14:30:26 +0530 Subject: [PATCH 1/2] Error Handling for Cassandra --- .../templates/transforms/SourceWriterFn.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java index f4af6d9780..a719a17b27 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java @@ -15,9 +15,13 @@ */ package com.google.cloud.teleport.v2.templates.transforms; +import com.datastax.oss.driver.api.core.DriverException; +import com.datastax.oss.driver.api.core.DriverExecutionException; +import com.datastax.oss.driver.api.core.DriverTimeoutException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.nosan.embedded.cassandra.CassandraException; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.SpannerException; import com.google.cloud.teleport.v2.spanner.ddl.Ddl; @@ -52,6 +56,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -224,12 +229,37 @@ public void processElement(ProcessContext c) { outputWithTag(c, Constants.PERMANENT_ERROR_TAG, ex.getMessage(), spannerRec); } } catch (Exception ex) { + handleExceptions(ex, c, spannerRec); LOG.error("Failed to write to source", ex); outputWithTag(c, Constants.PERMANENT_ERROR_TAG, ex.getMessage(), spannerRec); } } } + private void handleExceptions(Exception ex, ProcessContext c, TrimmedShardedDataChangeRecord spannerRec) { + if (ex instanceof DriverException) { + LOG.error("Cassandra-related exception occurred: ", ex); + if (isRetryableCassandraError((com.datastax.driver.core.exceptions.DriverException) ex)) { + outputWithTag(c, Constants.RETRYABLE_ERROR_TAG, ex.getMessage(), spannerRec); + retryableRecordCountMetric.inc(); + } else { + outputWithTag(c, Constants.PERMANENT_ERROR_TAG, ex.getMessage(), spannerRec); + } + } else { + LOG.error("Unexpected exception while writing to source", ex); + outputWithTag(c, Constants.PERMANENT_ERROR_TAG, ex.getMessage(), spannerRec); + } + } + + private boolean isRetryableCassandraError(com.datastax.driver.core.exceptions.DriverException ex) { + return ex instanceof com.datastax.driver.core.exceptions.NoHostAvailableException || + ex instanceof com.datastax.driver.core.exceptions.UnavailableException || + ex instanceof com.datastax.driver.core.exceptions.ReadTimeoutException || + ex instanceof com.datastax.driver.core.exceptions.WriteTimeoutException || + ex instanceof com.datastax.driver.core.exceptions.OverloadedException || + ex instanceof com.datastax.driver.core.exceptions.OperationTimedOutException; + } + private Mutation getShadowTableMutation( String tableName, String shadowTableName, From b82568151c01b2897d7db8788120813abe201ea5 Mon Sep 17 00:00:00 2001 From: Narendra Rajput Date: Mon, 9 Dec 2024 15:15:30 +0530 Subject: [PATCH 2/2] Added error handling changes for cassandra --- .../cloud/teleport/v2/templates/transforms/SourceWriterFn.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java index a719a17b27..ff65eb36c2 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java @@ -231,7 +231,7 @@ public void processElement(ProcessContext c) { } catch (Exception ex) { handleExceptions(ex, c, spannerRec); LOG.error("Failed to write to source", ex); - outputWithTag(c, Constants.PERMANENT_ERROR_TAG, ex.getMessage(), spannerRec); +// outputWithTag(c, Constants.PERMANENT_ERROR_TAG, ex.getMessage(), spannerRec); } } }