[NO ISSUE][TX]: Statement level atomicity for inserts/upserts/deletes

- user model changes: yes
- storage format changes: no
- interface changes: yes

Details:
With this change we introduce statement level atomicity for
inserts/upserts/deletes. The statement level atomicity is
only enabled for datasets created  without any type specification.

Change-Id: I3b4aefeba07be921d128255393aec1b703198a40
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17598
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index 0b8024b..bb7be73 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CloudProperties;
@@ -74,6 +75,7 @@
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.result.IResultSet;
 import org.apache.hyracks.client.result.ResultSet;
+import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.util.NetworkUtil;
@@ -119,6 +121,8 @@
     private final IAdapterFactoryService adapterFactoryService;
     private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
     private final IDataPartitioningProvider dataPartitioningProvider;
+    private final IGlobalTxManager globalTxManager;
+    private final IOManager ioManager;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hcc,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
@@ -126,7 +130,8 @@
             IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager,
             IMetadataLockUtil mdLockUtil, IReceptionistFactory receptionistFactory,
             IConfigValidatorFactory configValidatorFactory, Object extensionManager,
-            IAdapterFactoryService adapterFactoryService) throws AlgebricksException, IOException {
+            IAdapterFactoryService adapterFactoryService, IGlobalTxManager globalTxManager, IOManager ioManager,
+            CloudProperties cloudProperties) throws AlgebricksException, IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
         this.activeLifeCycleListener = activeLifeCycleListener;
@@ -142,7 +147,7 @@
         activeProperties = new ActiveProperties(propertiesAccessor);
         extensionProperties = new ExtensionProperties(propertiesAccessor);
         replicationProperties = new ReplicationProperties(propertiesAccessor);
-        cloudProperties = new CloudProperties(propertiesAccessor);
+        this.cloudProperties = cloudProperties;
         this.ftStrategy = ftStrategy;
         this.buildProperties = new BuildProperties(propertiesAccessor);
         this.messagingProperties = new MessagingProperties(propertiesAccessor);
@@ -163,6 +168,8 @@
         configValidator = configValidatorFactory.create();
         this.adapterFactoryService = adapterFactoryService;
         dataPartitioningProvider = DataPartitioningProvider.create(this);
+        this.globalTxManager = globalTxManager;
+        this.ioManager = ioManager;
     }
 
     @Override
@@ -381,4 +388,14 @@
     public CloudProperties getCloudProperties() {
         return cloudProperties;
     }
+
+    @Override
+    public IGlobalTxManager getGlobalTxManager() {
+        return globalTxManager;
+    }
+
+    @Override
+    public IOManager getIoManager() {
+        return ioManager;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
new file mode 100644
index 0000000..beb55d8
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.cc;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.app.message.AtomicJobCommitMessage;
+import org.apache.asterix.app.message.AtomicJobRollbackMessage;
+import org.apache.asterix.app.message.EnableMergeMessage;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.transaction.management.service.transaction.GlobalTransactionContext;
+import org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class GlobalTxManager implements IGlobalTxManager {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final Map<JobId, IGlobalTransactionContext> txnContextRepository = new ConcurrentHashMap<>();
+    private final ICCServiceContext serviceContext;
+    private final IOManager ioManager;
+    public static final String GlOBAL_TX_PROPERTY_NAME = "GlobalTxProperty";
+
+    public GlobalTxManager(ICCServiceContext serviceContext, IOManager ioManager) {
+        this.serviceContext = serviceContext;
+        this.ioManager = ioManager;
+    }
+
+    @Override
+    public IGlobalTransactionContext beginTransaction(JobId jobId, int numParticipatingNodes,
+            int numParticipatingPartitions, List<Integer> participatingDatasetIds) throws ACIDException {
+        GlobalTransactionContext context = new GlobalTransactionContext(jobId, participatingDatasetIds,
+                numParticipatingNodes, numParticipatingPartitions);
+        txnContextRepository.put(jobId, context);
+        return context;
+    }
+
+    @Override
+    public void commitTransaction(JobId jobId) throws ACIDException {
+        IGlobalTransactionContext context = getTransactionContext(jobId);
+        if (context.getTxnStatus() == TransactionStatus.ACTIVE
+                || context.getTxnStatus() == TransactionStatus.PREPARED) {
+            synchronized (context) {
+                try {
+                    context.wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new ACIDException(e);
+                }
+            }
+        }
+        txnContextRepository.remove(jobId);
+    }
+
+    @Override
+    public IGlobalTransactionContext getTransactionContext(JobId jobId) throws ACIDException {
+        IGlobalTransactionContext context = txnContextRepository.get(jobId);
+        if (context == null) {
+            throw new ACIDException("Transaction for jobId " + jobId + " does not exist");
+        }
+        return context;
+    }
+
+    @Override
+    public void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
+            Map<String, ILSMComponentId> componentIdMap) {
+        IGlobalTransactionContext context = getTransactionContext(jobId);
+        if (context.getNodeResourceMap().containsKey(nodeId)) {
+            context.getNodeResourceMap().get(nodeId).putAll(componentIdMap);
+        } else {
+            context.getNodeResourceMap().put(nodeId, componentIdMap);
+        }
+        if (context.incrementAndGetAcksReceived() == context.getNumPartitions()) {
+            context.setTxnStatus(TransactionStatus.PREPARED);
+            context.persist(ioManager);
+            context.resetAcksReceived();
+            sendJobCommitMessages(context);
+        }
+    }
+
+    private void sendJobCommitMessages(IGlobalTransactionContext context) {
+        for (String nodeId : context.getNodeResourceMap().keySet()) {
+            AtomicJobCommitMessage message = new AtomicJobCommitMessage(context.getJobId(), context.getDatasetIds());
+            try {
+                ((ICCMessageBroker) serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(message,
+                        nodeId);
+            } catch (Exception e) {
+                throw new ACIDException(e);
+            }
+        }
+    }
+
+    @Override
+    public void handleJobCompletionMessage(JobId jobId, String nodeId) {
+        IGlobalTransactionContext context = getTransactionContext(jobId);
+        if (context.incrementAndGetAcksReceived() == context.getNumNodes()) {
+            context.delete(ioManager);
+            context.setTxnStatus(TransactionStatus.COMMITTED);
+            sendEnableMergeMessages(context);
+            synchronized (context) {
+                context.notifyAll();
+            }
+        }
+    }
+
+    @Override
+    public void handleJobRollbackCompletionMessage(JobId jobId, String nodeId) {
+        IGlobalTransactionContext context = getTransactionContext(jobId);
+        if (context.incrementAndGetAcksReceived() == context.getNumNodes()) {
+            context.setTxnStatus(TransactionStatus.ROLLBACK);
+            context.delete(ioManager);
+            synchronized (context) {
+                context.notifyAll();
+            }
+        }
+    }
+
+    private void sendEnableMergeMessages(IGlobalTransactionContext context) {
+        for (String nodeId : context.getNodeResourceMap().keySet()) {
+            for (Integer datasetId : context.getDatasetIds()) {
+                EnableMergeMessage message = new EnableMergeMessage(context.getJobId(), datasetId);
+                try {
+                    ((ICCMessageBroker) serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(message,
+                            nodeId);
+                } catch (Exception e) {
+                    throw new ACIDException(e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void rollback() throws Exception {
+        Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve(StorageConstants.CC_TX_LOG_DIR));
+        for (FileReference txnLogFileRef : txnLogFileRefs) {
+            IGlobalTransactionContext context = new GlobalTransactionContext(txnLogFileRef, ioManager);
+            txnContextRepository.put(context.getJobId(), context);
+            sendJobRollbackMessages(context);
+        }
+    }
+
+    private void sendJobRollbackMessages(IGlobalTransactionContext context) throws Exception {
+        JobId jobId = context.getJobId();
+        for (String nodeId : context.getNodeResourceMap().keySet()) {
+            AtomicJobRollbackMessage rollbackMessage = new AtomicJobRollbackMessage(jobId, context.getDatasetIds(),
+                    context.getNodeResourceMap().get(nodeId));
+            ((ICCMessageBroker) serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(rollbackMessage,
+                    nodeId);
+        }
+        synchronized (context) {
+            try {
+                context.wait();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new ACIDException(e);
+            }
+        }
+        txnContextRepository.remove(jobId);
+    }
+
+    @Override
+    public void abortTransaction(JobId jobId) throws Exception {
+        IGlobalTransactionContext context = getTransactionContext(jobId);
+        if (context.getTxnStatus() == TransactionStatus.PREPARED) {
+            sendJobRollbackMessages(context);
+        }
+        txnContextRepository.remove(jobId);
+    }
+
+    @Override
+    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+        GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(GlOBAL_TX_PROPERTY_NAME);
+        if (globalTxInfo != null) {
+            beginTransaction(jobId, globalTxInfo.getNumNodes(), globalTxInfo.getNumPartitions(),
+                    globalTxInfo.getDatasetIds());
+        }
+    }
+
+    @Override
+    public void notifyJobStart(JobId jobId) throws HyracksException {
+
+    }
+
+    @Override
+    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
new file mode 100644
index 0000000..55ae225
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.List;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from CC to all NCs asking to commit an atomic statement/job.
+ */
+public class AtomicJobCommitMessage implements INcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final JobId jobId;
+    private final List<Integer> datasetIds;
+
+    public AtomicJobCommitMessage(JobId jobId, List<Integer> datasetIds) {
+        this.jobId = jobId;
+        this.datasetIds = datasetIds;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+        for (Integer datasetId : datasetIds) {
+            for (IndexInfo indexInfo : datasetLifecycleManager.getDatasetInfo(datasetId).getIndexes().values()) {
+                if (indexInfo.getIndex().isPrimaryIndex()) {
+                    ((PrimaryIndexOperationTracker) indexInfo.getIndex().getOperationTracker()).commit();
+                }
+            }
+        }
+        AtomicJobCompletionMessage message =
+                new AtomicJobCompletionMessage(jobId, appCtx.getServiceContext().getNodeId());
+        NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            mb.sendRealTimeMessageToCC(jobId.getCcId(), message);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCompletionMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCompletionMessage.java
new file mode 100644
index 0000000..869e5d2
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCompletionMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from NC to CC on successful local commit of an atomic statement/job.
+ */
+public class AtomicJobCompletionMessage implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final JobId jobId;
+
+    public AtomicJobCompletionMessage(JobId jobId, String nodeId) {
+        this.jobId = jobId;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        appCtx.getGlobalTxManager().handleJobCompletionMessage(jobId, nodeId);
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackCompleteMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackCompleteMessage.java
new file mode 100644
index 0000000..90e1fbd
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackCompleteMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from an NC to CC after successful local rollback of an atomic statement/job.
+ */
+public class AtomicJobRollbackCompleteMessage implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final JobId jobId;
+
+    public AtomicJobRollbackCompleteMessage(JobId jobId, String nodeId) {
+        this.jobId = jobId;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        appCtx.getGlobalTxManager().handleJobRollbackCompletionMessage(jobId, nodeId);
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
new file mode 100644
index 0000000..aeb6be2
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from CC to all NCs to rollback an atomic statement/job.
+ */
+public class AtomicJobRollbackMessage implements INcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final JobId jobId;
+    private final List<Integer> datasetIds;
+    private final Map<String, ILSMComponentId> componentIdMap;
+
+    public AtomicJobRollbackMessage(JobId jobId, List<Integer> datasetIds,
+            Map<String, ILSMComponentId> componentIdMap) {
+        this.jobId = jobId;
+        this.datasetIds = datasetIds;
+        this.componentIdMap = componentIdMap;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+        IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+                datasetLifecycleManager.getIndexCheckpointManagerProvider();
+        componentIdMap.forEach((k, v) -> {
+            try {
+                IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k));
+                if (checkpointManager.getCheckpointCount() > 0) {
+                    IndexCheckpoint checkpoint = checkpointManager.getLatest();
+                    if (checkpoint.getLastComponentId() == v.getMaxId()) {
+                        checkpointManager.deleteLatest(v.getMaxId(), 1);
+                    }
+                }
+            } catch (HyracksDataException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        AtomicJobRollbackCompleteMessage message =
+                new AtomicJobRollbackCompleteMessage(jobId, appCtx.getServiceContext().getNodeId());
+        NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            mb.sendRealTimeMessageToPrimaryCC(message);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EnableMergeMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EnableMergeMessage.java
new file mode 100644
index 0000000..900da82
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EnableMergeMessage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class EnableMergeMessage implements INcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final JobId jobId;
+    private final int datasetId;
+
+    public EnableMergeMessage(JobId jobId, int datasetId) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+        for (IndexInfo indexInfo : datasetLifecycleManager.getDatasetInfo(datasetId).getIndexes().values()) {
+            if (indexInfo.getIndex().isPrimaryIndex()) {
+                indexInfo.getIndex().getMergePolicy().diskComponentAdded(indexInfo.getIndex(), false);;
+            }
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 372cf69..9383f06 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -227,6 +227,22 @@
         return IndexCheckpoint.fromJson(new String(ioManager.readAllBytes(checkpointPath)));
     }
 
+    @Override
+    public void deleteLatest(long latestId, int historyToDelete) {
+        try {
+            final Collection<FileReference> checkpointFiles = ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
+            if (!checkpointFiles.isEmpty()) {
+                for (FileReference checkpointFile : checkpointFiles) {
+                    if (getCheckpointIdFromFileName(checkpointFile) > (latestId - historyToDelete)) {
+                        ioManager.delete(checkpointFile);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.warn(() -> "Couldn't delete history checkpoints at " + indexPath, e);
+        }
+    }
+
     private void deleteHistory(long latestId, int historyToKeep) {
         try {
             final Collection<FileReference> checkpointFiles = ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 28a6d2d..dc165a0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -55,6 +55,7 @@
 import org.apache.asterix.app.active.ActiveEntityEventsListener;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.active.FeedEventsListener;
+import org.apache.asterix.app.cc.GlobalTxManager;
 import org.apache.asterix.app.external.ExternalLibraryJobUtils;
 import org.apache.asterix.app.result.ExecutionError;
 import org.apache.asterix.app.result.ResultHandle;
@@ -69,6 +70,7 @@
 import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.api.IResponsePrinter;
 import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.DatasetConfig.TransactionState;
@@ -209,6 +211,7 @@
 import org.apache.asterix.runtime.fulltext.StopwordsFullTextFilterDescriptor;
 import org.apache.asterix.runtime.operators.DatasetStreamStats;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo;
 import org.apache.asterix.translator.AbstractLangTranslator;
 import org.apache.asterix.translator.ClientRequest;
 import org.apache.asterix.translator.CompiledStatements.CompiledCopyFromFileStatement;
@@ -291,6 +294,7 @@
     protected final IResponsePrinter responsePrinter;
     protected final WarningCollector warningCollector;
     protected final ReentrantReadWriteLock compilationLock;
+    protected final IGlobalTxManager globalTxManager;
 
     public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compilationProvider, ExecutorService executorService,
@@ -313,6 +317,7 @@
         if (appCtx.getServiceContext().getAppConfig().getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)) {
             this.jobFlags.add(JobFlag.ENFORCE_CONTRACT);
         }
+        this.globalTxManager = appCtx.getGlobalTxManager();
     }
 
     public SessionOutput getSessionOutput() {
@@ -3481,6 +3486,8 @@
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+        JobId jobId = null;
+        boolean atomic = false;
         try {
             metadataProvider.setWriteTransaction(true);
             Dataset dataset = metadataProvider.findDataset(dataverseName, copyStmt.getDatasetName());
@@ -3503,9 +3510,32 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null && !isCompileOnly()) {
-                runJob(hcc, spec);
+                atomic = dataset.isAtomic();
+                if (atomic) {
+                    int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(spec).size();
+                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec);
+                    List<Integer> participatingDatasetIds = new ArrayList<>();
+                    participatingDatasetIds.add(dataset.getDatasetId());
+                    spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds,
+                            numParticipatingNodes, numParticipatingPartitions));
+                }
+                jobId = JobUtils.runJob(hcc, spec, jobFlags, false);
+
+                String nameBefore = Thread.currentThread().getName();
+                try {
+                    Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
+                    hcc.waitForCompletion(jobId);
+                } finally {
+                    Thread.currentThread().setName(nameBefore);
+                }
+                if (atomic) {
+                    globalTxManager.commitTransaction(jobId);
+                }
             }
         } catch (Exception e) {
+            if (atomic && jobId != null) {
+                globalTxManager.abortTransaction(jobId);
+            }
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
@@ -3555,18 +3585,45 @@
                 throw e;
             }
         };
-
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                    requestParameters, false);
+                    requestParameters, false, stmt);
         } else {
             locker.lock();
+            JobId jobId = null;
+            boolean atomic = false;
             try {
                 final JobSpecification jobSpec = compiler.compile();
                 if (jobSpec == null) {
                     return jobSpec;
                 }
-                runJob(hcc, jobSpec);
+                Dataset ds = metadataProvider.findDataset(((InsertStatement) stmt).getDataverseName(),
+                        ((InsertStatement) stmt).getDatasetName());
+                atomic = ds.isAtomic();
+                if (atomic) {
+                    int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
+                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+                    List<Integer> participatingDatasetIds = new ArrayList<>();
+                    participatingDatasetIds.add(ds.getDatasetId());
+                    jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
+                            participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
+                }
+                jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+                String nameBefore = Thread.currentThread().getName();
+                try {
+                    Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
+                    hcc.waitForCompletion(jobId);
+                } finally {
+                    Thread.currentThread().setName(nameBefore);
+                }
+                if (atomic) {
+                    globalTxManager.commitTransaction(jobId);
+                }
+            } catch (Exception e) {
+                if (atomic && jobId != null) {
+                    globalTxManager.abortTransaction(jobId);
+                }
+                throw e;
             } finally {
                 locker.unlock();
             }
@@ -3586,6 +3643,8 @@
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+        boolean atomic = false;
+        JobId jobId = null;
         try {
             metadataProvider.setWriteTransaction(true);
             CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
@@ -3597,12 +3656,34 @@
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-
             if (jobSpec != null && !isCompileOnly()) {
-                runJob(hcc, jobSpec);
+                Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
+                atomic = ds.isAtomic();
+                if (atomic) {
+                    int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
+                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+                    List<Integer> participatingDatasetIds = new ArrayList<>();
+                    participatingDatasetIds.add(ds.getDatasetId());
+                    jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
+                            participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
+                }
+                jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+                String nameBefore = Thread.currentThread().getName();
+                try {
+                    Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
+                    hcc.waitForCompletion(jobId);
+                } finally {
+                    Thread.currentThread().setName(nameBefore);
+                }
+                if (atomic) {
+                    globalTxManager.commitTransaction(jobId);
+                }
             }
             return jobSpec;
         } catch (Exception e) {
+            if (atomic && jobId != null) {
+                globalTxManager.abortTransaction(jobId);
+            }
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
@@ -4557,19 +4638,19 @@
             }
         };
         deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                requestParameters, true);
+                requestParameters, true, null);
     }
 
     private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
             MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable)
-            throws Exception {
+            ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable,
+            Statement atomicStmt) throws Exception {
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
                 MutableBoolean printed = new MutableBoolean(false);
                 executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery,
-                        requestParameters, cancellable, resultSetId, printed, metadataProvider));
+                        requestParameters, cancellable, resultSetId, printed, metadataProvider, atomicStmt));
                 synchronized (printed) {
                     while (!printed.booleanValue()) {
                         printed.wait();
@@ -4583,7 +4664,7 @@
                     responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader,
                             metadataProvider.findOutputRecordType(), stats, sessionOutput));
                     responsePrinter.printResults();
-                }, requestParameters, cancellable, appCtx, metadataProvider);
+                }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
@@ -4595,7 +4676,7 @@
                         outMetadata.getResultSets().add(org.apache.commons.lang3.tuple.Triple.of(id, resultSetId,
                                 metadataProvider.findOutputRecordType()));
                     }
-                }, requestParameters, cancellable, appCtx, metadataProvider);
+                }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt);
                 break;
             default:
                 break;
@@ -4618,7 +4699,7 @@
 
     private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
             ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable,
-            ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider) {
+            ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider, Statement atomicStmt) {
         Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
         try {
             createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> {
@@ -4630,7 +4711,7 @@
                     printed.setTrue();
                     printed.notify();
                 }
-            }, requestParameters, cancellable, appCtx, metadataProvider);
+            }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt);
         } catch (Exception e) {
             if (Objects.equals(JobId.INVALID, jobId.getValue())) {
                 // compilation failed
@@ -4670,10 +4751,10 @@
         return p.second;
     }
 
-    private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
+    private void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
             IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
             IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx,
-            MetadataProvider metadataProvider) throws Exception {
+            MetadataProvider metadataProvider, Statement atomicStatement) throws Exception {
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest =
                 (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -4681,6 +4762,8 @@
             clientRequest.markCancellable();
         }
         locker.lock();
+        JobId jobId = null;
+        boolean atomic = false;
         try {
             final JobSpecification jobSpec = compiler.compile();
             if (jobSpec == null) {
@@ -4691,7 +4774,20 @@
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
             // ensure request not cancelled before running job
             ensureNotCancelled(clientRequest);
-            final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+            if (atomicStatement != null) {
+                Dataset ds = metadataProvider.findDataset(((InsertStatement) atomicStatement).getDataverseName(),
+                        ((InsertStatement) atomicStatement).getDatasetName());
+                atomic = ds.isAtomic();
+                if (atomic) {
+                    int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
+                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+                    List<Integer> participatingDatasetIds = new ArrayList<>();
+                    participatingDatasetIds.add(ds.getDatasetId());
+                    jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
+                            participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
+                }
+            }
+            jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             clientRequest.setJobId(jobId);
             if (jId != null) {
                 jId.setValue(jobId);
@@ -4704,7 +4800,13 @@
                 ensureNotCancelled(clientRequest);
                 printer.print(jobId);
             }
+            if (atomic) {
+                globalTxManager.commitTransaction(jobId);
+            }
         } catch (Exception e) {
+            if (atomic && jobId != null) {
+                globalTxManager.abortTransaction(jobId);
+            }
             if (org.apache.hyracks.api.util.ExceptionUtils.getRootCause(e) instanceof InterruptedException) {
                 Thread.currentThread().interrupt();
                 throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 2a66cfd..0e740f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -23,6 +23,7 @@
 import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
 import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN;
+import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;
 
 import java.io.File;
 import java.io.IOException;
@@ -54,16 +55,20 @@
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.app.cc.CcApplicationContext;
+import org.apache.asterix.app.cc.GlobalTxManager;
 import org.apache.asterix.app.config.ConfigValidator;
 import org.apache.asterix.app.io.PersistedResourceRegistry;
 import org.apache.asterix.app.replication.NcLifecycleCoordinator;
 import org.apache.asterix.app.result.JobResultCallback;
+import org.apache.asterix.cloud.CloudManagerProvider;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
 import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.api.IReceptionistFactory;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
 import org.apache.asterix.common.config.AsterixExtension;
+import org.apache.asterix.common.config.CloudProperties;
 import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.GlobalConfig;
@@ -77,6 +82,7 @@
 import org.apache.asterix.common.metadata.IMetadataLockUtil;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.external.adapter.factory.AdapterFactoryService;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.messaging.CCMessageBroker;
@@ -99,12 +105,15 @@
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.control.IGatekeeper;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.result.IJobResultCallback;
 import org.apache.hyracks.control.cc.BaseCCApplication;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.nc.io.DefaultDeviceResolver;
+import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.http.api.IServlet;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.http.server.HttpServerConfig;
@@ -158,8 +167,19 @@
         componentProvider = new StorageComponentProvider();
         ccExtensionManager = new CCExtensionManager(new ArrayList<>(getExtensions()));
         IGlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
+
+        List<IODeviceHandle> devices = new ArrayList<>();
+        devices.add(new IODeviceHandle(new File(StorageConstants.CC_STORAGE_ROOT_DIR), "."));
+        IOManager ioManager = new IOManager(devices, new DefaultDeviceResolver(), 1, 10);
+        CloudProperties cloudProperties = null;
+        if (ccServiceCtx.getAppConfig().getBoolean(CLOUD_DEPLOYMENT)) {
+            cloudProperties = new CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
+            ioManager = (IOManager) CloudManagerProvider.createIOManager(cloudProperties, ioManager);;
+        }
+        IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
         appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator, Receptionist::new,
-                ConfigValidator::new, ccExtensionManager, new AdapterFactoryService());
+                ConfigValidator::new, ccExtensionManager, new AdapterFactoryService(), globalTxManager, ioManager,
+                cloudProperties);
         final CCConfig ccConfig = controllerService.getCCConfig();
         if (System.getProperty("java.rmi.server.hostname") == null) {
             System.setProperty("java.rmi.server.hostname", ccConfig.getClusterPublicAddress());
@@ -180,6 +200,7 @@
         final INodeJobTracker nodeJobTracker = appCtx.getNodeJobTracker();
         ccServiceCtx.addJobLifecycleListener(nodeJobTracker);
         ccServiceCtx.addClusterLifecycleListener(nodeJobTracker);
+        ccServiceCtx.addJobLifecycleListener(globalTxManager);
 
         jobCapacityController = new JobCapacityController(controllerService.getResourceManager());
     }
@@ -207,18 +228,23 @@
     protected ICcApplicationContext createApplicationContext(ILibraryManager libraryManager,
             IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator,
             IReceptionistFactory receptionistFactory, IConfigValidatorFactory configValidatorFactory,
-            CCExtensionManager ccExtensionManager, IAdapterFactoryService adapterFactoryService)
+            CCExtensionManager ccExtensionManager, IAdapterFactoryService adapterFactoryService,
+            IGlobalTxManager globalTxManager, IOManager ioManager, CloudProperties cloudProperties)
             throws AlgebricksException, IOException {
         return new CcApplicationContext(ccServiceCtx, hcc, () -> MetadataManager.INSTANCE, globalRecoveryManager,
                 lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider, new MetadataLockManager(),
                 createMetadataLockUtil(), receptionistFactory, configValidatorFactory, ccExtensionManager,
-                adapterFactoryService);
+                adapterFactoryService, globalTxManager, ioManager, cloudProperties);
     }
 
     protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
         return ccExtensionManager.getGlobalRecoveryManager(ccServiceCtx, getHcc(), componentProvider);
     }
 
+    protected IGlobalTxManager createGlobalTxManager(IOManager ioManager) throws Exception {
+        return new GlobalTxManager(ccServiceCtx, ioManager);
+    }
+
     protected INcLifecycleCoordinator createNcLifeCycleCoordinator(boolean replicationEnabled) {
         return new NcLifecycleCoordinator(ccServiceCtx, replicationEnabled);
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 2452b8f..3f76a31 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -117,6 +117,7 @@
             performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout);
         }
         mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
+        rollbackIncompleteAtomicTransactions(appCtx);
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         recoveryCompleted = true;
         recovering = false;
@@ -127,6 +128,10 @@
         appCtx.getClusterStateManager().refreshState();
     }
 
+    protected void rollbackIncompleteAtomicTransactions(ICcApplicationContext appCtx) throws Exception {
+        appCtx.getGlobalTxManager().rollback();
+    }
+
     protected void performGlobalStorageCleanup(MetadataTransactionContext mdTxnCtx, int storageGlobalCleanupTimeoutSecs)
             throws Exception {
         List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.01.ddl.sqlpp
new file mode 100644
index 0000000..e41bd77
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.01.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset reviews primary key (id: int);
+
+CREATE PRIMARY INDEX review_idx_primary ON reviews;
+
+CREATE INDEX review_idx_review ON reviews(review: string?) TYPE BTREE;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.02.update.sqlpp
new file mode 100644
index 0000000..1358269
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.02.update.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ use test;
+
+insert into reviews ([
+    {"id": 1, "year": null, "quarter": null, "review": "good"},
+    {"id": 2, "year": null, "quarter": null, "review": "good"},
+    {"id": 3, "year": 2018, "quarter": null, "review": "good"},
+    {"id": 4, "year": 2018, "quarter": null, "review": "bad"},
+    {"id": 5, "year": 2018, "quarter": 1, "review": "good"},
+    {"id": 5, "year": 2018, "quarter": 1, "review": "bad"},
+    {"id": 7, "year": 2018, "quarter": 2, "review": "good"},
+    {"id": 8, "year": 2018, "quarter": 2, "review": "bad"},
+    {"id": 9, "year": 2019, "quarter": null, "review": "good"},
+    {"id": 10, "year": 2019, "quarter": null, "review": "bad"}
+]);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.03.query.sqlpp
new file mode 100644
index 0000000..8709326
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.03.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+select value count(*)
+from reviews;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.04.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.04.update.sqlpp
new file mode 100644
index 0000000..4ab1bcc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.04.update.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into reviews ([
+    {"id": 1, "year": null, "quarter": null, "review": "good"},
+    {"id": 2, "year": null, "quarter": null, "review": "good"},
+    {"id": 3, "year": 2018, "quarter": null, "review": "good"},
+    {"id": 4, "year": 2018, "quarter": null, "review": "bad"},
+    {"id": 5, "year": 2018, "quarter": 1, "review": "good"},
+    {"id": 6, "year": 2018, "quarter": 1, "review": "bad"},
+    {"id": 7, "year": 2018, "quarter": 2, "review": "good"},
+    {"id": 8, "year": 2018, "quarter": 2, "review": "bad"},
+    {"id": 9, "year": 2019, "quarter": null, "review": "good"},
+    {"id": 10, "year": 2019, "quarter": null, "review": "bad"}
+]);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.05.query.sqlpp
new file mode 100644
index 0000000..8709326
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.05.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+select value count(*)
+from reviews;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.06.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.06.ddl.sqlpp
new file mode 100644
index 0000000..3cc900b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.06.ddl.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+drop dataset reviews;
+
+drop dataverse test;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.01.ddl.sqlpp
new file mode 100644
index 0000000..e41bd77
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.01.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset reviews primary key (id: int);
+
+CREATE PRIMARY INDEX review_idx_primary ON reviews;
+
+CREATE INDEX review_idx_review ON reviews(review: string?) TYPE BTREE;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.02.query.sqlpp
new file mode 100644
index 0000000..ba1e16a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.02.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into reviews ([
+    {"id": 1, "year": null, "quarter": null, "review": "good"},
+    {"id": 2, "year": null, "quarter": null, "review": "good"},
+    {"id": 3, "year": 2018, "quarter": null, "review": "good"}
+]) returning review;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.03.update.sqlpp
new file mode 100644
index 0000000..ea608d6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.03.update.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into reviews ([
+    {"id": 4, "year": 2018, "quarter": null, "review": "bad"},
+    {"id": 5, "year": 2018, "quarter": 1, "review": "good"},
+    {"id": 6, "year": 2018, "quarter": 1, "review": "bad"},
+    {"id": 7, "year": 2018, "quarter": 2, "review": "good"},
+    {"id": 8, "year": 2018, "quarter": 2, "review": "bad"},
+    {"id": 9, "year": 2019, "quarter": null, "review": "good"},
+    {"id": 10, "year": 2019, "quarter": null, "review": "bad"}
+]);
+
+delete from reviews where year=2019;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.04.query.sqlpp
new file mode 100644
index 0000000..8709326
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.04.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+select value count(*)
+from reviews;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.05.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.05.ddl.sqlpp
new file mode 100644
index 0000000..3cc900b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.05.ddl.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+drop dataset reviews;
+
+drop dataverse test;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.03.adm
new file mode 100644
index 0000000..c227083
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.03.adm
@@ -0,0 +1 @@
+0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.05.adm
new file mode 100644
index 0000000..9a03714
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.05.adm
@@ -0,0 +1 @@
+10
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.02.adm
new file mode 100644
index 0000000..3ca0114
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.02.adm
@@ -0,0 +1,3 @@
+"good"
+"good"
+"good"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.04.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.04.adm
new file mode 100644
index 0000000..301160a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.04.adm
@@ -0,0 +1 @@
+8
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index a257c2d..0afee45 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -16328,4 +16328,18 @@
       </compilation-unit>
     </test-case>
   </test-group>
+  <test-group name="atomic-statements">
+    <test-case FilePath="atomic-statements">
+      <compilation-unit name="atomic-statements-1">
+        <output-dir compare="Clean-JSON">atomic-statements-1</output-dir>
+        <expected-error>HYR0033: Inserting duplicate keys into the primary storage</expected-error>
+        <source-location>false</source-location>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="atomic-statements">
+      <compilation-unit name="atomic-statements-2">
+        <output-dir compare="Clean-JSON">atomic-statements-2</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 1c2a047..ac66f33 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.StorageIOStats;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -185,4 +186,6 @@
      * @param partitionId
      */
     void closePartition(int partitionId);
+
+    IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
index 09ab550..27a3d49 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
@@ -45,4 +45,7 @@
      * @return The participating nodes in the job execution
      */
     Set<String> getJobParticipatingNodes(JobSpecification spec);
+
+    int getNumParticipatingPartitions(JobSpecification spec);
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
new file mode 100644
index 0000000..498d174
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+public interface IGlobalTxManager extends IJobLifecycleListener {
+
+    enum TransactionStatus {
+        ACTIVE,
+        PREPARED,
+        COMMITTED,
+        ABORTED,
+        ROLLBACK
+    }
+
+    IGlobalTransactionContext beginTransaction(JobId jobId, int numParticipatingNodes, int numParticipatingPartitions,
+            List<Integer> participatingDatasetIds) throws ACIDException;
+
+    void commitTransaction(JobId jobId) throws ACIDException;
+
+    void abortTransaction(JobId jobId) throws Exception;
+
+    IGlobalTransactionContext getTransactionContext(JobId jobId) throws ACIDException;
+
+    void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
+            Map<String, ILSMComponentId> componentIdMap);
+
+    void handleJobCompletionMessage(JobId jobId, String nodeId);
+
+    void handleJobRollbackCompletionMessage(JobId jobId, String nodeId);
+
+    void rollback() throws Exception;
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f9c410b..7a109cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -654,4 +654,9 @@
         return !(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
                 || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit());
     }
+
+    @Override
+    public IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+        return indexCheckpointManagerProvider;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index d9456d2..a1980b6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -266,12 +266,14 @@
         }
     }
 
-    public void commit() throws HyracksDataException {
+    public void finishAllFlush() throws HyracksDataException {
         LogRecord logRecord = new LogRecord();
         triggerScheduleFlush(logRecord);
         List<FlushOperation> flushes = new ArrayList<>(getScheduledFlushes());
         LSMIndexUtil.waitFor(flushes);
+    }
 
+    public synchronized void commit() throws HyracksDataException {
         Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
         for (ILSMIndex lsmIndex : indexes) {
             lsmIndex.commit();
@@ -285,13 +287,14 @@
             indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 0L, id.getMaxId());
         }
         lastFlushOperation.clear();
-
-        for (ILSMIndex lsmIndex : indexes) {
-            lsmIndex.getMergePolicy().diskComponentAdded(lsmIndex, false);
-        }
     }
 
     public void abort() throws HyracksDataException {
+        clear();
+    }
+
+    public void clear() throws HyracksDataException {
+        deleteMemoryComponent(false);
         Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
         for (ILSMIndex lsmIndex : indexes) {
             lsmIndex.abort();
@@ -362,7 +365,16 @@
         return "Dataset (" + datasetID + "), Partition (" + partition + ")";
     }
 
-    public void deleteMemoryComponent() throws HyracksDataException {
+    public void deleteMemoryComponent(ILSMIndex lsmIndex, ILSMComponentId nextComponentId) throws HyracksDataException {
+        Map<String, Object> flushMap = new HashMap<>();
+        flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, 0L);
+        flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
+        ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+        accessor.getOpContext().setParameters(flushMap);
+        accessor.deleteComponents(c -> c.getType() == ILSMComponent.LSMComponentType.MEMORY);
+    }
+
+    public void deleteMemoryComponent(boolean onlyPrimaryIndex) throws HyracksDataException {
         Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
         ILSMIndex primaryLsmIndex = null;
         for (ILSMIndex lsmIndex : indexes) {
@@ -379,15 +391,21 @@
         Objects.requireNonNull(primaryLsmIndex, "no primary index found in " + indexes);
         idGenerator.refresh();
         ILSMComponentId nextComponentId = idGenerator.getId();
-        Map<String, Object> flushMap = new HashMap<>();
-        flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, 0L);
-        flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
-        ILSMIndexAccessor accessor = primaryLsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-        accessor.getOpContext().setParameters(flushMap);
-        accessor.deleteComponents(c -> c.getType() == ILSMComponent.LSMComponentType.MEMORY);
+        if (onlyPrimaryIndex) {
+            deleteMemoryComponent(primaryLsmIndex, nextComponentId);
+        } else {
+            for (ILSMIndex lsmIndex : indexes) {
+                deleteMemoryComponent(lsmIndex, nextComponentId);
+            }
+        }
     }
 
     private boolean canSafelyFlush() {
         return numActiveOperations.get() == 0;
     }
+
+    public Map<String, FlushOperation> getLastFlushOperation() {
+        return lastFlushOperation;
+    }
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 281b069..922c352 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
 import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.external.IAdapterFactoryService;
@@ -38,6 +39,7 @@
 import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.storage.common.IStorageManager;
 
 /**
@@ -165,4 +167,8 @@
      * @return the data partitioing provider
      */
     IDataPartitioningProvider getDataPartitioningProvider();
+
+    IGlobalTxManager getGlobalTxManager();
+
+    IOManager getIoManager();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 421fb7b..64748ce 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -19,9 +19,12 @@
 package org.apache.asterix.common.dataflow;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.messaging.AtomicJobPreparedMessage;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -32,6 +35,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -45,10 +50,12 @@
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.LocalResource;
@@ -101,6 +108,9 @@
                 IIndexDataflowHelper indexHelper = indexHelpers[i];
                 indexHelper.open();
                 indexes[i] = indexHelper.getIndexInstance();
+                if (((ILSMIndex) indexes[i]).isAtomic() && isPrimary()) {
+                    ((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear();
+                }
                 LocalResource resource = indexHelper.getResource();
                 modCallbacks[i] = modOpCallbackFactory.createModificationOperationCallback(resource, ctx, this);
                 IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
@@ -238,11 +248,31 @@
 
     private void commitAtomicInsertDelete() throws HyracksDataException {
         if (isPrimary) {
+            final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
+            int datasetID = -1;
+            boolean atomic = false;
             for (IIndex index : indexes) {
                 if (((ILSMIndex) index).isAtomic()) {
                     PrimaryIndexOperationTracker opTracker =
                             ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
-                    opTracker.commit();
+                    opTracker.finishAllFlush();
+                    for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
+                        componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
+                    }
+                    datasetID = opTracker.getDatasetInfo().getDatasetID();
+                    atomic = true;
+                }
+            }
+
+            if (atomic) {
+                AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
+                        ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+                try {
+                    ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
+                            .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
+                                    JavaSerializationUtils.serialize(message), null);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
                 }
             }
         }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
new file mode 100644
index 0000000..8adbf49
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from an NC to CC for every partition handled by it after all
+ * the components generated by an atomic statement/job are flushed to disk.
+ */
+public class AtomicJobPreparedMessage implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final JobId jobId;
+    private final String nodeId;
+    private final int datasetId;
+    private final Map<String, ILSMComponentId> componentIdMap;
+
+    public AtomicJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
+            Map<String, ILSMComponentId> componentIdMap) {
+        this.nodeId = nodeId;
+        this.datasetId = datasetId;
+        this.componentIdMap = componentIdMap;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        appCtx.getGlobalTxManager().handleJobPreparedMessage(jobId, nodeId, datasetId, componentIdMap);
+    }
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index beb8e07..c43f4f0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -102,6 +102,8 @@
      */
     void delete();
 
+    void deleteLatest(long latestId, int historyToDelete);
+
     /**
      * Gets the index last valid component sequence.
      *
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IGlobalTransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IGlobalTransactionContext.java
new file mode 100644
index 0000000..5a175ac
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IGlobalTransactionContext.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.cluster.IGlobalTxManager;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+public interface IGlobalTransactionContext {
+
+    JobId getJobId();
+
+    int incrementAndGetAcksReceived();
+
+    int getAcksReceived();
+
+    int getNumNodes();
+
+    int getNumPartitions();
+
+    void resetAcksReceived();
+
+    void setTxnStatus(IGlobalTxManager.TransactionStatus status);
+
+    IGlobalTxManager.TransactionStatus getTxnStatus();
+
+    List<Integer> getDatasetIds();
+
+    Map<String, Map<String, ILSMComponentId>> getNodeResourceMap();
+
+    void persist(IOManager ioManager);
+
+    void delete(IOManager ioManager);
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 1321c96..7b6ed0d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.utils;
 
+import java.nio.file.Paths;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -30,6 +31,8 @@
  */
 public class StorageConstants {
 
+    public static final String CC_STORAGE_ROOT_DIR = "/tmp/";
+    public static final String CC_TX_LOG_DIR = Paths.get("cc", "txnlogs").toString();
     public static final String STORAGE_ROOT_DIR_NAME = "storage";
     public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
     public static final String PARTITION_DIR_PREFIX = "partition_";
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index a2ebd9a..52fba60 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
@@ -81,4 +82,11 @@
                 .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
                 .collect(Collectors.toSet());
     }
+
+    @Override
+    public int getNumParticipatingPartitions(JobSpecification spec) {
+        return spec.getUserConstraints().stream().filter(ce -> ce.getLValue() instanceof PartitionCountExpression)
+                .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue)
+                .map(Object::toString).map(Integer::parseInt).max(Integer::compare).get();
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 6e767ed..ec016bc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -19,11 +19,14 @@
 package org.apache.asterix.runtime.operators;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
+import org.apache.asterix.common.messaging.AtomicJobPreparedMessage;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
@@ -35,6 +38,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -54,10 +59,12 @@
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -216,6 +223,9 @@
                 indexHelper.open();
                 indexes[i] = indexHelper.getIndexInstance();
                 IIndex index = indexes[i];
+                if (((ILSMIndex) indexes[i]).isAtomic()) {
+                    ((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear();
+                }
                 IIndexDataflowHelper keyIndexHelper = keyIndexHelpers[i];
                 IIndex indexForUniquessCheck;
                 if (keyIndexHelper != null) {
@@ -341,11 +351,31 @@
     }
 
     private void commitAtomicInsert() throws HyracksDataException {
+        final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
+        int datasetID = -1;
+        boolean atomic = false;
         for (IIndex index : indexes) {
             if (((ILSMIndex) index).isAtomic()) {
                 PrimaryIndexOperationTracker opTracker =
                         ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
-                opTracker.commit();
+                opTracker.finishAllFlush();
+                for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
+                    componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
+                }
+                datasetID = opTracker.getDatasetInfo().getDatasetID();
+                atomic = true;
+            }
+        }
+
+        if (atomic) {
+            AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
+                    ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+            try {
+                ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
+                        .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
+                                JavaSerializationUtils.serialize(message), null);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
         }
     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 03130ae..de95d60 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -24,11 +24,14 @@
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.messaging.AtomicJobPreparedMessage;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.om.base.AInt8;
@@ -48,6 +51,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -69,10 +74,12 @@
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -297,6 +304,9 @@
                 IIndexDataflowHelper indexHelper = indexHelpers[i];
                 indexHelper.open();
                 indexes[i] = indexHelper.getIndexInstance();
+                if (((ILSMIndex) indexes[i]).isAtomic()) {
+                    ((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear();
+                }
                 if (ctx.getSharedObject() != null && i == 0) {
                     PrimaryIndexLogMarkerCallback callback =
                             new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]);
@@ -557,12 +567,33 @@
         // No op since nextFrame flushes by default
     }
 
+    // TODO: Refactor and remove duplicated code
     private void commitAtomicUpsert() throws HyracksDataException {
+        final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
+        int datasetID = -1;
+        boolean atomic = false;
         for (IIndex index : indexes) {
             if (((ILSMIndex) index).isAtomic()) {
                 PrimaryIndexOperationTracker opTracker =
                         ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
-                opTracker.commit();
+                opTracker.finishAllFlush();
+                for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
+                    componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
+                }
+                datasetID = opTracker.getDatasetInfo().getDatasetID();
+                atomic = true;
+            }
+        }
+
+        if (atomic) {
+            AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
+                    ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+            try {
+                ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
+                        .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
+                                JavaSerializationUtils.serialize(message), null);
+            } catch (Exception e) {
+                throw new ACIDException(e);
             }
         }
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
index 2312f15..39c6043 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
@@ -77,4 +77,9 @@
             message.getBuffer().flip();
         }
     }
+
+    @Override
+    public void fail() {
+        failed = true;
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
index cfa39c6..3576ba1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -59,7 +59,7 @@
         for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
             PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
             try {
-                primaryIndexOpTracker.deleteMemoryComponent();
+                primaryIndexOpTracker.deleteMemoryComponent(true);
             } catch (HyracksDataException e) {
                 throw new ACIDException(e);
             }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionLog.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionLog.java
new file mode 100644
index 0000000..dc068bd
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionLog.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtomicTransactionLog {
+
+    private JobId jobId;
+    private List<Integer> datasetIds;
+    private Set<String> nodeIds;
+    private Map<String, Map<String, ILSMComponentId>> nodeResourceMap;
+    private int numPartitions;
+
+    @JsonCreator
+    public AtomicTransactionLog(@JsonProperty("jobId") JobId jobId,
+            @JsonProperty("datasetIds") List<Integer> datasetIds, @JsonProperty("nodeIds") Set<String> nodeIds,
+            @JsonProperty("nodeResourceMap") Map<String, Map<String, ILSMComponentId>> nodeResourceMap,
+            @JsonProperty("numPartitions") int numPartitions) {
+        this.jobId = jobId;
+        this.datasetIds = datasetIds;
+        this.nodeIds = nodeIds;
+        this.nodeResourceMap = nodeResourceMap;
+        this.numPartitions = numPartitions;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public List<Integer> getDatasetIds() {
+        return datasetIds;
+    }
+
+    public Set<String> getNodeIds() {
+        return nodeIds;
+    }
+
+    public Map<String, Map<String, ILSMComponentId>> getNodeResourceMap() {
+        return nodeResourceMap;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTransactionContext.java
new file mode 100644
index 0000000..9d1dd1d
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTransactionContext.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.common.cluster.IGlobalTxManager.TransactionStatus;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@ThreadSafe
+public class GlobalTransactionContext implements IGlobalTransactionContext {
+
+    protected final JobId jobId;
+    private AtomicInteger acksReceived;
+    private final int numNodes;
+    private TransactionStatus status;
+    private final List<Integer> datasetIds;
+    private final int numPartitions;
+    private final Map<String, Map<String, ILSMComponentId>> nodeResourceMap;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public GlobalTransactionContext(JobId jobId, List<Integer> datasetIds, int numNodes, int numPartitions) {
+        this.jobId = jobId;
+        this.datasetIds = datasetIds;
+        this.numNodes = numNodes;
+        this.numPartitions = numPartitions;
+        this.acksReceived = new AtomicInteger(0);
+        this.nodeResourceMap = new HashMap<>();
+        this.status = TransactionStatus.ACTIVE;
+    }
+
+    public GlobalTransactionContext(FileReference txnLogFileRef, IOManager ioManager) {
+        try {
+            AtomicTransactionLog txnLog = OBJECT_MAPPER.readValue(new String(ioManager.readAllBytes(txnLogFileRef)),
+                    AtomicTransactionLog.class);
+            this.jobId = txnLog.getJobId();
+            this.datasetIds = txnLog.getDatasetIds();
+            this.nodeResourceMap = txnLog.getNodeResourceMap();
+            this.numNodes = nodeResourceMap.keySet().size();
+            this.numPartitions = txnLog.getNumPartitions();
+            this.acksReceived = new AtomicInteger(0);
+        } catch (JsonProcessingException | HyracksDataException e) {
+            throw new ACIDException(e);
+        }
+    }
+
+    @Override
+    public void setTxnStatus(TransactionStatus status) {
+        this.status = status;
+    }
+
+    @Override
+    public TransactionStatus getTxnStatus() {
+        return status;
+    }
+
+    @Override
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public int incrementAndGetAcksReceived() {
+        return acksReceived.incrementAndGet();
+    }
+
+    @Override
+    public int getAcksReceived() {
+        return acksReceived.get();
+    }
+
+    @Override
+    public void resetAcksReceived() {
+        acksReceived = new AtomicInteger(0);
+    }
+
+    public int getNumNodes() {
+        return numNodes;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
+
+    public List<Integer> getDatasetIds() {
+        return datasetIds;
+    }
+
+    public Map<String, Map<String, ILSMComponentId>> getNodeResourceMap() {
+        return nodeResourceMap;
+    }
+
+    @Override
+    public void persist(IOManager ioManager) {
+        try {
+            FileReference fref = ioManager
+                    .resolve(Paths.get(StorageConstants.CC_TX_LOG_DIR, String.format("%s.log", jobId)).toString());
+            AtomicTransactionLog txnLog = new AtomicTransactionLog(jobId, datasetIds, nodeResourceMap.keySet(),
+                    nodeResourceMap, numPartitions);
+            ioManager.overwrite(fref,
+                    OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(txnLog).getBytes());
+        } catch (HyracksDataException | JsonProcessingException e) {
+            throw new ACIDException(e);
+        }
+    }
+
+    @Override
+    public void delete(IOManager ioManager) {
+        try {
+            FileReference fref = ioManager
+                    .resolve(Paths.get(StorageConstants.CC_TX_LOG_DIR, String.format("%s.log", jobId)).toString());
+            ioManager.delete(fref);
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String prettyPrint() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n" + jobId + "\n");
+        sb.append("TransactionState: " + status + "\n");
+        return sb.toString();
+    }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTxInfo.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTxInfo.java
new file mode 100644
index 0000000..b99da87
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTxInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class GlobalTxInfo implements Serializable {
+
+    private final int numNodes;
+    private final List<Integer> datasetIds;
+    private final int numPartitions;
+
+    public GlobalTxInfo(List<Integer> datasetIds, int numNodes, int numPartitions) {
+        this.datasetIds = datasetIds;
+        this.numNodes = numNodes;
+        this.numPartitions = numPartitions;
+    }
+
+    public int getNumNodes() {
+        return numNodes;
+    }
+
+    public List<Integer> getDatasetIds() {
+        return datasetIds;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 366b585..136807c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -98,5 +98,9 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index de6b5ff..10db9f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -31,6 +31,11 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IWritable;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
 public final class JobId implements IWritable, Serializable, Comparable {
 
     private static final Pattern jobIdPattern = Pattern.compile("^JID:(\\d+)\\.(\\d+)$");
@@ -50,6 +55,7 @@
     private JobId() {
     }
 
+    @JsonCreator
     public JobId(long id) {
         this.id = id;
     }
@@ -58,6 +64,7 @@
         return id;
     }
 
+    @JsonIgnore
     public CcId getCcId() {
         if (ccId == null) {
             ccId = CcId.valueOf((int) (id >>> CcIdPartitionedLongFactory.ID_BITS));
@@ -65,6 +72,7 @@
         return ccId;
     }
 
+    @JsonIgnore
     public long getIdOnly() {
         return id & CcIdPartitionedLongFactory.MAX_ID;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
index ba932e1..009a5f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
@@ -98,5 +98,9 @@
       <groupId>it.unimi.dsi</groupId>
       <artifactId>fastutil-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
index c3835eb..356c8c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
@@ -18,11 +18,18 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.api;
 
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
 /**
  * Stores the id of the disk component, which is a interval (minId, maxId).
  * It is generated by {@link ILSMComponentIdGenerator}
  *
  */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type")
+@JsonSubTypes({ @JsonSubTypes.Type(value = LSMComponentId.class, name = "lsmComponentId"), })
 public interface ILSMComponentId {
     public enum IdCompareResult {
         UNKNOWN,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index b10dd5c..33fd38d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -300,7 +300,6 @@
 
     @Override
     public void abort() throws HyracksDataException {
-        resetMemoryComponents();
         for (ILSMDiskComponent c : temporaryDiskComponents) {
             c.deactivateAndDestroy();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
index cf6c4a2..8260dde 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
@@ -19,9 +19,14 @@
 
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import java.io.Serializable;
+
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 
-public class LSMComponentId implements ILSMComponentId {
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class LSMComponentId implements ILSMComponentId, Serializable {
 
     public static final long NOT_FOUND = -1;
     public static final long MIN_VALID_COMPONENT_ID = 0;
@@ -37,7 +42,8 @@
 
     private long maxId;
 
-    public LSMComponentId(long minId, long maxId) {
+    @JsonCreator
+    public LSMComponentId(@JsonProperty("minId") long minId, @JsonProperty("maxId") long maxId) {
         assert minId <= maxId;
         this.minId = minId;
         this.maxId = maxId;