[NO ISSUE][TX][TEST] Add test for atomic statements

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add test that stops NCs at random times while upserts are
  taking place on standalone collections.
- Fix the issue with rollback and recovery for atomic statements
- Add timeout while waiting for AtomicJobComplete and
  AtomicJobRollback messages
- Remove datasets without type spec from checkpoint thread
- Some refactoring

Change-Id: Ifcaa65690ca99681cc5bebd8f220e5389298d61b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17724
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index f6456dc..b4223bc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.cc;
 
+import static org.apache.hyracks.util.ExitUtil.EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,8 +40,11 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -85,7 +90,8 @@
 
         synchronized (context) {
             try {
-                context.wait();
+                CCConfig config = ((ClusterControllerService) serviceContext.getControllerService()).getCCConfig();
+                context.wait(config.getGlobalTxCommitTimeout());
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new ACIDException(e);
@@ -194,10 +200,12 @@
         }
         synchronized (context) {
             try {
-                context.wait();
+                CCConfig config = ((ClusterControllerService) serviceContext.getControllerService()).getCCConfig();
+                context.wait(config.getGlobalTxRollbackTimeout());
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
-                throw new ACIDException(e);
+                LOGGER.error("Error while rolling back atomic statement for {}, halting JVM", jobId);
+                ExitUtil.halt(EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT);
             }
         }
         txnContextRepository.remove(jobId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
index 71a641e..e611d8f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.message;
 
+import static org.apache.hyracks.util.ExitUtil.EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT;
+
 import java.util.List;
 import java.util.Map;
 
@@ -32,6 +34,9 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * Message sent from CC to all NCs to rollback an atomic statement/job.
@@ -43,6 +48,8 @@
     private final List<Integer> datasetIds;
     private final Map<String, ILSMComponentId> componentIdMap;
 
+    private static final Logger LOGGER = LogManager.getLogger();
+
     public AtomicJobRollbackMessage(JobId jobId, List<Integer> datasetIds,
             Map<String, ILSMComponentId> componentIdMap) {
         this.jobId = jobId;
@@ -57,15 +64,19 @@
                 datasetLifecycleManager.getIndexCheckpointManagerProvider();
         componentIdMap.forEach((k, v) -> {
             try {
-                IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k));
+                IIndexCheckpointManager checkpointManager =
+                        indexCheckpointManagerProvider.get(ResourceReference.ofIndex(k));
                 if (checkpointManager.getCheckpointCount() > 0) {
                     IndexCheckpoint checkpoint = checkpointManager.getLatest();
                     if (checkpoint.getLastComponentId() == v.getMaxId()) {
+                        LOGGER.info("Removing checkpoint for resource {} for component id {}", k,
+                                checkpoint.getLastComponentId());
                         checkpointManager.deleteLatest(v.getMaxId(), 1);
                     }
                 }
-            } catch (HyracksDataException e) {
-                throw new RuntimeException(e);
+            } catch (Exception e) {
+                LOGGER.error("Error while rolling back atomic statement for {}, halting JVM", jobId);
+                ExitUtil.halt(EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT);
             }
         });
         AtomicJobRollbackCompleteMessage message =
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
index 0936512..beaaac0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
@@ -70,6 +70,9 @@
                 deleteInvalidIndex(appContext, localResourceRepository, resource);
             }
         }
+        for (Integer partition : nodePartitions) {
+            localResourceRepository.cleanup(partition);
+        }
         try {
             broker.sendMessageToPrimaryCC(new VoidResponse(reqId, null));
         } catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 5494250..f6eb123 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -61,7 +61,6 @@
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.transactions.TxnId;
-import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -158,9 +157,6 @@
     @Override
     public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
         state = SystemState.RECOVERING;
-        if (appCtx.isCloudDeployment()) {
-            doMetadataRecovery();
-        }
         LOGGER.info("starting recovery for partitions {}", partitions);
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
         Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -175,11 +171,6 @@
         replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN, true);
     }
 
-    public synchronized void doMetadataRecovery() {
-        LOGGER.info("starting recovery for metadata partition {}", StorageConstants.METADATA_PARTITION);
-        appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL();
-    }
-
     public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN,
             boolean closeOnFlushRedo) throws IOException, ACIDException {
         try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
index 0071ee0..435419b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
@@ -46,6 +46,10 @@
         deleteInvalidMetadataIndexes(localResourceRepository);
         final Set<Integer> nodePartitions = appContext.getReplicaManager().getPartitions();
         localResourceRepository.deleteCorruptedResources();
+        INcApplicationContext appCtx = (INcApplicationContext) cs.getApplicationContext();
+        if (appCtx.isCloudDeployment() && nodePartitions.contains(metadataPartitionId)) {
+            appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL();
+        }
         for (Integer partition : nodePartitions) {
             localResourceRepository.cleanup(partition);
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 3f76a31..df2c25d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -112,12 +112,12 @@
         LOGGER.info("Starting Global Recovery");
         MetadataManager.INSTANCE.init();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        rollbackIncompleteAtomicTransactions(appCtx);
         if (appCtx.getStorageProperties().isStorageGlobalCleanup()) {
             int storageGlobalCleanupTimeout = appCtx.getStorageProperties().getStorageGlobalCleanupTimeout();
             performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout);
         }
         mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
-        rollbackIncompleteAtomicTransactions(appCtx);
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         recoveryCompleted = true;
         recovering = false;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 06380fe..60bcbb5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -91,6 +91,22 @@
     }
 
     /**
+     * Creates a dataset without type specification
+     * @param dataset The name of the dataset
+     * @param fields The fields composing the primary key
+     * @param pkAutogenerated Is the primary key autogenerated
+     * @throws Exception
+     */
+    public static void createDatasetWithoutType(String dataset, Map<String, String> fields, boolean pkAutogenerated)
+            throws Exception {
+        StringBuilder stringBuilder = new StringBuilder("");
+        fields.forEach((fName, fType) -> stringBuilder.append(fName).append(":").append(fType).append(","));
+        stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+        TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE DATASET " + dataset + " PRIMARY KEY (" + stringBuilder + ")"
+                + (pkAutogenerated ? "AUTOGENERATED;" : ";"), OUTPUT_FORMAT);
+    }
+
+    /**
      * Creates a secondary primary index
      * @param dataset the name of the dataset
      * @param indexName the name of the index
@@ -128,6 +144,22 @@
     }
 
     /**
+     * Creates a single insert statement with multiple records containing name field
+     * @param dataset The name of the dataset
+     * @param count Number of records in the insert statement
+     * @throws Exception
+     */
+    public static void insertBulkData(String dataset, long count) throws Exception {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < count; i++) {
+            stringBuilder.append("{\"name\": \"name_" + i + "\"},");
+        }
+        stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+        TEST_EXECUTOR.executeSqlppUpdateOrDdl("INSERT INTO " + dataset + "([" + stringBuilder + "]);",
+                TestCaseContext.OutputFormat.CLEAN_JSON);
+    }
+
+    /**
      * Gets the number of records in dataset {@code dataset}
      *
      * @param datasetName
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicMetadataTransactionWithoutWALTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicMetadataTransactionWithoutWALTest.java
new file mode 100644
index 0000000..0d915de
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicMetadataTransactionWithoutWALTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.atomic_statements;
+
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.api.common.LocalCloudUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class AtomicMetadataTransactionWithoutWALTest {
+    public static final String RESOURCES_PATH = joinPath(System.getProperty("user.dir"), "src", "test", "resources");
+    public static final String CONFIG_FILE = joinPath(RESOURCES_PATH, "cc-cloud-storage.conf");
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+    private static final TestExecutor TEST_EXECUTOR = new TestExecutor();
+    private static final TestCaseContext.OutputFormat OUTPUT_FORMAT = TestCaseContext.OutputFormat.CLEAN_JSON;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static final String DATASET_NAME_PREFIX = "ds_";
+    private static final int NUM_DATASETS = 500;
+    private static final int NUM_RECOVERIES = 10;
+
+    @Before
+    public void setUp() throws Exception {
+        boolean cleanStart = Boolean.getBoolean("cleanup.start");
+        LocalCloudUtil.startS3CloudEnvironment(cleanStart);
+        integrationUtil.setGracefulShutdown(false);
+        integrationUtil.init(true, CONFIG_FILE);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    private void createDatasets() throws Exception {
+        String datasetName;
+        for (int i = 0; i < NUM_DATASETS; i++) {
+            datasetName = DATASET_NAME_PREFIX + i;
+            TestDataUtil.createDatasetWithoutType(datasetName, Map.of("id", "uuid"), true);
+        }
+    }
+
+    @Test
+    public void testAtomicityWithFailures() throws Exception {
+        final String leftJoinQuery = "SELECT VALUE COUNT(*) FROM Metadata.`Dataset` ds LEFT JOIN Metadata.`Index` i "
+                + "ON ds.DatasetName=i.DatasetName AND i.IsPrimary=true WHERE ds.DatasetId!=0 AND i.DatasetName IS MISSING;";
+        final String rightJoinQuery = "SELECT VALUE COUNT(*) FROM Metadata.`Dataset` ds RIGHT JOIN Metadata.`Index` i "
+                + "ON ds.DatasetName=i.DatasetName AND i.IsPrimary=true WHERE ds.DatasetId!=0 AND ds.DatasetName IS MISSING;";
+        for (int i = 0; i <= NUM_RECOVERIES; i++) {
+            Thread thread = new Thread(() -> {
+                try {
+                    createDatasets();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            });
+            thread.start();
+            Random rnd = new Random();
+            Thread.sleep(rnd.nextInt(100) + 10);
+            integrationUtil.deinit(true);
+            integrationUtil.init(true, CONFIG_FILE);
+
+            Assert.assertEquals(0, runCountQuery(leftJoinQuery));
+            Assert.assertEquals(0, runCountQuery(rightJoinQuery));
+        }
+    }
+
+    private int runCountQuery(String query) throws Exception {
+        InputStream responseStream = TEST_EXECUTOR.executeQueryService(query,
+                TEST_EXECUTOR.getEndpoint(Servlets.QUERY_SERVICE), OUTPUT_FORMAT, StandardCharsets.UTF_8);
+        ObjectNode response = OBJECT_MAPPER.readValue(responseStream, ObjectNode.class);
+        JsonNode result = response.get("results");
+        Assert.assertEquals(1, result.size());
+        return result.get(0).asInt();
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java
new file mode 100644
index 0000000..5219e011
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.atomic_statements;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AtomicStatementsTest {
+    private static final String TEST_CONFIG_FILE_NAME = "cc.conf";
+    private static final String TEST_CONFIG_PATH = System.getProperty("user.dir") + File.separator + "src"
+            + File.separator + "main" + File.separator + "resources";
+    private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+    private static final String DATASET_NAME_PREFIX = "ds_";
+    private static final int NUM_DATASETS = 5;
+    private static final int BATCH_SIZE = 100;
+    private static final int NUM_UPSERTS = 100;
+    private static final int NUM_RECOVERIES = 10;
+
+    @Before
+    public void setUp() throws Exception {
+        integrationUtil.setGracefulShutdown(false);
+        integrationUtil.init(true, TEST_CONFIG_FILE_PATH);
+        createDatasets();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    private void createDatasets() throws Exception {
+        String datasetName;
+        for (int i = 0; i < NUM_DATASETS; i++) {
+            datasetName = DATASET_NAME_PREFIX + i;
+            TestDataUtil.createDatasetWithoutType(datasetName, Map.of("id", "uuid"), true);
+            TestDataUtil.createSecondaryBTreeIndex(datasetName, datasetName + "_sidx", "name:string");
+        }
+    }
+
+    private Thread insertRecords(String dataset) {
+        Thread thread = new Thread(() -> {
+            try {
+                for (int i = 0; i < NUM_UPSERTS; i++) {
+                    TestDataUtil.insertBulkData(dataset, BATCH_SIZE);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+        thread.start();
+        return thread;
+    }
+
+    @Test
+    public void testAtomicityWithFailures() throws Exception {
+        for (int i = 0; i <= NUM_RECOVERIES; i++) {
+            List<Thread> threads = new ArrayList<>();
+            for (int j = 0; j < NUM_DATASETS; j++) {
+                threads.add(insertRecords(DATASET_NAME_PREFIX + j));
+            }
+            Random rnd = new Random();
+            Thread.sleep(rnd.nextInt(2000) + 500);
+            integrationUtil.deinit(false);
+            integrationUtil.init(false, TEST_CONFIG_FILE_PATH);
+
+            for (int j = 0; j < NUM_DATASETS; j++) {
+                final long countAfterRecovery = TestDataUtil.getDatasetCount(DATASET_NAME_PREFIX + j);
+                Assert.assertEquals(0, countAfterRecovery % BATCH_SIZE);
+            }
+        }
+    }
+
+    @Test
+    public void testAtomicityWithoutFailures() throws Exception {
+        List<Thread> threads = new ArrayList<>();
+        for (int j = 0; j < NUM_DATASETS; j++) {
+            threads.add(insertRecords(DATASET_NAME_PREFIX + j));
+            threads.add(insertRecords(DATASET_NAME_PREFIX + j));
+        }
+        for (Thread thread : threads) {
+            thread.join();
+        }
+        for (int j = 0; j < NUM_DATASETS; j++) {
+            long count = TestDataUtil.getDatasetCount(DATASET_NAME_PREFIX + j);
+            Assert.assertEquals(2 * NUM_UPSERTS * BATCH_SIZE, count);
+        }
+
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 7e38b8b..34bc587 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -279,7 +279,7 @@
         for (FlushOperation flush : lastFlushOperation.values()) {
             FileReference target = flush.getTarget();
             Map<String, Object> map = flush.getParameters();
-            final LSMComponentId id = (LSMComponentId) map.get(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID);
+            final LSMComponentId id = (LSMComponentId) map.get(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID);
             final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
             final long componentSequence = IndexComponentFileReference.of(ref.getName()).getSequenceEnd();
             indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 0L, id.getMaxId());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index be935d0..014bc5c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.ioopcallbacks.AtomicLSMIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
 import org.apache.asterix.common.utils.StorageConstants;
@@ -403,8 +404,9 @@
                         : new SecondaryIndexOperationTrackerFactory(datasetId);
         ILSMComponentIdGeneratorFactory idGeneratorProvider = new DatasetLSMComponentIdGeneratorFactory(datasetId);
         DatasetInfoProvider datasetInfoProvider = new DatasetInfoProvider(datasetId);
-        ILSMIOOperationCallbackFactory ioOpCallbackFactory =
-                new LSMIndexIOOperationCallbackFactory(idGeneratorProvider, datasetInfoProvider);
+        ILSMIOOperationCallbackFactory ioOpCallbackFactory = appContext.isCloudDeployment()
+                ? new AtomicLSMIndexIOOperationCallbackFactory(idGeneratorProvider, datasetInfoProvider)
+                : new LSMIndexIOOperationCallbackFactory(idGeneratorProvider, datasetInfoProvider);
         ILSMPageWriteCallbackFactory pageWriteCallbackFactory = new LSMIndexPageWriteCallbackFactory();
 
         IStorageComponentProvider storageComponentProvider = appContext.getStorageComponentProvider();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index f09248f..460f393 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -139,7 +139,8 @@
         return lsmIndex -> {
             if (lsmIndex.isPrimaryIndex()) {
                 PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) lsmIndex.getOperationTracker();
-                return currentTime - opTracker.getLastFlushTime() >= datasetCheckpointIntervalNanos;
+                return !lsmIndex.isAtomic()
+                        && currentTime - opTracker.getLastFlushTime() >= datasetCheckpointIntervalNanos;
             }
             return false;
         };
@@ -148,7 +149,7 @@
     private Predicate<ILSMIndex> newLaggingDatasetPredicate(long checkpointTargetLSN) {
         return lsmIndex -> {
             final LSMIOOperationCallback ioCallback = (LSMIOOperationCallback) lsmIndex.getIOOperationCallback();
-            return ioCallback.getPersistenceLsn() < checkpointTargetLSN;
+            return !lsmIndex.isAtomic() && ioCallback.getPersistenceLsn() < checkpointTargetLSN;
         };
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
index aa698be..151d9ef 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
+import static org.apache.hyracks.util.ExitUtil.EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT;
+
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -44,14 +46,19 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.util.ExitUtil;
 import org.apache.hyracks.util.annotations.ThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 @ThreadSafe
 public class AtomicNoWALTransactionContext extends AtomicTransactionContext {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     private final INcApplicationContext appCtx;
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
@@ -111,9 +118,13 @@
         }
         try {
             commit();
-        } catch (Exception e) {
-            rollback(resourceMap);
-            throw new ACIDException(e);
+        } catch (HyracksDataException e) {
+            try {
+                rollback(resourceMap);
+            } catch (Exception ex) {
+                LOGGER.error("Error while rolling back atomic statement for {}, halting JVM", txnId);
+                ExitUtil.halt(EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT);
+            }
         } finally {
             deleteLogFile();
         }
@@ -122,17 +133,25 @@
 
     private void persistLogFile(List<Integer> datasetIds, Map<String, ILSMComponentId> resourceMap)
             throws HyracksDataException, JsonProcessingException {
-        IIOManager ioManager = appCtx.getIoManager();
+        IIOManager ioManager = appCtx.getPersistenceIoManager();
         FileReference fref = ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
                 StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION,
                 String.format("%s.log", txnId)).toString());
-        MetadataAtomicTransactionLog txnLog = new MetadataAtomicTransactionLog(txnId, datasetIds,
-                appCtx.getServiceContext().getNodeId(), resourceMap);
-        ioManager.overwrite(fref, OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(txnLog).getBytes());
+        ioManager.overwrite(fref, OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
+                .writeValueAsString(toJson(datasetIds, resourceMap)).getBytes());
+    }
+
+    private ObjectNode toJson(List<Integer> datasetIds, Map<String, ILSMComponentId> resourceMap) {
+        ObjectNode jsonNode = OBJECT_MAPPER.createObjectNode();
+        jsonNode.put("txnId", txnId.getId());
+        jsonNode.putPOJO("datasetIds", datasetIds);
+        jsonNode.put("nodeId", appCtx.getServiceContext().getNodeId());
+        jsonNode.putPOJO("resourceMap", resourceMap);
+        return jsonNode;
     }
 
     public void deleteLogFile() {
-        IIOManager ioManager = appCtx.getIoManager();
+        IIOManager ioManager = appCtx.getPersistenceIoManager();
         try {
             FileReference fref = ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
                     StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION,
@@ -172,10 +191,13 @@
                 datasetLifecycleManager.getIndexCheckpointManagerProvider();
         resourceMap.forEach((k, v) -> {
             try {
-                IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k));
+                IIndexCheckpointManager checkpointManager =
+                        indexCheckpointManagerProvider.get(ResourceReference.ofIndex(k));
                 if (checkpointManager.getCheckpointCount() > 0) {
                     IndexCheckpoint checkpoint = checkpointManager.getLatest();
                     if (checkpoint.getLastComponentId() == v.getMaxId()) {
+                        LOGGER.info("Removing checkpoint for resource {} for component id {}", k,
+                                checkpoint.getLastComponentId());
                         checkpointManager.deleteLatest(v.getMaxId(), 1);
                     }
                 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
deleted file mode 100644
index 7b3af3f..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.transaction;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.transactions.TxnId;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class MetadataAtomicTransactionLog {
-
-    private TxnId txnId;
-    private List<Integer> datasetIds;
-    private String nodeId;
-    private Map<String, ILSMComponentId> resourceMap;
-
-    @JsonCreator
-    public MetadataAtomicTransactionLog(@JsonProperty("txnId") TxnId txnId,
-            @JsonProperty("datasetIds") List<Integer> datasetIds, @JsonProperty("nodeId") String nodeId,
-            @JsonProperty("resourceMap") Map<String, ILSMComponentId> resourceMap) {
-        this.txnId = txnId;
-        this.datasetIds = datasetIds;
-        this.nodeId = nodeId;
-        this.resourceMap = resourceMap;
-    }
-
-    public TxnId getTxnId() {
-        return txnId;
-    }
-
-    public List<Integer> getDatasetIds() {
-        return datasetIds;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public Map<String, ILSMComponentId> getResourceMap() {
-        return resourceMap;
-    }
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index ea7fcb8..4c7f53b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -23,6 +23,8 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,12 +43,16 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 @ThreadSafe
 public class TransactionManager implements ITransactionManager, ILifeCycleComponent {
@@ -196,7 +202,7 @@
 
     @Override
     public void rollbackMetadataTransactionsWithoutWAL() {
-        IIOManager ioManager = txnSubsystem.getApplicationContext().getIoManager();
+        IIOManager ioManager = txnSubsystem.getApplicationContext().getPersistenceIoManager();
         try {
             Set<FileReference> txnLogFileRefs =
                     ioManager.list(ioManager.resolve(Paths
@@ -205,15 +211,40 @@
                             .toString()));
             ObjectMapper objectMapper = new ObjectMapper();
             for (FileReference txnLogFileRef : txnLogFileRefs) {
-                MetadataAtomicTransactionLog atomicTransactionLog = objectMapper.readValue(
-                        new String(ioManager.readAllBytes(txnLogFileRef)), MetadataAtomicTransactionLog.class);
-                AtomicNoWALTransactionContext context = new AtomicNoWALTransactionContext(
-                        atomicTransactionLog.getTxnId(), txnSubsystem.getApplicationContext());
-                context.rollback(atomicTransactionLog.getResourceMap());
+                // rebuild the transaction from its JSON log, roll it back, then delete the log file
+                ObjectNode atomicTransactionLog =
+                        objectMapper.readValue(new String(ioManager.readAllBytes(txnLogFileRef)), ObjectNode.class);
+                // read the id with asLong(): asInt() would truncate an id that does not fit in an int
+                TxnId txnId = new TxnId(atomicTransactionLog.get("txnId").asLong());
+                JsonNode jsonNode = atomicTransactionLog.get("resourceMap");
+                Map<String, ILSMComponentId> resourceMap = getResourceMapFromJson(jsonNode);
+                AtomicNoWALTransactionContext context =
+                        new AtomicNoWALTransactionContext(txnId, txnSubsystem.getApplicationContext());
+                context.rollback(resourceMap);
                 context.deleteLogFile();
             }
         } catch (Exception e) {
             throw new ACIDException(e);
         }
     }
+
+    /**
+     * Rebuilds the resource map of an atomic transaction log from its JSON representation.
+     *
+     * @param jsonNode
+     *            the "resourceMap" object of the log: every field name is used as a resource path and
+     *            every field value must carry numeric "minId" and "maxId" fields
+     * @return a map from each resource path to the {@link LSMComponentId} built from its min and max ids
+     */
+    private Map<String, ILSMComponentId> getResourceMapFromJson(JsonNode jsonNode) {
+        Map<String, ILSMComponentId> resourceMap = new HashMap<>();
+        for (Iterator<String> it = jsonNode.fieldNames(); it.hasNext();) {
+            String resourcePath = it.next();
+            JsonNode componentIdNode = jsonNode.get(resourcePath);
+            ILSMComponentId componentId =
+                    new LSMComponentId(componentIdNode.get("minId").asLong(), componentIdNode.get("maxId").asLong());
+            resourceMap.put(resourcePath, componentId);
+        }
+        return resourceMap;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index 6904e3c..18825da 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -89,7 +89,9 @@
                 STRING,
                 appConfig -> FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR),
                         "global-txn-log"),
-                ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/global-txn-log");
+                ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/global-txn-log"),
+        GLOBAL_TXN_COMMIT_TIMEOUT(LONG, 600000L),
+        GLOBAL_TXN_ROLLBACK_TIMEOUT(LONG, 600000L);
 
         private final IOptionType parser;
         private Object defaultValue;
@@ -213,6 +215,10 @@
                     return "Path to HTTP basic credentials";
                 case GLOBAL_TXN_LOG_DIR:
                     return "Directory to store global transaction logs";
+                case GLOBAL_TXN_COMMIT_TIMEOUT:
+                    return "Timeout for Commit";
+                case GLOBAL_TXN_ROLLBACK_TIMEOUT:
+                    return "Timeout for Rollback";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -492,4 +498,11 @@
         return getAppConfig().getString(Option.GLOBAL_TXN_LOG_DIR);
     }
 
+    public long getGlobalTxCommitTimeout() { // commit timeout option; default 600000L, presumably ms (TODO confirm)
+        return getAppConfig().getLong(Option.GLOBAL_TXN_COMMIT_TIMEOUT);
+    }
+
+    public long getGlobalTxRollbackTimeout() { // rollback timeout; default 600000L, presumably ms (TODO confirm)
+        return getAppConfig().getLong(Option.GLOBAL_TXN_ROLLBACK_TIMEOUT);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 8f8e8f6..0f03562 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -58,6 +58,7 @@
     public static final int EC_ACTIVE_RECOVERY_FAILURE = 20;
     public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22;
     public static final int EC_INCONSISTENT_STORAGE_REFERENCES = 23;
+    public static final int EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT = 24;
     public static final int EC_IMMEDIATE_HALT = 33;
     public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;
     public static final int EC_IO_SCHEDULER_FAILED = 55;