[ASTERIXDB-3557][TX] Ignore and log failures to read atomic txn log files
- user model changes: no
- storage format changes: no
- interface changes: no
Ext-ref: MB-65039
Change-Id: I5a2e3849cb6fe4a78e0499fb2591f7e734908d04
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19372
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index a5c9867..d509ba6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -186,9 +186,25 @@
public void rollback() throws Exception {
Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve(StorageConstants.GLOBAL_TXN_DIR_NAME));
for (FileReference txnLogFileRef : txnLogFileRefs) {
- IGlobalTransactionContext context = new GlobalTransactionContext(txnLogFileRef, ioManager);
- txnContextRepository.put(context.getJobId(), context);
- sendJobRollbackMessages(context);
+ try {
+ IGlobalTransactionContext context = new GlobalTransactionContext(txnLogFileRef, ioManager);
+ txnContextRepository.put(context.getJobId(), context);
+ sendJobRollbackMessages(context);
+ } catch (Exception e) {
+ LOGGER.error("Error rolling back transaction for {}", txnLogFileRef, e);
+ cleanup(txnLogFileRef);
+ }
+ }
+ }
+
+ private void cleanup(FileReference resourceFile) {
+ if (resourceFile.getFile().exists()) {
+ try {
+ ioManager.delete(resourceFile);
+ } catch (Throwable th) {
+ LOGGER.error("Error cleaning up corrupted resource {}", resourceFile, th);
+ ExitUtil.halt(ExitUtil.EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES);
+ }
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 4c7f53b..3663e83 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -45,6 +45,7 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -61,6 +62,7 @@
private final ITransactionSubsystem txnSubsystem;
private final Map<TxnId, ITransactionContext> txnCtxRepository = new ConcurrentHashMap<>();
private final AtomicLong maxTxnId = new AtomicLong(0);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public TransactionManager(ITransactionSubsystem provider) {
this.txnSubsystem = provider;
@@ -209,23 +211,39 @@
.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION)
.toString()));
- ObjectMapper objectMapper = new ObjectMapper();
for (FileReference txnLogFileRef : txnLogFileRefs) {
- ObjectNode atomicTransactionLog =
- objectMapper.readValue(new String(ioManager.readAllBytes(txnLogFileRef)), ObjectNode.class);
- TxnId txnId = new TxnId(atomicTransactionLog.get("txnId").asInt());
- JsonNode jsonNode = atomicTransactionLog.get("resourceMap");
- Map<String, ILSMComponentId> resourceMap = getResourceMapFromJson(jsonNode);
- AtomicNoWALTransactionContext context =
- new AtomicNoWALTransactionContext(txnId, txnSubsystem.getApplicationContext());
- context.rollback(resourceMap);
- context.deleteLogFile();
+ try {
+ ObjectNode atomicTransactionLog = OBJECT_MAPPER
+ .readValue(new String(ioManager.readAllBytes(txnLogFileRef)), ObjectNode.class);
+ TxnId txnId = new TxnId(atomicTransactionLog.get("txnId").asInt());
+ JsonNode jsonNode = atomicTransactionLog.get("resourceMap");
+ Map<String, ILSMComponentId> resourceMap = getResourceMapFromJson(jsonNode);
+ AtomicNoWALTransactionContext context =
+ new AtomicNoWALTransactionContext(txnId, txnSubsystem.getApplicationContext());
+ context.rollback(resourceMap);
+ context.deleteLogFile();
+ } catch (Exception e) {
+ LOGGER.error("Error rolling back atomic statement for {}", txnLogFileRef, e);
+ cleanup(txnLogFileRef);
+ }
}
} catch (Exception e) {
throw new ACIDException(e);
}
}
+ private void cleanup(FileReference resourceFile) {
+ IIOManager ioManager = txnSubsystem.getApplicationContext().getPersistenceIoManager();
+ if (resourceFile.getFile().exists()) {
+ try {
+ ioManager.delete(resourceFile);
+ } catch (Throwable th) {
+ LOGGER.error("Error cleaning up corrupted resource {}", resourceFile, th);
+ ExitUtil.halt(ExitUtil.EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES);
+ }
+ }
+ }
+
private Map<String, ILSMComponentId> getResourceMapFromJson(JsonNode jsonNode) {
Map<String, ILSMComponentId> resourceMap = new HashMap<>();
for (Iterator<String> it = jsonNode.fieldNames(); it.hasNext();) {