diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
index 077b657..e754c77 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.common.context.DatasetInfoProvider;
 import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.ioopcallbacks.AtomicLSMIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
 import org.apache.asterix.common.library.LibraryDescriptor;
@@ -190,6 +191,8 @@
         // ILSMOperationTrackerFactory
         registeredClasses.put("NoOpIOOperationCallbackFactory", NoOpIOOperationCallbackFactory.class);
         registeredClasses.put("LSMBTreeIOOperationCallbackFactory", LSMIndexIOOperationCallbackFactory.class);
+        registeredClasses.put("AtomicLSMBTreeIOOperationCallbackFactory",
+                AtomicLSMIndexIOOperationCallbackFactory.class);
         registeredClasses.put("LSMIndexPageWriteCallbackFactory", LSMIndexPageWriteCallbackFactory.class);
         registeredClasses.put("NoOpPageWriteCallbackFactory", NoOpPageWriteCallbackFactory.class);
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
index 250c25f..8f482ee 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
@@ -23,6 +23,7 @@
 
 import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -34,8 +35,8 @@
     private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>();
 
     public TestPrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
-            ILSMComponentIdGenerator idGenerator) {
-        super(datasetID, partition, logManager, dsInfo, idGenerator);
+            ILSMComponentIdGenerator idGenerator, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
+        super(datasetID, partition, logManager, dsInfo, idGenerator, indexCheckpointManagerProvider);
     }
 
     public void addCallback(ITestOpCallback<Void> callback) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 38fdf56..8cf708b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -60,7 +60,8 @@
                 Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
                 opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition,
                         appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(),
-                        dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath()));
+                        dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath()),
+                        appCtx.getIndexCheckpointManagerProvider());
                 replaceMapEntry(opTrackersField, dsr, partition, opTracker);
             }
             return opTracker;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 117b4fc..f9c410b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -339,7 +339,7 @@
         ILSMComponentIdGenerator idGenerator =
                 new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum(), lastValidId);
         PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
-                logManager, dataset.getDatasetInfo(), idGenerator);
+                logManager, dataset.getDatasetInfo(), idGenerator, indexCheckpointManagerProvider);
         dataset.setPrimaryIndexOperationTracker(partition, opTracker);
         dataset.setIdGenerator(partition, idGenerator);
     }
@@ -424,7 +424,12 @@
             return;
         }
         // ensure all in-flight flushes gets scheduled
-        logManager.log(waitLog);
+        final boolean requiresWaitLog =
+                dsInfo.getIndexes().values().stream().noneMatch(indexInfo -> indexInfo.getIndex().isAtomic());
+        if (requiresWaitLog) {
+            logManager.log(waitLog);
+        }
+
         for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
             if (!partitions.test(primaryOpTracker.getPartition())) {
                 continue;
@@ -439,7 +444,9 @@
             primaryOpTracker.flushIfNeeded();
         }
         // ensure requested flushes were scheduled
-        logManager.log(waitLog);
+        if (requiresWaitLog) {
+            logManager.log(waitLog);
+        }
         if (!asyncFlush) {
             List<FlushOperation> flushes = new ArrayList<>();
             for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 2704e64..d9456d2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -28,14 +28,18 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -50,6 +54,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
@@ -70,14 +75,17 @@
     private boolean flushLogCreated = false;
     private final Map<String, FlushOperation> scheduledFlushes = new HashMap<>();
     private long lastFlushTime = System.nanoTime();
+    private final Map<String, FlushOperation> lastFlushOperation = new HashMap<>();
+    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
 
     public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
-            ILSMComponentIdGenerator idGenerator) {
+            ILSMComponentIdGenerator idGenerator, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         super(datasetID, dsInfo);
         this.partition = partition;
         this.logManager = logManager;
         this.numActiveOperations = new AtomicInteger();
         this.idGenerator = idGenerator;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
     }
 
     @Override
@@ -101,7 +109,7 @@
     }
 
     public synchronized void flushIfNeeded() throws HyracksDataException {
-        if (canSafelyFlush()) {
+        if (canSafelyFlush() && !isFlushLogCreated()) {
             flushIfRequested();
         }
     }
@@ -163,9 +171,16 @@
                 synchronized (opTracker) {
                     ILSMMemoryComponent memComponent = lsmIndex.getCurrentMemoryComponent();
                     if (memComponent.getWriterCount() > 0) {
-                        throw new IllegalStateException(
-                                "Can't request a flush on a component with writers inside: Index:" + lsmIndex
-                                        + " Component:" + memComponent);
+                        if (lsmIndex.isAtomic()) {
+                            LOGGER.debug(
+                                    "Can't request a flush on a component with writers inside: Index: {} Component: {}",
+                                    lsmIndex, memComponent);
+                            return;
+                        } else {
+                            throw new IllegalStateException(
+                                    "Can't request a flush on a component with writers inside: Index:" + lsmIndex
+                                            + " Component:" + memComponent);
+                        }
                     }
                     if (memComponent.getState() == ComponentState.READABLE_WRITABLE && memComponent.isModified()) {
                         memComponent.setUnwritable();
@@ -180,7 +195,7 @@
                         + " and partition " + partition + " and is modified but its component id is null");
             }
             LogRecord logRecord = new LogRecord();
-            if (dsInfo.isDurable()) {
+            if (dsInfo.isDurable() && !primaryLsmIndex.isAtomic()) {
                 /*
                  * Generate a FLUSH log.
                  * Flush will be triggered when the log is written to disk by LogFlusher.
@@ -194,7 +209,9 @@
                 }
                 flushLogCreated = true;
             } else {
-                //trigger flush for temporary indexes without generating a FLUSH log.
+                // trigger flush for temporary indexes and indexes on datasets with atomic statements enabled without
+                // generating a FLUSH log.
+                flushLogCreated = true;
                 triggerScheduleFlush(logRecord);
             }
         }
@@ -221,6 +238,16 @@
             Map<String, Object> flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
             flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
+            for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
+                if (lsmIndex.isPrimaryIndex()) {
+                    if (lsmIndex.isCurrentMutableComponentEmpty()) {
+                        LOGGER.debug("Primary index on dataset {} and partition {} is empty... skipping flush",
+                                dsInfo.getDatasetID(), partition);
+                        return;
+                    }
+                    break;
+                }
+            }
             synchronized (scheduledFlushes) {
                 for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
                     ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -228,6 +255,9 @@
                     ILSMIOOperation flush = accessor.scheduleFlush();
                     lastFlushTime = System.nanoTime();
                     scheduledFlushes.put(flush.getTarget().getRelativePath(), (FlushOperation) flush);
+                    if (lsmIndex.isAtomic()) {
+                        lastFlushOperation.put(lsmIndex.getIndexIdentifier(), (FlushOperation) flush);
+                    }
                     flush.addCompleteListener(this);
                 }
             }
@@ -236,6 +266,39 @@
         }
     }
 
+    public void commit() throws HyracksDataException {
+        LogRecord logRecord = new LogRecord();
+        triggerScheduleFlush(logRecord);
+        List<FlushOperation> flushes = new ArrayList<>(getScheduledFlushes());
+        LSMIndexUtil.waitFor(flushes);
+
+        Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
+        for (ILSMIndex lsmIndex : indexes) {
+            lsmIndex.commit();
+        }
+        for (FlushOperation flush : lastFlushOperation.values()) {
+            FileReference target = flush.getTarget();
+            Map<String, Object> map = flush.getParameters();
+            final LSMComponentId id = (LSMComponentId) map.get(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID);
+            final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
+            final long componentSequence = IndexComponentFileReference.of(ref.getName()).getSequenceEnd();
+            indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 0L, id.getMaxId());
+        }
+        lastFlushOperation.clear();
+
+        for (ILSMIndex lsmIndex : indexes) {
+            lsmIndex.getMergePolicy().diskComponentAdded(lsmIndex, false);
+        }
+    }
+
+    public void abort() throws HyracksDataException {
+        Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
+        for (ILSMIndex lsmIndex : indexes) {
+            lsmIndex.abort();
+        }
+        lastFlushOperation.clear();
+    }
+
     @Override
     public void completed(ILSMIOOperation operation) {
         synchronized (scheduledFlushes) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index f9934f2..421fb7b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -48,6 +49,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.LocalResource;
 
@@ -212,6 +214,11 @@
                 failure = ExceptionUtils.suppress(failure, th);
             }
         }
+        if (failure == null && !failed) {
+            commitAtomicInsertDelete();
+        } else {
+            abortAtomicInsertDelete();
+        }
         if (failure != null) {
             throw HyracksDataException.create(failure);
         }
@@ -219,6 +226,7 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        this.failed = true;
         if (writerOpen) {
             writer.fail();
         }
@@ -227,4 +235,28 @@
     public boolean isPrimary() {
         return isPrimary;
     }
+
+    private void commitAtomicInsertDelete() throws HyracksDataException {
+        if (isPrimary) {
+            for (IIndex index : indexes) {
+                if (((ILSMIndex) index).isAtomic()) {
+                    PrimaryIndexOperationTracker opTracker =
+                            ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+                    opTracker.commit();
+                }
+            }
+        }
+    }
+
+    private void abortAtomicInsertDelete() throws HyracksDataException {
+        if (isPrimary) {
+            for (IIndex index : indexes) {
+                if (((ILSMIndex) index).isAtomic()) {
+                    PrimaryIndexOperationTracker opTracker =
+                            ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+                    opTracker.abort();
+                }
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
new file mode 100644
index 0000000..111c6e7
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.ioopcallbacks;
+
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class AtomicLSMIOOperationCallback extends LSMIOOperationCallback {
+
+    public AtomicLSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId componentId,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
+        super(dsInfo, lsmIndex, componentId, indexCheckpointManagerProvider);
+    }
+
+    @Override
+    public void afterFinalize(ILSMIOOperation operation) throws HyracksDataException {
+        if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
+            return;
+        }
+        if (operation.getIOOpertionType() != LSMIOOperationType.LOAD
+                && operation.getAccessor().getOpContext().getOperation() == IndexOperation.DELETE_COMPONENTS) {
+            deleteComponentsFromCheckpoint(operation);
+        } else if (operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
+            addComponentToCheckpoint(operation);
+        } else if (isMerge(operation)) {
+            IoUtil.delete(getOperationMaskFilePath(operation));
+        }
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
new file mode 100644
index 0000000..3829c54
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.ioopcallbacks;
+
+import org.apache.asterix.common.api.IDatasetInfoProvider;
+import org.apache.asterix.common.api.ILSMComponentIdGeneratorFactory;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class AtomicLSMIndexIOOperationCallbackFactory extends LSMIndexIOOperationCallbackFactory {
+
+    private static final long serialVersionUID = -2617830712731546932L;
+
+    public AtomicLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory,
+            IDatasetInfoProvider datasetInfoProvider) {
+        super(idGeneratorFactory, datasetInfoProvider);
+    }
+
+    protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+        return ((INcApplicationContext) ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
+    }
+
+    @Override
+    public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
+        return new AtomicLSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
+                getComponentIdGenerator().getId(), getIndexCheckpointManagerProvider());
+    }
+
+    @SuppressWarnings("squid:S1172") // unused parameter
+    public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json)
+            throws HyracksDataException {
+        final ILSMComponentIdGeneratorFactory idGeneratorFactory =
+                (ILSMComponentIdGeneratorFactory) registry.deserialize(json.get("idGeneratorFactory"));
+        final IDatasetInfoProvider datasetInfoProvider =
+                (IDatasetInfoProvider) registry.deserialize(json.get("datasetInfoProvider"));
+        return new AtomicLSMIndexIOOperationCallbackFactory(idGeneratorFactory, datasetInfoProvider);
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 999636d..9ab4848 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -138,7 +138,7 @@
         }
     }
 
-    private void addComponentToCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
+    protected void addComponentToCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
         // will always update the checkpoint file even if no new component was created
         FileReference target = operation.getTarget();
         Map<String, Object> map = operation.getParameters();
@@ -150,7 +150,7 @@
         indexCheckpointManagerProvider.get(ref).flushed(componentSequence, lsn, id.getMaxId());
     }
 
-    private void deleteComponentsFromCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
+    protected void deleteComponentsFromCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
         // component was deleted... if a flush, do nothing.. if a merge, must update the checkpoint file
         if (operation.getIOOpertionType() == LSMIOOperationType.MERGE) {
             // Get component id of the last disk component
@@ -298,12 +298,12 @@
         return indexCheckpointManagerProvider.get(resourceReference).getValidComponentSequence();
     }
 
-    private boolean isMerge(ILSMIOOperation operation) {
+    protected boolean isMerge(ILSMIOOperation operation) {
         return operation.getIOOpertionType() == LSMIOOperationType.MERGE
                 && operation.getAccessor().getOpContext().getOperation() != IndexOperation.DELETE_COMPONENTS;
     }
 
-    private static FileReference getOperationMaskFilePath(ILSMIOOperation operation) {
+    protected static FileReference getOperationMaskFilePath(ILSMIOOperation operation) {
         FileReference target = operation.getTarget();
         String componentSequence = getComponentSequence(target.getFile().getAbsolutePath());
         FileReference idxRelPath = target.getParent();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 8880461..d7637f4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -423,16 +423,17 @@
         if (createMetadataDataset) {
             final double bloomFilterFalsePositiveRate =
                     appContext.getStorageProperties().getBloomFilterFalsePositiveRate();
-            LSMBTreeLocalResourceFactory lsmBtreeFactory = new LSMBTreeLocalResourceFactory(
-                    storageComponentProvider.getStorageManager(), typeTraits, cmpFactories, null, null, null,
-                    opTrackerFactory, ioOpCallbackFactory, pageWriteCallbackFactory,
-                    storageComponentProvider.getMetadataPageManagerFactory(),
-                    new AsterixVirtualBufferCacheProvider(datasetId),
-                    storageComponentProvider.getIoOperationSchedulerProvider(),
-                    appContext.getMetadataMergePolicyFactory(), StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES,
-                    true, bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, null,
-                    NoOpCompressorDecompressorFactory.INSTANCE, true,
-                    TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL), NullIntrospector.INSTANCE, false);
+            LSMBTreeLocalResourceFactory lsmBtreeFactory =
+                    new LSMBTreeLocalResourceFactory(storageComponentProvider.getStorageManager(), typeTraits,
+                            cmpFactories, null, null, null, opTrackerFactory, ioOpCallbackFactory,
+                            pageWriteCallbackFactory, storageComponentProvider.getMetadataPageManagerFactory(),
+                            new AsterixVirtualBufferCacheProvider(datasetId),
+                            storageComponentProvider.getIoOperationSchedulerProvider(),
+                            appContext.getMetadataMergePolicyFactory(),
+                            StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES, true, bloomFilterKeyFields,
+                            bloomFilterFalsePositiveRate, true, null, NoOpCompressorDecompressorFactory.INSTANCE, true,
+                            TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL), NullIntrospector.INSTANCE,
+                            false, false);
             DatasetLocalResourceFactory dsLocalResourceFactory =
                     new DatasetLocalResourceFactory(datasetId, lsmBtreeFactory);
             // TODO(amoudi) Creating the index should be done through the same code path as
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
index 9a2821e..a61db6f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
@@ -100,7 +100,8 @@
                         pageWriteCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
                         mergePolicyFactory, mergePolicyProperties, true, null, bloomFilterFalsePositiveRate,
                         index.isPrimaryIndex(), btreeFields, compDecompFactory, false,
-                        typeTraitProvider.getTypeTrait(BuiltinType.ANULL), NullIntrospector.INSTANCE, false);
+                        typeTraitProvider.getTypeTrait(BuiltinType.ANULL), NullIntrospector.INSTANCE, false,
+                        dataset.isAtomic());
             default:
                 throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE,
                         dataset.getDatasetType().toString());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
index 88e96e7..ab4b585 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
@@ -113,7 +113,7 @@
                             mergePolicyFactory, mergePolicyProperties, true, bloomFilterFields,
                             bloomFilterFalsePositiveRate, index.isPrimaryIndex(), btreeFields, compDecompFactory,
                             hasBloomFilter, typeTraitProvider.getTypeTrait(BuiltinType.ANULL),
-                            NullIntrospector.INSTANCE, isSecondaryNoIncrementalMaintenance);
+                            NullIntrospector.INSTANCE, isSecondaryNoIncrementalMaintenance, dataset.isAtomic());
                 } else {
                     //Column
                     List<Integer> keySourceIndicator =
@@ -127,7 +127,8 @@
                             pageWriteCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
                             mergePolicyFactory, mergePolicyProperties, bloomFilterFields, bloomFilterFalsePositiveRate,
                             btreeFields, compDecompFactory, typeTraitProvider.getTypeTrait(BuiltinType.ANULL),
-                            NullIntrospector.INSTANCE, isSecondaryNoIncrementalMaintenance, columnManagerFactory);
+                            NullIntrospector.INSTANCE, isSecondaryNoIncrementalMaintenance, columnManagerFactory,
+                            dataset.isAtomic());
                 }
             default:
                 throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index b97880e..e8baeb1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.ioopcallbacks.AtomicLSMIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
 import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
@@ -81,6 +82,7 @@
 import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
 import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
+import org.apache.asterix.transaction.management.runtime.NoOpCommitRuntimeFactory;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -473,7 +475,12 @@
      */
     @SuppressWarnings("squid:S1172")
     public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index index) throws AlgebricksException {
-        return new LSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(), getDatasetInfoProvider());
+        if (isAtomic()) {
+            return new AtomicLSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(),
+                    getDatasetInfoProvider());
+        } else {
+            return new LSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(), getDatasetInfoProvider());
+        }
     }
 
     public ILSMPageWriteCallbackFactory getPageWriteCallbackFactory() throws AlgebricksException {
@@ -499,6 +506,11 @@
         return new DatasetInfoProvider(getDatasetId());
     }
 
+    public boolean isAtomic() {
+        return datasetType == DatasetType.INTERNAL
+                && ((InternalDatasetDetails) getDatasetDetails()).isDatasetWithoutTypeSpecification();
+    }
+
     /**
      * Get search callback factory for this dataset with the passed index and operation
      *
@@ -514,6 +526,10 @@
     public ISearchOperationCallbackFactory getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider,
             Index index, IndexOperation op, int[] primaryKeyFields, int[] primaryKeyFieldsInSecondaryIndex,
             boolean proceedIndexOnlyPlan) throws AlgebricksException {
+        if (isAtomic()) {
+            return NoOpOperationCallbackFactory.INSTANCE;
+        }
+
         if (index.isPrimaryIndex()) {
             /**
              * Due to the read-committed isolation level,
@@ -566,6 +582,10 @@
     public IModificationOperationCallbackFactory getModificationCallbackFactory(
             IStorageComponentProvider componentProvider, Index index, IndexOperation op, int[] primaryKeyFields)
             throws AlgebricksException {
+        if (isAtomic()) {
+            return NoOpOperationCallbackFactory.INSTANCE;
+        }
+
         if (index.isPrimaryIndex()) {
             return op == IndexOperation.UPSERT || op == IndexOperation.INSERT
                     ? new UpsertOperationCallbackFactory(getDatasetId(),
@@ -636,6 +656,10 @@
         IBinaryHashFunctionFactory[] pkHashFunFactories = getPrimaryHashFunctionFactories(metadataProvider);
         ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
                 pkHashFunFactories, datasetPartitions.length);
+        if (isAtomic()) {
+            return new NoOpCommitRuntimeFactory(datasetId, primaryKeyFieldPermutation,
+                    metadataProvider.isWriteTransaction(), datasetPartitions, isSink, partitionerFactory);
+        }
         return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation, metadataProvider.isWriteTransaction(),
                 datasetPartitions, isSink, partitionerFactory);
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
index fd19937..5abad5d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
@@ -145,6 +145,10 @@
         return filterSourceIndicator;
     }
 
+    public boolean isDatasetWithoutTypeSpecification() {
+        return isDatasetWithoutTypeSpecification;
+    }
+
     @Override
     public DatasetType getDatasetType() {
         return DatasetType.INTERNAL;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index ac798aa..3950053 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -219,9 +219,10 @@
                     }
                 }
 
+                boolean isDatasetWithoutTypeSpec = primaryKeyTypes != null && !primaryKeyTypes.isEmpty();
                 datasetDetails = new InternalDatasetDetails(fileStructure, partitioningStrategy, partitioningKey,
                         partitioningKey, keyFieldSourceIndicator, primaryKeyTypes, autogenerated, filterSourceIndicator,
-                        filterField);
+                        filterField, isDatasetWithoutTypeSpec);
                 break;
             }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
index 1de387f..8ee8f11 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
@@ -248,7 +248,7 @@
     public void insertDeleteUpsertBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
             String datasetName) throws AlgebricksException {
         lockMgr.acquireDataverseReadLock(locks, dataverseName);
-        lockMgr.acquireDatasetModifyLock(locks, dataverseName, datasetName);
+        lockMgr.acquireDatasetExclusiveModificationLock(locks, dataverseName, datasetName);
     }
 
     @Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 3762e82..6e767ed 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
@@ -53,6 +54,7 @@
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -61,6 +63,7 @@
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
@@ -76,7 +79,7 @@
     private MultiComparator keySearchCmp;
     private RangePredicate searchPred;
     private final IIndexCursor[] cursors;
-    private final LockThenSearchOperationCallback[] searchCallbacks;
+    private final ISearchOperationCallback[] searchCallbacks;
     private final ISearchOperationCallbackFactory searchCallbackFactory;
     private final IFrameTupleProcessor[] processors;
     private final LSMTreeIndexAccessor[] lsmAccessorForKeyIndexes;
@@ -101,7 +104,7 @@
                 modCallbackFactory, null, tuplePartitionerFactory, partitionsMap);
         this.sourceLoc = sourceLoc;
         this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
-        this.searchCallbacks = new LockThenSearchOperationCallback[partitions.length];
+        this.searchCallbacks = new ISearchOperationCallback[partitions.length];
         this.cursors = new IIndexCursor[partitions.length];
         this.lsmAccessorForUniqunessChecks = new LSMTreeIndexAccessor[partitions.length];
         this.lsmAccessorForKeyIndexes = new LSMTreeIndexAccessor[partitions.length];
@@ -138,7 +141,7 @@
             IIndexCursor cursor = cursors[i];
             IIndexAccessor indexAccessor = indexAccessors[i];
             LSMTreeIndexAccessor lsmAccessorForKeyIndex = lsmAccessorForKeyIndexes[i];
-            LockThenSearchOperationCallback searchCallback = searchCallbacks[i];
+            ISearchOperationCallback searchCallback = searchCallbacks[i];
             processors[i] = new IFrameTupleProcessor() {
                 @Override
                 public void process(FrameTupleAccessor accessor, ITupleReference tuple, int index)
@@ -155,7 +158,9 @@
                     try {
                         if (cursor.hasNext()) {
                             // duplicate, skip
-                            searchCallback.release();
+                            if (searchCallback instanceof LockThenSearchOperationCallback) {
+                                ((LockThenSearchOperationCallback) searchCallback).release();
+                            }
                             duplicate = true;
                         }
                     } finally {
@@ -226,7 +231,7 @@
                 }
                 modCallbacks[i] =
                         modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
-                searchCallbacks[i] = (LockThenSearchOperationCallback) searchCallbackFactory
+                searchCallbacks[i] = searchCallbackFactory
                         .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
                 IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
                 indexAccessors[i] = index.createAccessor(iap);
@@ -240,6 +245,8 @@
                         new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallbacks[i]);
                 lsmAccessorForUniqunessChecks[i] =
                         (LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iapForUniquenessCheck);
+                setAtomicOpContextIfAtomic(indexForUniquessCheck, lsmAccessorForUniqunessChecks[i]);
+
                 cursors[i] = lsmAccessorForUniqunessChecks[i].createSearchCursor(false);
                 LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                         appCtx.getTransactionSubsystem().getLogManager());
@@ -311,6 +318,12 @@
         failure = CleanupUtils.close(writer, failure);
         failure = CleanupUtils.close(indexHelpers, failure);
         failure = CleanupUtils.close(keyIndexHelpers, failure);
+        if (failure == null && !failed) {
+            commitAtomicInsert();
+        } else {
+            abortAtomicInsert();
+        }
+
         if (failure != null) {
             throw HyracksDataException.create(failure);
         }
@@ -318,6 +331,7 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         writer.fail();
     }
 
@@ -325,4 +339,24 @@
     public void flush() throws HyracksDataException {
         // No op since nextFrame flushes by default
     }
+
+    private void commitAtomicInsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            if (((ILSMIndex) index).isAtomic()) {
+                PrimaryIndexOperationTracker opTracker =
+                        ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+                opTracker.commit();
+            }
+        }
+    }
+
+    private void abortAtomicInsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            if (((ILSMIndex) index).isAtomic()) {
+                PrimaryIndexOperationTracker opTracker =
+                        ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+                opTracker.abort();
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3a9a020..03130ae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -26,6 +26,7 @@
 import java.util.Date;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
@@ -68,6 +69,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -76,6 +78,7 @@
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.projection.ITupleProjector;
 import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
@@ -119,7 +122,7 @@
     private final boolean hasMeta;
     private final int filterFieldIndex;
     private final int metaFieldIndex;
-    protected final LockThenSearchOperationCallback[] searchCallbacks;
+    protected final ISearchOperationCallback[] searchCallbacks;
     protected final IFrameOperationCallback[] frameOpCallbacks;
     private final IFrameOperationCallbackFactory frameOpCallbackFactory;
     private final ISearchOperationCallbackFactory searchCallbackFactory;
@@ -142,7 +145,7 @@
                 modCallbackFactory, null, tuplePartitionerFactory, partitionsMap);
         this.hasSecondaries = hasSecondaries;
         this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
-        this.searchCallbacks = new LockThenSearchOperationCallback[partitions.length];
+        this.searchCallbacks = new ISearchOperationCallback[partitions.length];
         this.cursors = new IIndexCursor[partitions.length];
         this.processors = new IFrameTupleProcessor[partitions.length];
         this.key = new PermutingFrameTupleReference();
@@ -180,7 +183,7 @@
         for (int i = 0; i < partitions.length; i++) {
             ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessors[i];
             IIndexCursor cursor = cursors[i];
-            LockThenSearchOperationCallback searchCallback = searchCallbacks[i];
+            ISearchOperationCallback searchCallback = searchCallbacks[i];
             IModificationOperationCallback modCallback = modCallbacks[i];
             IFrameOperationCallback frameOpCallback = frameOpCallbacks[i];
             processors[i] = new IFrameTupleProcessor() {
@@ -189,8 +192,7 @@
                         throws HyracksDataException {
                     try {
                         tb.reset();
-                        AbstractIndexModificationOperationCallback abstractModCallback =
-                                (AbstractIndexModificationOperationCallback) modCallback;
+                        IModificationOperationCallback abstractModCallback = modCallback;
                         boolean recordWasInserted = false;
                         boolean recordWasDeleted = false;
                         boolean isDelete = isDeleteOperation(tuple, numOfPrimaryKeys);
@@ -223,11 +225,17 @@
                         if (isDelete && prevTuple != null) {
                             // Only delete if it is a delete and not upsert
                             // And previous tuple with the same key was found
-                            abstractModCallback.setOp(Operation.DELETE);
+                            if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+                                ((AbstractIndexModificationOperationCallback) abstractModCallback)
+                                        .setOp(Operation.DELETE);
+                            }
                             lsmAccessor.forceDelete(tuple);
                             recordWasDeleted = true;
                         } else if (!isDelete) {
-                            abstractModCallback.setOp(Operation.UPSERT);
+                            if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+                                ((AbstractIndexModificationOperationCallback) abstractModCallback)
+                                        .setOp(Operation.UPSERT);
+                            }
                             lsmAccessor.forceUpsert(tuple);
                             recordWasInserted = true;
                         }
@@ -296,11 +304,12 @@
                 }
                 modCallbacks[i] =
                         modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
-                searchCallbacks[i] = (LockThenSearchOperationCallback) searchCallbackFactory
+                searchCallbacks[i] = searchCallbackFactory
                         .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
                 IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], searchCallbacks[i]);
                 iap.getParameters().put(HyracksConstants.TUPLE_PROJECTOR, tupleProjector);
                 indexAccessors[i] = indexes[i].createAccessor(iap);
+                setAtomicOpContextIfAtomic(indexes[i], indexAccessors[i]);
                 cursors[i] = ((LSMTreeIndexAccessor) indexAccessors[i]).createSearchCursor(false);
                 LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) indexes[i],
                         appCtx.getTransactionSubsystem().getLogManager());
@@ -353,7 +362,7 @@
     }
 
     protected void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted,
-            LockThenSearchOperationCallback searchCallback) throws IOException {
+            ISearchOperationCallback searchCallback) throws IOException {
         if (recordWasInserted || recordWasDeleted) {
             frameTuple.reset(accessor, tupleIndex);
             for (int i = 0; i < frameTuple.getFieldCount(); i++) {
@@ -363,7 +372,9 @@
             FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
         } else {
             try {
-                searchCallback.release();
+                if (searchCallback instanceof LockThenSearchOperationCallback) {
+                    ((LockThenSearchOperationCallback) searchCallback).release();
+                }
             } catch (ACIDException e) {
                 throw HyracksDataException.create(e);
             }
@@ -506,6 +517,12 @@
         failure = CleanupUtils.destroy(failure, cursors);
         failure = CleanupUtils.close(writer, failure);
         failure = CleanupUtils.close(indexHelpers, failure);
+        if (failure == null && !failed) {
+            commitAtomicUpsert();
+        } else {
+            abortAtomicUpsert();
+        }
+
         if (failure != null) {
             throw HyracksDataException.create(failure);
         }
@@ -531,6 +548,7 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         writer.fail();
     }
 
@@ -538,4 +556,24 @@
     public void flush() throws HyracksDataException {
         // No op since nextFrame flushes by default
     }
+
+    private void commitAtomicUpsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            if (((ILSMIndex) index).isAtomic()) {
+                PrimaryIndexOperationTracker opTracker =
+                        ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+                opTracker.commit();
+            }
+        }
+    }
+
+    private void abortAtomicUpsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            if (((ILSMIndex) index).isAtomic()) {
+                PrimaryIndexOperationTracker opTracker =
+                        ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+                opTracker.abort();
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index 6d5e88b0..3dc6d37 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
 
 /**
  * This operator node is used for secondary indexes with upsert operations.
@@ -101,8 +102,7 @@
                 int storagePartition = tuplePartitioner.partition(accessor, i);
                 int storageIdx = storagePartitionId2Index.get(storagePartition);
                 ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessors[storageIdx];
-                AbstractIndexModificationOperationCallback abstractModCallback =
-                        (AbstractIndexModificationOperationCallback) modCallbacks[storageIdx];
+                IModificationOperationCallback abstractModCallback = modCallbacks[storageIdx];
                 frameTuple.reset(accessor, i);
                 int operation = operationInspector.getIntegerValue(frameTuple.getFieldData(operationFieldIndex),
                         frameTuple.getFieldStart(operationFieldIndex), frameTuple.getFieldLength(operationFieldIndex));
@@ -111,23 +111,33 @@
 
                 if (operation == UPSERT_NEW) {
                     if (tupleFilterIsNull || tupleFilter.accept(frameTuple)) {
-                        abstractModCallback.setOp(Operation.INSERT);
+                        if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+                            ((AbstractIndexModificationOperationCallback) abstractModCallback).setOp(Operation.INSERT);
+                        }
                         lsmAccessor.forceInsert(tuple);
                     }
                 } else if (operation == UPSERT_EXISTING) {
                     if (!TupleUtils.equalTuples(tuple, prevTuple, numberOfFields)) {
                         if (prevTupleFilterIsNull || prevTupleFilter.accept(frameTuple)) {
-                            abstractModCallback.setOp(Operation.DELETE);
+                            if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+                                ((AbstractIndexModificationOperationCallback) abstractModCallback)
+                                        .setOp(Operation.DELETE);
+                            }
                             lsmAccessor.forceDelete(prevTuple);
                         }
                         if (tupleFilterIsNull || tupleFilter.accept(frameTuple)) {
-                            abstractModCallback.setOp(Operation.INSERT);
+                            if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+                                ((AbstractIndexModificationOperationCallback) abstractModCallback)
+                                        .setOp(Operation.INSERT);
+                            }
                             lsmAccessor.forceInsert(tuple);
                         }
                     }
                 } else if (operation == DELETE_EXISTING) {
                     if (prevTupleFilterIsNull || prevTupleFilter.accept(frameTuple)) {
-                        abstractModCallback.setOp(Operation.DELETE);
+                        if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+                            ((AbstractIndexModificationOperationCallback) abstractModCallback).setOp(Operation.DELETE);
+                        }
                         lsmAccessor.forceDelete(prevTuple);
                     }
                 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
index 17858a3e..3eb9437 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
 
 public class LSMSecondaryUpsertWithNestedPlanOperatorNodePushable extends LSMSecondaryUpsertOperatorNodePushable {
     private final NestedTupleSourceRuntime[] startOfNewKeyPipelines;
@@ -190,11 +191,13 @@
                 int storagePartition = tuplePartitioner.partition(tuple.getFrameTupleAccessor(), tuple.getTupleIndex());
                 int storageIdx = storagePartitionId2Index.get(storagePartition);
                 ILSMIndexAccessor workingLSMAccessor = (ILSMIndexAccessor) indexAccessors[storageIdx];
-                AbstractIndexModificationOperationCallback abstractModCallback =
-                        (AbstractIndexModificationOperationCallback) modCallbacks[storageIdx];
+                IModificationOperationCallback abstractModCallback = modCallbacks[storageIdx];
                 // Finally, pass the tuple to our accessor. There are only two operations: insert or delete.
                 if (this.isInsert) {
-                    abstractModCallback.setOp(AbstractIndexModificationOperationCallback.Operation.INSERT);
+                    if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+                        ((AbstractIndexModificationOperationCallback) abstractModCallback)
+                                .setOp(AbstractIndexModificationOperationCallback.Operation.INSERT);
+                    }
                     try {
                         workingLSMAccessor.forceInsert(endTupleReference);
                     } catch (HyracksDataException e) {
@@ -203,7 +206,10 @@
                         }
                     }
                 } else {
-                    abstractModCallback.setOp(AbstractIndexModificationOperationCallback.Operation.DELETE);
+                    if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+                        ((AbstractIndexModificationOperationCallback) abstractModCallback)
+                                .setOp(AbstractIndexModificationOperationCallback.Operation.DELETE);
+                    }
                     workingLSMAccessor.forceDelete(endTupleReference);
                 }
             }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
new file mode 100644
index 0000000..2312f15
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.runtime;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+
+public class NoOpCommitRuntime extends CommitRuntime {
+
+    public NoOpCommitRuntime(IHyracksTaskContext ctx, TxnId txnId, int datasetId, int[] primaryKeyFields,
+            boolean isWriteTransaction, int resourcePartition, boolean isSink,
+            ITuplePartitionerFactory partitionerFactory, int[] datasetPartitions) {
+        super(ctx, txnId, datasetId, primaryKeyFields, isWriteTransaction, resourcePartition, isSink,
+                partitionerFactory, datasetPartitions);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        try {
+            transactionContext = transactionManager.getTransactionContext(txnId);
+            transactionContext.setWriteTxn(isWriteTransaction);
+            if (isSink) {
+                return;
+            }
+            initAccessAppend(ctx);
+            super.open();
+        } catch (ACIDException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            tRef.reset(tAccess, t);
+            try {
+                if (!isSink) {
+                    appendTupleToFrame(t);
+                }
+            } catch (ACIDException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+        IFrame message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
+        if (message != null
+                && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
+            message.reset();
+            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
+        }
+    }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java
new file mode 100644
index 0000000..bfe610b
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.runtime;
+
+import org.apache.asterix.common.api.IJobEventListenerFactory;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+
+public class NoOpCommitRuntimeFactory extends CommitRuntimeFactory {
+
+    private static final long serialVersionUID = 2L;
+
+    public NoOpCommitRuntimeFactory(int datasetId, int[] primaryKeyFields, boolean isWriteTransaction,
+            int[] datasetPartitions, boolean isSink, ITuplePartitionerFactory partitionerFactory) {
+        super(datasetId, primaryKeyFields, isWriteTransaction, datasetPartitions, isSink, partitionerFactory);
+    }
+
+    @Override
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+        IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+        return new IPushRuntime[] { new NoOpCommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId),
+                datasetId, primaryKeyFields, isWriteTransaction,
+                datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink, partitionerFactory,
+                datasetPartitions) };
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
index 3c829e6..b076cf2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
@@ -30,6 +30,8 @@
 
     public static final String TUPLE_PROJECTOR = "TUPLE_PROJECTOR";
 
+    public static final String ATOMIC_OP_CONTEXT = "ATOMIC_OP_CONTEXT";
+
     private HyracksConstants() {
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index c3bc5ca..e279eaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -49,6 +49,6 @@
                 getVirtualBufferCacheProvider(), SynchronousSchedulerProvider.INSTANCE, MERGE_POLICY_FACTORY,
                 MERGE_POLICY_PROPERTIES, DURABLE, bloomFilterKeyFields,
                 LSMTreeOperatorTestHelper.DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE, true, btreefields,
-                NoOpCompressorDecompressorFactory.INSTANCE, bloomFilterKeyFields != null, null, null, false);
+                NoOpCompressorDecompressorFactory.INSTANCE, bloomFilterKeyFields != null, null, null, false, false);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 422aef3..10425f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -69,6 +69,7 @@
     protected final int[] partitions;
     protected final Int2IntMap storagePartitionId2Index;
     protected boolean writerOpen;
+    protected boolean failed;
 
     public IndexInsertUpdateDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@ -92,6 +93,7 @@
         this.op = op;
         this.tuple.setFieldPermutation(fieldPermutation);
         this.tuplePartitioner = tuplePartitionerFactory.createPartitioner(ctx);
+        this.failed = false;
     }
 
     @Override
@@ -203,6 +205,7 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         if (writerOpen) {
             writer.fail();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
index 8c47bba..a0b592a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
@@ -61,22 +61,22 @@
             ILSMIOOperationSchedulerProvider ioSchedulerProvider,
             ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
             INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
-            IColumnManagerFactory columnManagerFactory) {
+            IColumnManagerFactory columnManagerFactory, boolean atomic) {
         super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, path, storageManager,
                 mergePolicyFactory, mergePolicyProperties, null, null, btreeFields, null, opTrackerProvider,
                 ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory, vbcProvider,
                 ioSchedulerProvider, true, compressorDecompressorFactory, true, nullTypeTraits, nullIntrospector,
-                isSecondaryNoIncrementalMaintenance);
+                isSecondaryNoIncrementalMaintenance, atomic);
         this.columnManagerFactory = columnManagerFactory;
     }
 
     private LSMColumnBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
             double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
             ICompressorDecompressorFactory compressorDecompressorFactory, boolean hasBloomFilter,
-            boolean isSecondaryNoIncrementalMaintenance, IColumnManagerFactory columnManagerFactory)
+            boolean isSecondaryNoIncrementalMaintenance, IColumnManagerFactory columnManagerFactory, boolean atomic)
             throws HyracksDataException {
         super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
-                compressorDecompressorFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance);
+                compressorDecompressorFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance, atomic);
         this.columnManagerFactory = columnManagerFactory;
     }
 
@@ -93,7 +93,7 @@
                 opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
                 ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, metadataPageManagerFactory, false,
                 serviceCtx.getTracer(), compressorDecompressorFactory, nullTypeTraits, nullIntrospector,
-                columnManagerFactory);
+                columnManagerFactory, atomic);
     }
 
     public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json)
@@ -109,11 +109,12 @@
         JsonNode columnManagerFactoryNode = json.get("columnManagerFactory");
         boolean isSecondaryNoIncrementalMaintenance =
                 getOrDefaultBoolean(json, "isSecondaryNoIncrementalMaintenance", false);
+        boolean atomic = getOrDefaultBoolean(json, "atomic", false);
         IColumnManagerFactory columnManagerFactory =
                 (IColumnManagerFactory) registry.deserialize(columnManagerFactoryNode);
         return new LSMColumnBTreeLocalResource(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 isPrimary, btreeFields, compDecompFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance,
-                columnManagerFactory);
+                columnManagerFactory, atomic);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
index eccb7c2..2cd045a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
@@ -51,12 +51,12 @@
             Map<String, String> mergePolicyProperties, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
             int[] btreeFields, ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
             INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
-            IColumnManagerFactory columnManagerFactory) {
+            IColumnManagerFactory columnManagerFactory, boolean atomic) {
         super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerFactory, ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory,
                 vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, true, bloomFilterKeyFields,
                 bloomFilterFalsePositiveRate, true, btreeFields, compressorDecompressorFactory, true, nullTypeTraits,
-                nullIntrospector, isSecondaryNoIncrementalMaintenance);
+                nullIntrospector, isSecondaryNoIncrementalMaintenance, atomic);
         this.columnManagerFactory = columnManagerFactory;
     }
 
@@ -66,6 +66,6 @@
                 bloomFilterFalsePositiveRate, fileRef.getRelativePath(), storageManager, mergePolicyFactory,
                 mergePolicyProperties, btreeFields, opTrackerProvider, ioOpCallbackFactory, pageWriteCallbackFactory,
                 metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, compressorDecompressorFactory,
-                nullTypeTraits, nullIntrospector, isSecondaryNoIncrementalMaintenance, columnManagerFactory);
+                nullTypeTraits, nullIntrospector, isSecondaryNoIncrementalMaintenance, columnManagerFactory, atomic);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
index 048d9de..e95b380 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
@@ -70,11 +70,13 @@
             double bloomFilterFalsePositiveRate, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
             ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
-            int[] btreeFields, ITracer tracer, IColumnManager columnManager) throws HyracksDataException {
+            int[] btreeFields, ITracer tracer, IColumnManager columnManager, boolean atomic)
+            throws HyracksDataException {
         super(ioManager, virtualBufferCaches, interiorFrameFactory, insertLeafFrameFactory, deleteLeafFrameFactory,
                 diskBufferCache, fileManager, componentFactory, bulkloadComponentFactory, null, null, null,
                 bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy, opTracker, ioScheduler,
-                ioOpCallbackFactory, pageWriteCallbackFactory, true, true, btreeFields, null, true, false, tracer);
+                ioOpCallbackFactory, pageWriteCallbackFactory, true, true, btreeFields, null, true, false, tracer,
+                atomic);
         this.columnManager = columnManager;
         this.mergeComponentFactory = mergeComponentFactory;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index 1a55447..6ef0dc6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -63,7 +63,7 @@
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
             int[] btreeFields, IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer,
             ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
-            INullIntrospector nullIntrospector, IColumnManagerFactory columnManagerFactory)
+            INullIntrospector nullIntrospector, IColumnManagerFactory columnManagerFactory, boolean atomic)
             throws HyracksDataException {
 
         //Tuple writers
@@ -111,6 +111,6 @@
                 deleteLeafFrameFactory, diskBufferCache, fileNameManager, flushComponentFactory, mergeComponentFactory,
                 bulkLoadComponentFactory, bloomFilterFalsePositiveRate, typeTraits.length, cmpFactories, mergePolicy,
                 opTracker, ioScheduler, ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, tracer,
-                columnManagerFactory.createColumnManager());
+                columnManagerFactory.createColumnManager(), atomic);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index 0a47fc0..43555ca 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -60,6 +60,7 @@
     protected final int[] btreeFields;
     protected final ICompressorDecompressorFactory compressorDecompressorFactory;
     protected final boolean isSecondaryNoIncrementalMaintenance;
+    protected final boolean atomic;
 
     public LSMBTreeLocalResource(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
             int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, boolean isPrimary, String path,
@@ -71,8 +72,8 @@
             IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
             ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable,
             ICompressorDecompressorFactory compressorDecompressorFactory, boolean hasBloomFilter,
-            ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector,
-            boolean isSecondaryNoIncrementalMaintenance) {
+            ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
+            boolean atomic) {
         super(path, storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerProvider, ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory,
                 vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable, nullTypeTraits,
@@ -84,12 +85,13 @@
         this.compressorDecompressorFactory = compressorDecompressorFactory;
         this.hasBloomFilter = hasBloomFilter;
         this.isSecondaryNoIncrementalMaintenance = isSecondaryNoIncrementalMaintenance;
+        this.atomic = atomic;
     }
 
     protected LSMBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
             double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
             ICompressorDecompressorFactory compressorDecompressorFactory, boolean hasBloomFilter,
-            boolean isSecondaryNoIncrementalMaintenance) throws HyracksDataException {
+            boolean isSecondaryNoIncrementalMaintenance, boolean atomic) throws HyracksDataException {
         super(registry, json);
         this.bloomFilterKeyFields = bloomFilterKeyFields;
         this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
@@ -98,6 +100,7 @@
         this.compressorDecompressorFactory = compressorDecompressorFactory;
         this.hasBloomFilter = hasBloomFilter;
         this.isSecondaryNoIncrementalMaintenance = isSecondaryNoIncrementalMaintenance;
+        this.atomic = atomic;
     }
 
     @Override
@@ -115,7 +118,7 @@
                 opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
                 ioOpCallbackFactory, pageWriteCallbackFactory, isPrimary, filterTypeTraits, filterCmpFactories,
                 btreeFields, filterFields, durable, metadataPageManagerFactory, updateAware, serviceCtx.getTracer(),
-                compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, nullIntrospector);
+                compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, nullIntrospector, atomic);
     }
 
     public boolean isSecondaryNoIncrementalMaintenance() {
@@ -141,8 +144,9 @@
                 .deserializeOrDefault(compressorDecompressorNode, NoOpCompressorDecompressorFactory.class);
         boolean isSecondaryNoIncrementalMaintenance =
                 getOrDefaultBoolean(json, "isSecondaryNoIncrementalMaintenance", false);
+        boolean atomic = getOrDefaultBoolean(json, "atomic", false);
         return new LSMBTreeLocalResource(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary,
-                btreeFields, compDecompFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance);
+                btreeFields, compDecompFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance, atomic);
     }
 
     @Override
@@ -156,6 +160,7 @@
         json.putPOJO("btreeFields", btreeFields);
         json.putPOJO("compressorDecompressorFactory", compressorDecompressorFactory.toJson(registry));
         json.put("isSecondaryNoIncrementalMaintenance", isSecondaryNoIncrementalMaintenance);
+        json.put("atomic", atomic);
     }
 
     protected static boolean getOrDefaultHasBloomFilter(JsonNode json, boolean isPrimary) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
index 6695e90..bb44655 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
@@ -49,6 +49,7 @@
     protected final int[] btreeFields;
     protected final ICompressorDecompressorFactory compressorDecompressorFactory;
     protected final boolean isSecondaryNoIncrementalMaintenance;
+    protected final boolean atomic;
 
     public LSMBTreeLocalResourceFactory(IStorageManager storageManager, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] cmpFactories, ITypeTraits[] filterTypeTraits,
@@ -60,8 +61,8 @@
             Map<String, String> mergePolicyProperties, boolean durable, int[] bloomFilterKeyFields,
             double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
             ICompressorDecompressorFactory compressorDecompressorFactory, boolean hasBloomFilter,
-            ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector,
-            boolean isSecondaryNoIncrementalMaintenance) {
+            ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
+            boolean atomic) {
         super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerFactory, ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory,
                 vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable, nullTypeTraits,
@@ -73,6 +74,7 @@
         this.btreeFields = btreeFields;
         this.compressorDecompressorFactory = compressorDecompressorFactory;
         this.isSecondaryNoIncrementalMaintenance = isSecondaryNoIncrementalMaintenance;
+        this.atomic = atomic;
     }
 
     @Override
@@ -82,7 +84,7 @@
                 filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory,
                 pageWriteCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable,
                 compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, nullIntrospector,
-                isSecondaryNoIncrementalMaintenance);
+                isSecondaryNoIncrementalMaintenance, atomic);
     }
 
     private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index eceefc6..05e78dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -100,11 +100,11 @@
             ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
             boolean needKeyDupCheck, boolean hasBloomFilter, int[] btreeFields, int[] filterFields, boolean durable,
-            boolean updateAware, ITracer tracer) throws HyracksDataException {
+            boolean updateAware, ITracer tracer, boolean atomic) throws HyracksDataException {
         super(ioManager, virtualBufferCaches, diskBufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy,
                 opTracker, ioScheduler, ioOpCallbackFactory, pageWriteCallbackFactory, componentFactory,
                 bulkLoadComponentFactory, filterFrameFactory, filterManager, filterFields, durable, filterHelper,
-                btreeFields, tracer);
+                btreeFields, tracer, atomic);
         this.insertLeafFrameFactory = insertLeafFrameFactory;
         this.deleteLeafFrameFactory = deleteLeafFrameFactory;
         this.cmpFactories = cmpFactories;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index c3ca3d5..3106b29 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -72,6 +72,23 @@
             boolean updateAware, ITracer tracer, ICompressorDecompressorFactory compressorDecompressorFactory,
             boolean hasBloomFilter, ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector)
             throws HyracksDataException {
+        return createLSMTree(ioManager, virtualBufferCaches, file, diskBufferCache, typeTraits, cmpFactories,
+                bloomFilterKeyFields, bloomFilterFalsePositiveRate, mergePolicy, opTracker, ioScheduler,
+                ioOpCallbackFactory, pageWriteCallbackFactory, needKeyDupCheck, filterTypeTraits, filterCmpFactories,
+                btreeFields, filterFields, durable, freePageManagerFactory, updateAware, tracer,
+                compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, nullIntrospector, false);
+    }
+
+    public static LSMBTree createLSMTree(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
+            FileReference file, IBufferCache diskBufferCache, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] cmpFactories, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
+            ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+            boolean needKeyDupCheck, ITypeTraits[] filterTypeTraits, IBinaryComparatorFactory[] filterCmpFactories,
+            int[] btreeFields, int[] filterFields, boolean durable, IMetadataPageManagerFactory freePageManagerFactory,
+            boolean updateAware, ITracer tracer, ICompressorDecompressorFactory compressorDecompressorFactory,
+            boolean hasBloomFilter, ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector, boolean atomic)
+            throws HyracksDataException {
         LSMBTreeTupleWriterFactory insertTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
                 cmpFactories.length, false, updateAware, nullTypeTraits, nullIntrospector);
         LSMBTreeTupleWriterFactory deleteTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
@@ -124,6 +141,6 @@
                 deleteLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, bulkLoadComponentFactory,
                 filterHelper, filterFrameFactory, filterManager, bloomFilterFalsePositiveRate, typeTraits.length,
                 cmpFactories, mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, pageWriteCallbackFactory,
-                needKeyDupCheck, hasBloomFilter, btreeFields, filterFields, durable, updateAware, tracer);
+                needKeyDupCheck, hasBloomFilter, btreeFields, filterFields, durable, updateAware, tracer, atomic);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 69b9547..4756921 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -43,6 +43,27 @@
  */
 public interface ILSMIndex extends IIndex {
 
+    boolean isAtomic();
+
+    /**
+     * Commits the atomic statement by adding all the temporary disk components generated by an statement to the
+     * list of disk components. Queries after call to this function will be able to read all the changes made by
+     * the insert/upsert/delete statement.
+     *
+     * @throws HyracksDataException
+     */
+    void commit() throws HyracksDataException;
+
+    /**
+     * Aborts the ongoing statement by destroying all temporary disk components generated by the statement and
+     * resetting the memory components
+     *
+     * @throws HyracksDataException
+     */
+    void abort() throws HyracksDataException;
+
+    ILSMMergePolicy getMergePolicy();
+
     void deactivate(boolean flush) throws HyracksDataException;
 
     @Override
@@ -106,6 +127,8 @@
 
     void addDiskComponent(ILSMDiskComponent index) throws HyracksDataException;
 
+    void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws HyracksDataException;
+
     void subsumeMergedComponents(ILSMDiskComponent newComponent, List<ILSMComponent> mergedComponents)
             throws HyracksDataException;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index c64e1e1..2ec59d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -19,11 +19,14 @@
 package org.apache.hyracks.storage.am.lsm.common.dataflow;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
@@ -31,8 +34,11 @@
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFrameWriter;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexAccessor;
 
 public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertUpdateDeleteOperatorNodePushable
         implements ILSMIndexFrameWriter {
@@ -134,4 +140,12 @@
         }
         appender.write(writer, true);
     }
+
+    protected void setAtomicOpContextIfAtomic(IIndex index, IIndexAccessor accessor) {
+        if (((ILSMIndex) index).isAtomic()) {
+            Map<String, Object> indexAccessorOpContextParameters = new HashMap<>();
+            indexAccessorOpContextParameters.put(HyracksConstants.ATOMIC_OP_CONTEXT, true);
+            ((ILSMIndexAccessor) accessor).getOpContext().setParameters(indexAccessorOpContextParameters);
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 2745b44..b10dd5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -39,6 +39,7 @@
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.impls.AbstractSearchPredicate;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
@@ -112,6 +113,9 @@
     protected final ILSMDiskComponentFactory bulkLoadComponentFactory;
     protected final ILSMPageWriteCallbackFactory pageWriteCallbackFactory;
     private int numScheduledFlushes = 0;
+    private final boolean atomic;
+    private final List<ILSMDiskComponent> temporaryDiskComponents;
+    private final ILSMMergePolicy mergePolicy;
 
     public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
             IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, double bloomFilterFalsePositiveRate,
@@ -119,8 +123,8 @@
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
             ILSMDiskComponentFactory componentFactory, ILSMDiskComponentFactory bulkLoadComponentFactory,
             ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager,
-            int[] filterFields, boolean durable, IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer)
-            throws HyracksDataException {
+            int[] filterFields, boolean durable, IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer,
+            boolean atomic) throws HyracksDataException {
         this.ioManager = ioManager;
         this.virtualBufferCaches = virtualBufferCaches;
         this.diskBufferCache = diskBufferCache;
@@ -139,6 +143,10 @@
         this.inactiveMemoryComponents = new ArrayList<>();
         this.durable = durable;
         this.tracer = tracer;
+        this.atomic = atomic;
+        this.temporaryDiskComponents = new ArrayList<>();
+        this.mergePolicy = mergePolicy;
+
         fileManager.initLastUsedSeq(ioOpCallback.getLastValidSequence());
         lsmHarness = new LSMHarness(this, ioScheduler, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(),
                 tracer);
@@ -152,6 +160,25 @@
         }
     }
 
+    public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
+            IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, double bloomFilterFalsePositiveRate,
+            ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+            ILSMDiskComponentFactory componentFactory, ILSMDiskComponentFactory bulkLoadComponentFactory,
+            ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager,
+            int[] filterFields, boolean durable, IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer)
+            throws HyracksDataException {
+        this(ioManager, virtualBufferCaches, diskBufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy,
+                opTracker, ioScheduler, ioOpCallbackFactory, pageWriteCallbackFactory, componentFactory,
+                bulkLoadComponentFactory, filterFrameFactory, filterManager, filterFields, durable, filterHelper,
+                treeFields, tracer, false);
+    }
+
+    @Override
+    public boolean isAtomic() {
+        return atomic;
+    }
+
     @Override
     public synchronized void create() throws HyracksDataException {
         if (isActive) {
@@ -223,6 +250,9 @@
         for (ILSMDiskComponent c : diskComponents) {
             c.deactivateAndPurge();
         }
+        for (ILSMDiskComponent c : temporaryDiskComponents) {
+            c.deactivateAndPurge();
+        }
     }
 
     private void deallocateMemoryComponents() throws HyracksDataException {
@@ -247,6 +277,9 @@
         for (ILSMDiskComponent c : diskComponents) {
             c.destroy();
         }
+        for (ILSMDiskComponent c : temporaryDiskComponents) {
+            c.destroy();
+        }
     }
 
     @Override
@@ -265,6 +298,15 @@
         diskComponents.clear();
     }
 
+    @Override
+    public void abort() throws HyracksDataException {
+        resetMemoryComponents();
+        for (ILSMDiskComponent c : temporaryDiskComponents) {
+            c.deactivateAndDestroy();
+        }
+        temporaryDiskComponents.clear();
+    }
+
     private void resetMemoryComponents() throws HyracksDataException {
         if (memoryComponentsAllocated && memoryComponents != null) {
             for (ILSMMemoryComponent c : memoryComponents) {
@@ -299,7 +341,9 @@
                 operationalComponents.addAll(diskComponents);
                 break;
             case SEARCH:
-                if (memoryComponentsAllocated) {
+                // search should include memory components for datasets with atomic statements not enabled or search to
+                // check duplicate key while inserts/upserts on datasets with atomic statements enabled
+                if (memoryComponentsAllocated && (!atomic || isAtomicOpContext(ctx))) {
                     addOperationalMemoryComponents(operationalComponents, false);
                 }
                 if (filterManager != null) {
@@ -315,6 +359,9 @@
                 } else {
                     operationalComponents.addAll(diskComponents);
                 }
+                if (isAtomicOpContext(ctx)) {
+                    operationalComponents.addAll(temporaryDiskComponents);
+                }
 
                 break;
             case REPLICATE:
@@ -328,6 +375,11 @@
         }
     }
 
+    private boolean isAtomicOpContext(ILSMIndexOperationContext ctx) {
+        Map<String, Object> ctxParameters = ctx.getParameters();
+        return ctxParameters != null && (boolean) ctxParameters.getOrDefault(HyracksConstants.ATOMIC_OP_CONTEXT, false);
+    }
+
     @Override
     public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException {
         throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX);
@@ -554,12 +606,35 @@
     @Override
     public void addDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
         if (c != EmptyComponent.INSTANCE) {
+            if (atomic) {
+                temporaryDiskComponents.add(c);
+                LOGGER.debug("Adding new temporary disk component to index {}; current count: {}", c,
+                        temporaryDiskComponents.size());
+            } else {
+                diskComponents.add(0, c);
+            }
+        }
+        validateComponentIds();
+    }
+
+    @Override
+    public void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+        if (c != EmptyComponent.INSTANCE) {
             diskComponents.add(0, c);
         }
         validateComponentIds();
     }
 
     @Override
+    public void commit() throws HyracksDataException {
+        for (ILSMDiskComponent c : temporaryDiskComponents) {
+            diskComponents.add(0, c);
+        }
+        temporaryDiskComponents.clear();
+        validateComponentIds();
+    }
+
+    @Override
     public void subsumeMergedComponents(ILSMDiskComponent newComponent, List<ILSMComponent> mergedComponents)
             throws HyracksDataException {
         int swapIndex = diskComponents.indexOf(mergedComponents.get(0));
@@ -879,4 +954,9 @@
         return pageWriteCallbackFactory;
     }
 
+    @Override
+    public ILSMMergePolicy getMergePolicy() {
+        return mergePolicy;
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index e2c9fab..9fcce8b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -190,7 +190,9 @@
                     if (opType == LSMOperationType.FLUSH) {
                         opTracker.notifyAll();
                         if (!failedOperation) {
-                            waitForLaggingMerge();
+                            if (!lsmIndex.isAtomic()) {
+                                waitForLaggingMerge();
+                            }
                         }
                     } else if (opType == LSMOperationType.MERGE) {
                         opTracker.notifyAll();
@@ -299,7 +301,9 @@
                         componentsToBeReplicated.add(newComponent);
                         triggerReplication(componentsToBeReplicated, opType);
                     }
-                    mergePolicy.diskComponentAdded(lsmIndex, false);
+                    if (!lsmIndex.isAtomic()) {
+                        mergePolicy.diskComponentAdded(lsmIndex, false);
+                    }
                 }
                 break;
             case MERGE:
@@ -639,7 +643,7 @@
             throw HyracksDataException.create(ioOperation.getFailure());
         }
         synchronized (opTracker) {
-            lsmIndex.addDiskComponent(c);
+            lsmIndex.addBulkLoadedDiskComponent(c);
             if (replicationEnabled) {
                 componentsToBeReplicated.clear();
                 componentsToBeReplicated.add(c);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index 2d0f079..94d3447 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -95,7 +95,7 @@
                 diskBufferCache, fileManager, componentFactory, bulkLoadComponentFactory, filterHelper,
                 filterFrameFactory, filterManager, bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy,
                 opTracker, ioScheduler, ioOperationCallbackFactory, pageWriteCallbackFactory, needKeyDupCheck,
-                hasBloomFilter, btreeFields, filterFields, durable, updateAware, tracer);
+                hasBloomFilter, btreeFields, filterFields, durable, updateAware, tracer, false);
         addModifyCallback(AllowTestOpCallback.INSTANCE);
         addSearchCallback(AllowTestOpCallback.INSTANCE);
         addFlushCallback(AllowTestOpCallback.INSTANCE);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
index 039b244..04fa11f 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -63,14 +63,14 @@
                 btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, pageWriteCallbackFactory,
                 metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable,
                 NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, null, null,
-                isSecondaryNoIncrementalMaintenance);
+                isSecondaryNoIncrementalMaintenance, false);
     }
 
     protected TestLsmBtreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
             double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields, boolean hasBloomFilter,
             boolean isSecondaryNoIncrementalMaintenance) throws HyracksDataException {
         super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
-                NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, isSecondaryNoIncrementalMaintenance);
+                NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, isSecondaryNoIncrementalMaintenance, false);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
index 48adf91..54b786b 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
@@ -53,7 +53,7 @@
                 vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable,
                 bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
                 NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, null, null,
-                isSecondaryNoIncrementalMaintenance);
+                isSecondaryNoIncrementalMaintenance, false);
     }
 
     @Override
