[ASTERIXDB-3557][TX] ignore and log failure to read atomic txn log files

- user model changes: no
- storage format changes: no
- interface changes: no

Ext-ref: MB-65039

Change-Id: I5a2e3849cb6fe4a78e0499fb2591f7e734908d04
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19372
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index a5c9867..d509ba6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -186,9 +186,25 @@
     public void rollback() throws Exception {
         Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve(StorageConstants.GLOBAL_TXN_DIR_NAME));
         for (FileReference txnLogFileRef : txnLogFileRefs) {
-            IGlobalTransactionContext context = new GlobalTransactionContext(txnLogFileRef, ioManager);
-            txnContextRepository.put(context.getJobId(), context);
-            sendJobRollbackMessages(context);
+            try {
+                IGlobalTransactionContext context = new GlobalTransactionContext(txnLogFileRef, ioManager);
+                txnContextRepository.put(context.getJobId(), context);
+                sendJobRollbackMessages(context);
+            } catch (Exception e) {
+                LOGGER.error("Error rolling back transaction for {}", txnLogFileRef, e);
+                cleanup(txnLogFileRef);
+            }
+        }
+    }
+
+    private void cleanup(FileReference resourceFile) {
+        if (resourceFile.getFile().exists()) {
+            try {
+                ioManager.delete(resourceFile);
+            } catch (Throwable th) {
+                LOGGER.error("Error cleaning up corrupted resource {}", resourceFile, th);
+                ExitUtil.halt(ExitUtil.EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES);
+            }
         }
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 4c7f53b..3663e83 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -45,6 +45,7 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.util.ExitUtil;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -61,6 +62,7 @@
     private final ITransactionSubsystem txnSubsystem;
     private final Map<TxnId, ITransactionContext> txnCtxRepository = new ConcurrentHashMap<>();
     private final AtomicLong maxTxnId = new AtomicLong(0);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public TransactionManager(ITransactionSubsystem provider) {
         this.txnSubsystem = provider;
@@ -209,23 +211,39 @@
                             .get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
                                     StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION)
                             .toString()));
-            ObjectMapper objectMapper = new ObjectMapper();
             for (FileReference txnLogFileRef : txnLogFileRefs) {
-                ObjectNode atomicTransactionLog =
-                        objectMapper.readValue(new String(ioManager.readAllBytes(txnLogFileRef)), ObjectNode.class);
-                TxnId txnId = new TxnId(atomicTransactionLog.get("txnId").asInt());
-                JsonNode jsonNode = atomicTransactionLog.get("resourceMap");
-                Map<String, ILSMComponentId> resourceMap = getResourceMapFromJson(jsonNode);
-                AtomicNoWALTransactionContext context =
-                        new AtomicNoWALTransactionContext(txnId, txnSubsystem.getApplicationContext());
-                context.rollback(resourceMap);
-                context.deleteLogFile();
+                try {
+                    ObjectNode atomicTransactionLog = OBJECT_MAPPER
+                            .readValue(new String(ioManager.readAllBytes(txnLogFileRef)), ObjectNode.class);
+                    TxnId txnId = new TxnId(atomicTransactionLog.get("txnId").asInt());
+                    JsonNode jsonNode = atomicTransactionLog.get("resourceMap");
+                    Map<String, ILSMComponentId> resourceMap = getResourceMapFromJson(jsonNode);
+                    AtomicNoWALTransactionContext context =
+                            new AtomicNoWALTransactionContext(txnId, txnSubsystem.getApplicationContext());
+                    context.rollback(resourceMap);
+                    context.deleteLogFile();
+                } catch (Exception e) {
+                    LOGGER.error("Error rolling back atomic statement for {}", txnLogFileRef, e);
+                    cleanup(txnLogFileRef);
+                }
             }
         } catch (Exception e) {
             throw new ACIDException(e);
         }
     }
 
+    private void cleanup(FileReference resourceFile) {
+        IIOManager ioManager = txnSubsystem.getApplicationContext().getPersistenceIoManager();
+        if (resourceFile.getFile().exists()) {
+            try {
+                ioManager.delete(resourceFile);
+            } catch (Throwable th) {
+                LOGGER.error("Error cleaning up corrupted resource {}", resourceFile, th);
+                ExitUtil.halt(ExitUtil.EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES);
+            }
+        }
+    }
+
     private Map<String, ILSMComponentId> getResourceMapFromJson(JsonNode jsonNode) {
         Map<String, ILSMComponentId> resourceMap = new HashMap<>();
         for (Iterator<String> it = jsonNode.fieldNames(); it.hasNext();) {