[NO ISSUE][MTD][TX] Implement atomic metadata transactions without WAL

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- With this change, metadata transactions without WAL are made atomic on
cloud deployments.
- Some refactoring related to global transaction manager.

Change-Id: I282e0e4ca8a9bff68fa88613b9d34b14bc2b764c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17657
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index d69376a..f6456dc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -104,8 +104,7 @@
     }
 
     @Override
-    public void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
-            Map<String, ILSMComponentId> componentIdMap) {
+    public void handleJobPreparedMessage(JobId jobId, String nodeId, Map<String, ILSMComponentId> componentIdMap) {
         IGlobalTransactionContext context = txnContextRepository.get(jobId);
         if (context == null) {
             LOGGER.warn("JobPreparedMessage received for jobId " + jobId
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index f6eb123..5494250 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -61,6 +61,7 @@
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -157,6 +158,9 @@
     @Override
     public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
         state = SystemState.RECOVERING;
+        if (appCtx.isCloudDeployment()) {
+            doMetadataRecovery();
+        }
         LOGGER.info("starting recovery for partitions {}", partitions);
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
         Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -171,6 +175,11 @@
         replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN, true);
     }
 
+    public synchronized void doMetadataRecovery() {
+        LOGGER.info("starting recovery for metadata partition {}", StorageConstants.METADATA_PARTITION);
+        appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL();
+    }
+
     public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN,
             boolean closeOnFlushRedo) throws IOException, ACIDException {
         try {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index fb08fe3..cea3832 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -175,7 +175,7 @@
         LogRecord logRecord = new LogRecord();
         final long txnId = 1;
         logRecord.setTxnCtx(TransactionContextFactory.create(new TxnId(txnId),
-                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)));
+                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL), ncAppCtx));
         logRecord.setLogSource(LogSource.LOCAL);
         logRecord.setLogType(LogType.WAIT);
         logRecord.setTxnId(txnId);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
index 498d174..956ae9e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
@@ -46,8 +46,7 @@
 
     IGlobalTransactionContext getTransactionContext(JobId jobId) throws ACIDException;
 
-    void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
-            Map<String, ILSMComponentId> componentIdMap);
+    void handleJobPreparedMessage(JobId jobId, String nodeId, Map<String, ILSMComponentId> componentIdMap);
 
     void handleJobCompletionMessage(JobId jobId, String nodeId);
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 64748ce..1a23ca8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -249,7 +249,6 @@
     private void commitAtomicInsertDelete() throws HyracksDataException {
         if (isPrimary) {
             final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
-            int datasetID = -1;
             boolean atomic = false;
             for (IIndex index : indexes) {
                 if (((ILSMIndex) index).isAtomic()) {
@@ -259,14 +258,13 @@
                     for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
                         componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
                     }
-                    datasetID = opTracker.getDatasetInfo().getDatasetID();
                     atomic = true;
                 }
             }
 
             if (atomic) {
                 AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
-                        ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+                        ctx.getJobletContext().getServiceContext().getNodeId(), componentIdMap);
                 try {
                     ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
                             .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
index 8adbf49..b4832ff 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
@@ -38,20 +38,17 @@
     private static final long serialVersionUID = 1L;
     private final JobId jobId;
     private final String nodeId;
-    private final int datasetId;
     private final Map<String, ILSMComponentId> componentIdMap;
 
-    public AtomicJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
-            Map<String, ILSMComponentId> componentIdMap) {
+    public AtomicJobPreparedMessage(JobId jobId, String nodeId, Map<String, ILSMComponentId> componentIdMap) {
         this.nodeId = nodeId;
-        this.datasetId = datasetId;
         this.componentIdMap = componentIdMap;
         this.jobId = jobId;
     }
 
     @Override
     public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        appCtx.getGlobalTxManager().handleJobPreparedMessage(jobId, nodeId, datasetId, componentIdMap);
+        appCtx.getGlobalTxManager().handleJobPreparedMessage(jobId, nodeId, componentIdMap);
     }
 
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index 30c693d..66b5359 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -112,4 +112,9 @@
      */
     void ensureMaxTxnId(long txnId);
 
+    /**
+     * Rollback incomplete metadata transactions without WAL during recovery.
+     */
+    void rollbackMetadataTransactionsWithoutWAL();
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 1321c96..ed9c48e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -30,6 +30,7 @@
  */
 public class StorageConstants {
 
+    public static final String METADATA_TXN_NOWAL_DIR_NAME = "mtd-txn-logs";
     public static final String STORAGE_ROOT_DIR_NAME = "storage";
     public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
     public static final String PARTITION_DIR_PREFIX = "partition_";
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 522fd32..b40a4eb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -128,6 +128,7 @@
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -1742,6 +1743,12 @@
         return sb.toString();
     }
 
+    private void setAtomicOpContext(IIndexAccessor accessor) {
+        Map<String, Object> indexAccessorOpContextParameters = new HashMap<>();
+        indexAccessorOpContextParameters.put(HyracksConstants.ATOMIC_OP_CONTEXT, true);
+        ((ILSMIndexAccessor) accessor).getOpContext().setParameters(indexAccessorOpContextParameters);
+    }
+
     private <T> void searchIndex(TxnId txnId, IMetadataIndex index, ITupleReference searchKey,
             IValueExtractor<T> valueExtractor, List<T> results) throws AlgebricksException, HyracksDataException {
         IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
@@ -1753,6 +1760,9 @@
         IIndex indexInstance = datasetLifecycleManager.get(resourceName);
         datasetLifecycleManager.open(resourceName);
         IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+        if (atomicNoWAL) {
+            setAtomicOpContext(indexAccessor);
+        }
         try {
             IBinaryComparator[] searchCmps = null;
             MultiComparator searchCmp = null;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index c1eba7c..be935d0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -433,7 +433,7 @@
                             StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES, true, bloomFilterKeyFields,
                             bloomFilterFalsePositiveRate, true, null, NoOpCompressorDecompressorFactory.INSTANCE, true,
                             TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL), NullIntrospector.INSTANCE,
-                            false, false);
+                            false, appContext.isCloudDeployment());
             DatasetLocalResourceFactory dsLocalResourceFactory =
                     new DatasetLocalResourceFactory(datasetId, lsmBtreeFactory);
             // TODO(amoudi) Creating the index should be done through the same code path as
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index ec016bc..2c89b61 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -352,7 +352,6 @@
 
     private void commitAtomicInsert() throws HyracksDataException {
         final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
-        int datasetID = -1;
         boolean atomic = false;
         for (IIndex index : indexes) {
             if (((ILSMIndex) index).isAtomic()) {
@@ -362,14 +361,13 @@
                 for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
                     componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
                 }
-                datasetID = opTracker.getDatasetInfo().getDatasetID();
                 atomic = true;
             }
         }
 
         if (atomic) {
             AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
-                    ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+                    ctx.getJobletContext().getServiceContext().getNodeId(), componentIdMap);
             try {
                 ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
                         .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index de95d60..6b9d3eb 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -570,7 +570,6 @@
     // TODO: Refactor and remove duplicated code
     private void commitAtomicUpsert() throws HyracksDataException {
         final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
-        int datasetID = -1;
         boolean atomic = false;
         for (IIndex index : indexes) {
             if (((ILSMIndex) index).isAtomic()) {
@@ -580,14 +579,13 @@
                 for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
                     componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
                 }
-                datasetID = opTracker.getDatasetInfo().getDatasetID();
                 atomic = true;
             }
         }
 
         if (atomic) {
             AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
-                    ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+                    ctx.getJobletContext().getServiceContext().getNodeId(), componentIdMap);
             try {
                 ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
                         .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
index 3576ba1..aa698be 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -18,25 +18,46 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
+import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 @ThreadSafe
 public class AtomicNoWALTransactionContext extends AtomicTransactionContext {
 
-    public AtomicNoWALTransactionContext(TxnId txnId) {
+    private final INcApplicationContext appCtx;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public AtomicNoWALTransactionContext(TxnId txnId, INcApplicationContext appCtx) {
         super(txnId);
+        this.appCtx = appCtx;
     }
 
     @Override
@@ -59,7 +80,7 @@
         for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
             PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
             try {
-                primaryIndexOpTracker.deleteMemoryComponent(true);
+                primaryIndexOpTracker.abort();
             } catch (HyracksDataException e) {
                 throw new ACIDException(e);
             }
@@ -68,19 +89,102 @@
 
     private void ensureDurable() {
         List<FlushOperation> flushes = new ArrayList<>();
+        List<Integer> datasetIds = new ArrayList<>();
+        Map<String, ILSMComponentId> resourceMap = new HashMap<>();
         LogRecord dummyLogRecord = new LogRecord();
         try {
             for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
                 PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
                 primaryIndexOpTracker.triggerScheduleFlush(dummyLogRecord);
                 flushes.addAll(primaryIndexOpTracker.getScheduledFlushes());
+                datasetIds.add(primaryIndexOpTracker.getDatasetInfo().getDatasetID());
+                for (Map.Entry<String, FlushOperation> entry : primaryIndexOpTracker.getLastFlushOperation()
+                        .entrySet()) {
+                    resourceMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
+                }
             }
             LSMIndexUtil.waitFor(flushes);
+            persistLogFile(datasetIds, resourceMap);
+        } catch (Exception e) {
+            deleteUncommittedRecords();
+            throw new ACIDException(e);
+        }
+        try {
+            commit();
+        } catch (Exception e) {
+            rollback(resourceMap);
+            throw new ACIDException(e);
+        } finally {
+            deleteLogFile();
+        }
+        enableMerge();
+    }
+
+    private void persistLogFile(List<Integer> datasetIds, Map<String, ILSMComponentId> resourceMap)
+            throws HyracksDataException, JsonProcessingException {
+        IIOManager ioManager = appCtx.getIoManager();
+        FileReference fref = ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+                StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION,
+                String.format("%s.log", txnId)).toString());
+        MetadataAtomicTransactionLog txnLog = new MetadataAtomicTransactionLog(txnId, datasetIds,
+                appCtx.getServiceContext().getNodeId(), resourceMap);
+        ioManager.overwrite(fref, OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(txnLog).getBytes());
+    }
+
+    public void deleteLogFile() {
+        IIOManager ioManager = appCtx.getIoManager();
+        try {
+            FileReference fref = ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+                    StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION,
+                    String.format("%s.log", txnId)).toString());
+            ioManager.delete(fref);
         } catch (HyracksDataException e) {
             throw new ACIDException(e);
         }
     }
 
+    private void commit() throws HyracksDataException {
+        for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+            PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
+            primaryIndexOpTracker.commit();
+        }
+    }
+
+    private void enableMerge() {
+        for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+            PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
+            for (IndexInfo indexInfo : primaryIndexOpTracker.getDatasetInfo().getIndexes().values()) {
+                if (indexInfo.getIndex().isPrimaryIndex()) {
+                    try {
+                        indexInfo.getIndex().getMergePolicy().diskComponentAdded(indexInfo.getIndex(), false);
+                    } catch (HyracksDataException e) {
+                        throw new ACIDException(e);
+                    }
+                }
+            }
+        }
+    }
+
+    public void rollback(Map<String, ILSMComponentId> resourceMap) {
+        deleteUncommittedRecords();
+        IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+        IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+                datasetLifecycleManager.getIndexCheckpointManagerProvider();
+        resourceMap.forEach((k, v) -> {
+            try {
+                IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k));
+                if (checkpointManager.getCheckpointCount() > 0) {
+                    IndexCheckpoint checkpoint = checkpointManager.getLatest();
+                    if (checkpoint.getLastComponentId() == v.getMaxId()) {
+                        checkpointManager.deleteLatest(v.getMaxId(), 1);
+                    }
+                }
+            } catch (HyracksDataException e) {
+                throw new ACIDException(e);
+            }
+        });
+    }
+
     @Override
     public boolean hasWAL() {
         return false;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
new file mode 100644
index 0000000..7b3af3f
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MetadataAtomicTransactionLog {
+
+    private TxnId txnId;
+    private List<Integer> datasetIds;
+    private String nodeId;
+    private Map<String, ILSMComponentId> resourceMap;
+
+    @JsonCreator
+    public MetadataAtomicTransactionLog(@JsonProperty("txnId") TxnId txnId,
+            @JsonProperty("datasetIds") List<Integer> datasetIds, @JsonProperty("nodeId") String nodeId,
+            @JsonProperty("resourceMap") Map<String, ILSMComponentId> resourceMap) {
+        this.txnId = txnId;
+        this.datasetIds = datasetIds;
+        this.nodeId = nodeId;
+        this.resourceMap = resourceMap;
+    }
+
+    public TxnId getTxnId() {
+        return txnId;
+    }
+
+    public List<Integer> getDatasetIds() {
+        return datasetIds;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public Map<String, ILSMComponentId> getResourceMap() {
+        return resourceMap;
+    }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
index 1076086..e1e2ca8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
@@ -20,6 +20,7 @@
 
 import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
 
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
@@ -29,13 +30,13 @@
     private TransactionContextFactory() {
     }
 
-    public static ITransactionContext create(TxnId txnId, TransactionOptions options) {
+    public static ITransactionContext create(TxnId txnId, TransactionOptions options, INcApplicationContext appCtx) {
         final AtomicityLevel atomicityLevel = options.getAtomicityLevel();
         switch (atomicityLevel) {
             case ATOMIC:
                 return new AtomicTransactionContext(txnId);
             case ATOMIC_NO_WAL:
-                return new AtomicNoWALTransactionContext(txnId);
+                return new AtomicNoWALTransactionContext(txnId, appCtx);
             case ENTITY_LEVEL:
                 return new EntityLevelTransactionContext(txnId);
             default:
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 8bbfbfd..ea7fcb8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -22,6 +22,7 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.file.Paths;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -34,14 +35,19 @@
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 @ThreadSafe
 public class TransactionManager implements ITransactionManager, ILifeCycleComponent {
 
@@ -61,7 +67,7 @@
         if (txnCtx != null) {
             throw new ACIDException("Transaction with the same (" + txnId + ") already exists");
         }
-        txnCtx = TransactionContextFactory.create(txnId, options);
+        txnCtx = TransactionContextFactory.create(txnId, options, txnSubsystem.getApplicationContext());
         txnCtxRepository.put(txnId, txnCtx);
         ensureMaxTxnId(txnId.getId());
         return txnCtx;
@@ -187,4 +193,27 @@
             LOGGER.log(Level.WARN, "exception while dumping state", e);
         }
     }
+
+    @Override
+    public void rollbackMetadataTransactionsWithoutWAL() {
+        IIOManager ioManager = txnSubsystem.getApplicationContext().getIoManager();
+        try {
+            Set<FileReference> txnLogFileRefs =
+                    ioManager.list(ioManager.resolve(Paths
+                            .get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+                                    StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION)
+                            .toString()));
+            ObjectMapper objectMapper = new ObjectMapper();
+            for (FileReference txnLogFileRef : txnLogFileRefs) {
+                MetadataAtomicTransactionLog atomicTransactionLog = objectMapper.readValue(
+                        new String(ioManager.readAllBytes(txnLogFileRef)), MetadataAtomicTransactionLog.class);
+                AtomicNoWALTransactionContext context = new AtomicNoWALTransactionContext(
+                        atomicTransactionLog.getTxnId(), txnSubsystem.getApplicationContext());
+                context.rollback(atomicTransactionLog.getResourceMap());
+                context.deleteLogFile();
+            }
+        } catch (Exception e) {
+            throw new ACIDException(e);
+        }
+    }
 }