diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index 0f31935..c6c71f6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.JobId;
@@ -32,12 +33,16 @@
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
 
 public class CommitRuntime implements IPushRuntime {
@@ -52,17 +57,18 @@
     protected final boolean isTemporaryDatasetWriteJob;
     protected final boolean isWriteTransaction;
     protected final long[] longHashes;
-    protected final LogRecord logRecord;
     protected final FrameTupleReference frameTupleReference;
-
-    protected ITransactionContext transactionContext;
-    protected FrameTupleAccessor frameTupleAccessor;
+    protected final IHyracksTaskContext ctx;
     protected final int resourcePartition;
+    protected ITransactionContext transactionContext;
+    protected LogRecord logRecord;
+    protected FrameTupleAccessor frameTupleAccessor;
 
     public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
             boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition) {
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
+        this.ctx = ctx;
+        IAsterixAppRuntimeContext runtimeCtx =
+                (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
         this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
         this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
         this.jobId = jobId;
@@ -73,7 +79,6 @@
         this.isWriteTransaction = isWriteTransaction;
         this.resourcePartition = resourcePartition;
         longHashes = new long[2];
-        logRecord = new LogRecord();
     }
 
     @Override
@@ -81,6 +86,9 @@
         try {
             transactionContext = transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
+            ILogMarkerCallback callback =
+                    TaskUtils.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+            logRecord = new LogRecord(callback);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
@@ -113,6 +121,23 @@
                 }
             }
         }
+        VSizeFrame message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
+        if (message != null
+                && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
+            try {
+                formMarkerLogRecords(message.getBuffer());
+                logMgr.log(logRecord);
+            } catch (ACIDException e) {
+                throw new HyracksDataException(e);
+            }
+            message.reset();
+            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
+        }
+    }
+
+    private void formMarkerLogRecords(ByteBuffer marker) {
+        TransactionUtil.formMarkerLogRecord(logRecord, transactionContext, datasetId, resourcePartition, marker);
     }
 
     protected void formLogRecord(ByteBuffer buffer, int t) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
new file mode 100644
index 0000000..5c3aefe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.bootstrap;
+
+import java.io.File;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.operators.physical.CommitRuntime;
+import org.apache.asterix.api.common.AsterixAppRuntimeContext;
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.app.external.TestLibrarian;
+import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
+import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.asterix.transaction.management.service.logging.LogReader;
+import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelper;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+import org.apache.hyracks.test.support.TestUtils;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestNodeController {
+    protected static final Logger LOGGER = Logger.getLogger(TestNodeController.class.getName());
+
+    protected static final String PATH_ACTUAL = "unittest" + File.separator;
+    protected static final String PATH_BASE =
+            StringUtils.join(new String[] { "src", "test", "resources", "nodetests" }, File.separator);
+
+    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    protected static AsterixTransactionProperties txnProperties;
+    private static final boolean cleanupOnStart = true;
+    private static final boolean cleanupOnStop = true;
+
+    // Constants
+    public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
+    public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
+    public static final int KB32 = 32768;
+    public static final int PARTITION = 0;
+    public static final double BLOOM_FILTER_FALSE_POSITIVE_RATE = 0.01;
+    public static final TransactionSubsystemProvider TXN_SUBSYSTEM_PROVIDER = new TransactionSubsystemProvider();
+    // Mutables
+    private JobId jobId;
+    private long jobCounter = 0L;
+    private IHyracksJobletContext jobletCtx;
+
+    public TestNodeController() throws AsterixException, HyracksException, ACIDException {
+    }
+
+    public void init() throws Exception {
+        try {
+            File outdir = new File(PATH_ACTUAL);
+            outdir.mkdirs();
+            // remove library directory
+            TestLibrarian.removeLibraryDir();
+            ExecutionTestUtil.setUp(cleanupOnStart);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+        jobletCtx = Mockito.mock(IHyracksJobletContext.class);
+        Mockito.when(jobletCtx.getApplicationContext())
+                .thenReturn(AsterixHyracksIntegrationUtil.ncs[0].getApplicationContext());
+        Mockito.when(jobletCtx.getJobId()).thenAnswer(new Answer<JobId>() {
+            @Override
+            public JobId answer(InvocationOnMock invocation) throws Throwable {
+                return jobId;
+            }
+        });
+    }
+
+    public void deInit() throws Exception {
+        TestLibrarian.removeLibraryDir();
+        ExecutionTestUtil.tearDown(cleanupOnStop);
+    }
+
+    public org.apache.asterix.common.transactions.JobId getTxnJobId() {
+        return new org.apache.asterix.common.transactions.JobId((int) jobId.getId());
+    }
+
+    public AsterixLSMInsertDeleteOperatorNodePushable getInsertPipeline(IHyracksTaskContext ctx, Dataset dataset,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws AlgebricksException {
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        IndexOperation op = IndexOperation.INSERT;
+        IModificationOperationCallbackFactory modOpCallbackFactory =
+                new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
+                        primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE, true);
+        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
+                getInsertOpratorDesc(primaryIndexInfo, modOpCallbackFactory);
+        LSMBTreeDataflowHelperFactory dataflowHelperFactory =
+                getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo);
+        Mockito.when(indexOpDesc.getIndexDataflowHelperFactory()).thenReturn(dataflowHelperFactory);
+        IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
+        AsterixLSMInsertDeleteOperatorNodePushable insertOp =
+                new AsterixLSMInsertDeleteOperatorNodePushable(indexOpDesc, ctx, PARTITION,
+                        primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider, op, true);
+        CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(),
+                primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION);
+        insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
+        commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
+        return insertOp;
+    }
+
+    public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+            NoMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws HyracksDataException, AlgebricksException {
+        IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx);
+        JobSpecification spec = new JobSpecification();
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        LSMBTreeDataflowHelperFactory indexDataflowHelperFactory =
+                getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo);
+        BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                primaryIndexInfo.fileSplitProvider, primaryIndexInfo.primaryIndexTypeTraits,
+                primaryIndexInfo.primaryIndexComparatorFactories, primaryIndexInfo.primaryIndexBloomFilterKeyFields,
+                primaryIndexInfo.primaryKeyIndexes, primaryIndexInfo.primaryKeyIndexes, true, true,
+                indexDataflowHelperFactory, false, false, null, NoOpOperationCallbackFactory.INSTANCE, filterFields,
+                filterFields);
+        BTreeSearchOperatorNodePushable searchOp = new BTreeSearchOperatorNodePushable(searchOpDesc, ctx, 0,
+                primaryIndexInfo.getSearchRecordDescriptorProvider(), /*primaryIndexInfo.primaryKeyIndexes*/null,
+                /*primaryIndexInfo.primaryKeyIndexes*/null, true, true, filterFields, filterFields);
+        emptyTupleOp.setFrameWriter(0, searchOp,
+                primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0));
+        searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc);
+        return emptyTupleOp;
+    }
+
+    public LogReader getTransactionLogReader(boolean isRecoveryMode) {
+        return (LogReader) getTransactionSubsystem().getLogManager().getLogReader(isRecoveryMode);
+    }
+
+    public JobId newJobId() {
+        jobId = new JobId(jobCounter++);
+        return jobId;
+    }
+
+    public AsterixLSMTreeInsertDeleteOperatorDescriptor getInsertOpratorDesc(PrimaryIndexInfo primaryIndexInfo,
+            IModificationOperationCallbackFactory modOpCallbackFactory) {
+        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
+                Mockito.mock(AsterixLSMTreeInsertDeleteOperatorDescriptor.class);
+        Mockito.when(indexOpDesc.getLifecycleManagerProvider())
+                .thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getStorageManager()).thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider);
+        Mockito.when(indexOpDesc.getLocalResourceFactoryProvider())
+                .thenReturn(primaryIndexInfo.localResourceFactoryProvider);
+        Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits);
+        Mockito.when(indexOpDesc.getTreeIndexComparatorFactories())
+                .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories);
+        Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields())
+                .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields);
+        Mockito.when(indexOpDesc.getModificationOpCallbackFactory()).thenReturn(modOpCallbackFactory);
+        return indexOpDesc;
+    }
+
+    public TreeIndexCreateOperatorDescriptor getIndexCreateOpDesc(PrimaryIndexInfo primaryIndexInfo) {
+        TreeIndexCreateOperatorDescriptor indexOpDesc = Mockito.mock(TreeIndexCreateOperatorDescriptor.class);
+        Mockito.when(indexOpDesc.getLifecycleManagerProvider())
+                .thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getStorageManager()).thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider);
+        Mockito.when(indexOpDesc.getLocalResourceFactoryProvider())
+                .thenReturn(primaryIndexInfo.localResourceFactoryProvider);
+        Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits);
+        Mockito.when(indexOpDesc.getTreeIndexComparatorFactories())
+                .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories);
+        Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields())
+                .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields);
+        return indexOpDesc;
+    }
+
+    public ConstantFileSplitProvider getFileSplitProvider(Dataset dataset) {
+        FileSplit fileSplit = new FileSplit(AsterixHyracksIntegrationUtil.ncs[0].getId(),
+                dataset.getDataverseName() + File.separator + dataset.getDatasetName());
+        return new ConstantFileSplitProvider(new FileSplit[] { fileSplit });
+    }
+
+    public ILocalResourceFactoryProvider getPrimaryIndexLocalResourceMetadataProvider(Dataset dataset,
+            ITypeTraits[] primaryIndexTypeTraits, IBinaryComparatorFactory[] primaryIndexComparatorFactories,
+            int[] primaryIndexBloomFilterKeyFields, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
+            IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields) {
+        ILocalResourceMetadata localResourceMetadata =
+                new LSMBTreeLocalResourceMetadata(primaryIndexTypeTraits, primaryIndexComparatorFactories,
+                        primaryIndexBloomFilterKeyFields, true, dataset.getDatasetId(), mergePolicyFactory,
+                        mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
+        ILocalResourceFactoryProvider localResourceFactoryProvider =
+                new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource);
+        return localResourceFactoryProvider;
+    }
+
+    public LSMBTreeDataflowHelper getPrimaryIndexDataflowHelper(IHyracksTaskContext ctx,
+            PrimaryIndexInfo primaryIndexInfo, TreeIndexCreateOperatorDescriptor indexOpDesc)
+            throws AlgebricksException {
+        LSMBTreeDataflowHelperFactory dataflowHelperFactory = new LSMBTreeDataflowHelperFactory(
+                new AsterixVirtualBufferCacheProvider(primaryIndexInfo.dataset.getDatasetId()),
+                primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties,
+                new PrimaryIndexOperationTrackerProvider(primaryIndexInfo.dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                BLOOM_FILTER_FALSE_POSITIVE_RATE, true, primaryIndexInfo.filterTypeTraits,
+                primaryIndexInfo.filterCmpFactories, primaryIndexInfo.btreeFields, primaryIndexInfo.filterFields, true);
+        IndexDataflowHelper dataflowHelper =
+                dataflowHelperFactory.createIndexDataflowHelper(indexOpDesc, ctx, PARTITION);
+        return (LSMBTreeDataflowHelper) dataflowHelper;
+    }
+
+    public LSMBTreeDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(IHyracksTaskContext ctx,
+            PrimaryIndexInfo primaryIndexInfo) throws AlgebricksException {
+        return new LSMBTreeDataflowHelperFactory(
+                new AsterixVirtualBufferCacheProvider(primaryIndexInfo.dataset.getDatasetId()),
+                primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties,
+                new PrimaryIndexOperationTrackerProvider(primaryIndexInfo.dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                BLOOM_FILTER_FALSE_POSITIVE_RATE, true, primaryIndexInfo.filterTypeTraits,
+                primaryIndexInfo.filterCmpFactories, primaryIndexInfo.btreeFields, primaryIndexInfo.filterFields, true);
+    }
+
+    public LSMBTreeDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws AlgebricksException, HyracksDataException {
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
+        return getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, indexOpDesc);
+    }
+
+    public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
+            ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            int[] filterFields) throws AlgebricksException, HyracksDataException {
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
+        LSMBTreeDataflowHelper dataflowHelper =
+                getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, indexOpDesc);
+        dataflowHelper.create();
+    }
+
+    private int[] createPrimaryIndexBloomFilterFields(int length) {
+        int[] primaryIndexBloomFilterKeyFields = new int[length];
+        for (int j = 0; j < length; ++j) {
+            primaryIndexBloomFilterKeyFields[j] = j;
+        }
+        return primaryIndexBloomFilterKeyFields;
+    }
+
+    private IBinaryComparatorFactory[] createPrimaryIndexComparatorFactories(IAType[] primaryKeyTypes) {
+        IBinaryComparatorFactory[] primaryIndexComparatorFactories =
+                new IBinaryComparatorFactory[primaryKeyTypes.length];
+        for (int j = 0; j < primaryKeyTypes.length; ++j) {
+            primaryIndexComparatorFactories[j] =
+                    AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(primaryKeyTypes[j], true);
+        }
+        return primaryIndexComparatorFactories;
+    }
+
+    private ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType) {
+        int i = 0;
+        ISerializerDeserializer<?>[] primaryIndexSerdes = new ISerializerDeserializer<?>[primaryIndexNumOfTupleFields];
+        for (; i < primaryKeyTypes.length; i++) {
+            primaryIndexSerdes[i] =
+                    AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]);
+        }
+        primaryIndexSerdes[i++] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(recordType);
+        if (metaType != null) {
+            primaryIndexSerdes[i] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
+        }
+        return primaryIndexSerdes;
+    }
+
+    private ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType) {
+        ITypeTraits[] primaryIndexTypeTraits = new ITypeTraits[primaryIndexNumOfTupleFields];
+        int i = 0;
+        for (; i < primaryKeyTypes.length; i++) {
+            primaryIndexTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]);
+        }
+        primaryIndexTypeTraits[i++] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(recordType);
+        if (metaType != null) {
+            primaryIndexTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(metaType);
+        }
+        return primaryIndexTypeTraits;
+    }
+
+    public IHyracksTaskContext createTestContext() throws HyracksDataException {
+        IHyracksTaskContext ctx = TestUtils.create(KB32);
+        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+        ctx = Mockito.spy(ctx);
+        Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
+        Mockito.when(ctx.getIOManager())
+                .thenReturn(AsterixHyracksIntegrationUtil.ncs[0].getRootContext().getIOManager());
+        return ctx;
+    }
+
+    public TransactionSubsystem getTransactionSubsystem() {
+        return (TransactionSubsystem) ((AsterixAppRuntimeContext) AsterixHyracksIntegrationUtil.ncs[0]
+                .getApplicationContext().getApplicationObject()).getTransactionSubsystem();
+    }
+
+    public ITransactionManager getTransactionManager() {
+        return getTransactionSubsystem().getTransactionManager();
+    }
+
+    public AsterixAppRuntimeContext getAppRuntimeContext() {
+        return (AsterixAppRuntimeContext) AsterixHyracksIntegrationUtil.ncs[0].getApplicationContext()
+                .getApplicationObject();
+    }
+
+    public DatasetLifecycleManager getDatasetLifecycleManager() {
+        return (DatasetLifecycleManager) getAppRuntimeContext().getDatasetLifecycleManager();
+    }
+
+    @SuppressWarnings("unused")
+    private class PrimaryIndexInfo {
+        private Dataset dataset;
+        private IAType[] primaryKeyTypes;
+        private ARecordType recordType;
+        private ARecordType metaType;
+        private ILSMMergePolicyFactory mergePolicyFactory;
+        private Map<String, String> mergePolicyProperties;
+        private int[] filterFields;
+        private int primaryIndexNumOfTupleFields;
+        private IBinaryComparatorFactory[] primaryIndexComparatorFactories;
+        private ITypeTraits[] primaryIndexTypeTraits;
+        private ISerializerDeserializer<?>[] primaryIndexSerdes;
+        private int[] primaryIndexBloomFilterKeyFields;
+        private ITypeTraits[] filterTypeTraits;
+        private IBinaryComparatorFactory[] filterCmpFactories;
+        private int[] btreeFields;
+        private ILocalResourceFactoryProvider localResourceFactoryProvider;
+        private ConstantFileSplitProvider fileSplitProvider;
+        private RecordDescriptor rDesc;
+        private int[] primaryIndexInsertFieldsPermutations;
+        private int[] primaryKeyIndexes;
+
+        public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+                ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+                int[] filterFields) throws AlgebricksException {
+            this.dataset = dataset;
+            this.primaryKeyTypes = primaryKeyTypes;
+            this.recordType = recordType;
+            this.metaType = metaType;
+            this.mergePolicyFactory = mergePolicyFactory;
+            this.mergePolicyProperties = mergePolicyProperties;
+            this.filterFields = filterFields;
+            primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1));
+            primaryIndexTypeTraits =
+                    createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
+            primaryIndexComparatorFactories = createPrimaryIndexComparatorFactories(primaryKeyTypes);
+            primaryIndexBloomFilterKeyFields = createPrimaryIndexBloomFilterFields(primaryKeyTypes.length);
+            filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recordType);
+            filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, recordType,
+                    NonTaggedDataFormat.INSTANCE.getBinaryComparatorFactoryProvider());
+            btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+            localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(dataset, primaryIndexTypeTraits,
+                    primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, mergePolicyFactory,
+                    mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
+            fileSplitProvider = getFileSplitProvider(dataset);
+            primaryIndexSerdes =
+                    createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
+            rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
+            primaryIndexInsertFieldsPermutations = new int[primaryIndexNumOfTupleFields];
+            for (int i = 0; i < primaryIndexNumOfTupleFields; i++) {
+                primaryIndexInsertFieldsPermutations[i] = i;
+            }
+            primaryKeyIndexes = new int[primaryKeyTypes.length];
+            for (int i = 0; i < primaryKeyIndexes.length; i++) {
+                primaryKeyIndexes[i] = i;
+            }
+        }
+
+        public IRecordDescriptorProvider getInsertRecordDescriptorProvider() {
+            IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class);
+            Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc);
+            return rDescProvider;
+        }
+
+        public IRecordDescriptorProvider getSearchRecordDescriptorProvider() {
+            ITypeTraits[] primaryKeyTypeTraits = new ITypeTraits[primaryKeyTypes.length];
+            ISerializerDeserializer<?>[] primaryKeySerdes = new ISerializerDeserializer<?>[primaryKeyTypes.length];
+            for (int i = 0; i < primaryKeyTypes.length; i++) {
+                primaryKeyTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]);
+                primaryKeySerdes[i] =
+                        AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]);
+            }
+            RecordDescriptor searcgRecDesc = new RecordDescriptor(primaryKeySerdes, primaryKeyTypeTraits);
+            IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class);
+            Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt()))
+                    .thenReturn(searcgRecDesc);
+            return rDescProvider;
+        }
+    }
+
+    public RecordDescriptor getSearchOutputDesc(IAType[] keyTypes, ARecordType recordType, ARecordType metaType) {
+        int primaryIndexNumOfTupleFields = keyTypes.length + (1 + ((metaType == null) ? 0 : 1));
+        ITypeTraits[] primaryIndexTypeTraits =
+                createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType);
+        ISerializerDeserializer<?>[] primaryIndexSerdes =
+                createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType);
+        return new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java
new file mode 100644
index 0000000..2eba473
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+public class ABooleanFieldValueGenerator implements IAsterixFieldValueGenerator<Boolean> {
+    private final GenerationFunction generationFunction;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private boolean value;
+
+    public ABooleanFieldValueGenerator(GenerationFunction generationFunction, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = true;
+                break;
+            case DETERMINISTIC:
+                value = false;
+                break;
+            case INCREASING:
+                value = false;
+                break;
+            case RANDOM:
+                value = rand.nextBoolean();
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+        }
+        generate();
+        out.writeBoolean(value);
+    }
+
+    private void generate() {
+        switch (generationFunction) {
+            case DETERMINISTIC:
+                value = !value;
+                break;
+            case RANDOM:
+                value = rand.nextBoolean();
+                break;
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public Boolean next() throws IOException {
+        generate();
+        return value;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+        }
+        out.writeBoolean(value);
+    }
+
+    @Override
+    public Boolean get() throws IOException {
+        return value;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java
new file mode 100644
index 0000000..e698676
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java
@@ -0,0 +1,153 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+public class ADoubleFieldValueGenerator implements IAsterixFieldValueGenerator<Double> {
+    private static final double START = 1000000000.0;
+    private static final int BATCH_SIZE = 1000;
+    private static final double INCREMENT = 0.1;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private double value;
+    private int cycle;
+    private List<Double> uniques;
+    private Iterator<Double> iterator;
+
+    public ADoubleFieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Integer.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0;
+                break;
+            case RANDOM:
+                if (unique) {
+                    double lowerBound = START;
+                    double upperBound = lowerBound + (BATCH_SIZE * INCREMENT);
+                    uniques = new ArrayList<>();
+                    while (lowerBound < upperBound) {
+                        uniques.add(lowerBound);
+                        lowerBound += INCREMENT;
+                    }
+                    Collections.shuffle(uniques);
+                    iterator = uniques.iterator();
+                }
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
+        }
+        generate();
+        out.writeDouble(value);
+    }
+
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value -= INCREMENT;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - (cycle * INCREMENT);
+                } else {
+                    value = START + (cycle * INCREMENT);
+                }
+                break;
+            case INCREASING:
+                value += INCREMENT;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (iterator.hasNext()) {
+                        value = iterator.next();
+                    } else {
+                        // generate next patch
+                        cycle++;
+                        double lowerBound;
+                        if (cycle % 2 == 0) {
+                            // even
+                            lowerBound = START + ((cycle / 2) * (BATCH_SIZE * INCREMENT));
+                        } else {
+                            // odd
+                            lowerBound = START - ((cycle / 2 + 1) * (BATCH_SIZE * INCREMENT));
+                        }
+                        double upperBound = lowerBound + (BATCH_SIZE * INCREMENT);
+                        uniques.clear();
+                        while (lowerBound < upperBound) {
+                            uniques.add(lowerBound);
+                            lowerBound += INCREMENT;
+                        }
+                        Collections.shuffle(uniques);
+                        iterator = uniques.iterator();
+                        value = iterator.next();
+                    }
+                } else {
+                    value = rand.nextDouble();
+                }
+                break;
+            default:
+                break;
+
+        }
+    }
+
+    @Override
+    public Double next() throws IOException {
+        generate();
+        return value;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
+        }
+        out.writeDouble(value);
+    }
+
+    @Override
+    public Double get() throws IOException {
+        return value;
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java
new file mode 100644
index 0000000..7c6556b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java
@@ -0,0 +1,152 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+public class AInt32FieldValueGenerator implements IAsterixFieldValueGenerator<Integer> {
+    private static final int START = 1000000000;
+    private static final int BATCH_SIZE = 1000;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private int value;
+    private int cycle;
+    private List<Integer> uniques;
+    private Iterator<Integer> iterator;
+
+    public AInt32FieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Integer.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0;
+                break;
+            case RANDOM:
+                if (unique) {
+                    int lowerBound = START;
+                    int upperBound = lowerBound + BATCH_SIZE;
+                    uniques = new ArrayList<>();
+                    while (lowerBound < upperBound) {
+                        uniques.add(lowerBound);
+                        lowerBound++;
+                    }
+                    Collections.shuffle(uniques);
+                    iterator = uniques.iterator();
+                }
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+        }
+        generate();
+        out.writeInt(value);
+    }
+
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value--;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - cycle;
+                } else {
+                    value = START + cycle;
+                }
+                break;
+            case INCREASING:
+                value++;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (iterator.hasNext()) {
+                        value = iterator.next();
+                    } else {
+                        // generate next patch
+                        cycle++;
+                        int lowerBound;
+                        if (cycle % 2 == 0) {
+                            // even
+                            lowerBound = START + ((cycle / 2) * BATCH_SIZE);
+                        } else {
+                            // odd
+                            lowerBound = START - ((cycle / 2 + 1) * BATCH_SIZE);
+                        }
+                        int upperBound = lowerBound + BATCH_SIZE;
+                        uniques.clear();
+                        while (lowerBound < upperBound) {
+                            uniques.add(lowerBound);
+                            lowerBound++;
+                        }
+                        Collections.shuffle(uniques);
+                        iterator = uniques.iterator();
+                        value = iterator.next();
+                    }
+                } else {
+                    value = rand.nextInt();
+                }
+                break;
+            default:
+                break;
+
+        }
+    }
+
+    @Override
+    public Integer next() throws IOException {
+        generate();
+        return value;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+        }
+        out.writeInt(value);
+    }
+
+    @Override
+    public Integer get() throws IOException {
+        return value;
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java
new file mode 100644
index 0000000..2a2496e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java
@@ -0,0 +1,152 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+public class AInt64FieldValueGenerator implements IAsterixFieldValueGenerator<Long> {
+    private static final long START = 4500000000000000000L;
+    private static final long BATCH_SIZE = 1000L;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private long value;
+    private int cycle;
+    private List<Long> uniques;
+    private Iterator<Long> iterator;
+
+    public AInt64FieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Long.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0L;
+                break;
+            case RANDOM:
+                if (unique) {
+                    long lowerBound = START;
+                    long upperBound = lowerBound + BATCH_SIZE;
+                    uniques = new ArrayList<>();
+                    while (lowerBound < upperBound) {
+                        uniques.add(lowerBound);
+                        lowerBound++;
+                    }
+                    Collections.shuffle(uniques);
+                    iterator = uniques.iterator();
+                }
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+        }
+        generate();
+        out.writeLong(value);
+    }
+
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value--;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - cycle;
+                } else {
+                    value = START + cycle;
+                }
+                break;
+            case INCREASING:
+                value++;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (iterator.hasNext()) {
+                        value = iterator.next();
+                    } else {
+                        // generate next patch
+                        cycle++;
+                        long lowerBound;
+                        if (cycle % 2 == 0) {
+                            // even
+                            lowerBound = START + ((cycle / 2) * BATCH_SIZE);
+                        } else {
+                            // odd
+                            lowerBound = START - ((cycle / 2 + 1) * BATCH_SIZE);
+                        }
+                        long upperBound = lowerBound + BATCH_SIZE;
+                        uniques.clear();
+                        while (lowerBound < upperBound) {
+                            uniques.add(lowerBound);
+                            lowerBound++;
+                        }
+                        Collections.shuffle(uniques);
+                        iterator = uniques.iterator();
+                        value = iterator.next();
+                    }
+                } else {
+                    value = rand.nextLong();
+                }
+                break;
+            default:
+                break;
+
+        }
+    }
+
+    @Override
+    public Long next() throws IOException {
+        generate();
+        return value;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+        }
+        out.writeLong(value);
+    }
+
+    @Override
+    public Long get() throws IOException {
+        return value;
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java
new file mode 100644
index 0000000..df717e9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java
@@ -0,0 +1,119 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.test.common.TestTupleReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class ARecordValueGenerator implements IAsterixFieldValueGenerator<ITupleReference> {
+    private final IAsterixFieldValueGenerator<?>[] generators;
+    private final boolean tagged;
+    private final ARecordType recordType;
+    private final RecordBuilder recBuilder;
+    private final ArrayBackedValueStorage fieldValueBuffer;
+    private final TestTupleReference tuple;
+
+    public ARecordValueGenerator(GenerationFunction[] generationFunctions, ARecordType recordType, boolean[] uniques,
+            boolean tagged) {
+        this.tagged = tagged;
+        this.recordType = recordType;
+        tuple = new TestTupleReference(1);
+        fieldValueBuffer = new ArrayBackedValueStorage();
+        recBuilder = new RecordBuilder();
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        generators = new IAsterixFieldValueGenerator<?>[recordType.getFieldTypes().length];
+        for (int i = 0; i < recordType.getFieldTypes().length; i++) {
+            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
+            switch (tag) {
+                case BOOLEAN:
+                    generators[i] = new ABooleanFieldValueGenerator(generationFunctions[i], true);
+                    break;
+                case DOUBLE:
+                    generators[i] = new ADoubleFieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                case INT32:
+                    generators[i] = new AInt32FieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                case INT64:
+                    generators[i] = new AInt64FieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                case STRING:
+                    generators[i] = new AStringFieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported type " + tag);
+            }
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        for (int i = 0; i < generators.length; i++) {
+            fieldValueBuffer.reset();
+            generators[i].next(fieldValueBuffer.getDataOutput());
+            recBuilder.addField(i, fieldValueBuffer);
+        }
+        recBuilder.write(out, tagged);
+    }
+
+    @Override
+    public ITupleReference next() throws IOException {
+        tuple.reset();
+        next(tuple.getFields()[0].getDataOutput());
+        return tuple;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        for (int i = 0; i < generators.length; i++) {
+            fieldValueBuffer.reset();
+            generators[i].get(fieldValueBuffer.getDataOutput());
+            recBuilder.addField(i, fieldValueBuffer);
+        }
+        recBuilder.write(out, tagged);
+    }
+
+    @Override
+    public ITupleReference get() throws IOException {
+        tuple.reset();
+        get(tuple.getFields()[0].getDataOutput());
+        return tuple;
+    }
+
+    public void get(int i, DataOutput out) throws IOException {
+        generators[i].get(out);
+    }
+
+    public Object get(int i) throws IOException {
+        return generators[i].get();
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java
new file mode 100644
index 0000000..5ee6d40
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+public class AStringFieldValueGenerator implements IAsterixFieldValueGenerator<String> {
+    private static final String PREFIX = "A String Value #";
+    private static final int START = 1000000000;
+    private static final int BATCH_SIZE = 1000;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private int value;
+    private int cycle;
+    private List<Integer> uniques;
+    private Iterator<Integer> iterator;
+    private String aString;
+    private UTF8StringSerializerDeserializer stringSerde =
+            new UTF8StringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+
+    public AStringFieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Integer.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0;
+                break;
+            case RANDOM:
+                if (unique) {
+                    int lowerBound = START;
+                    int upperBound = lowerBound + BATCH_SIZE;
+                    uniques = new ArrayList<>();
+                    while (lowerBound < upperBound) {
+                        uniques.add(lowerBound);
+                        lowerBound++;
+                    }
+                    Collections.shuffle(uniques);
+                    iterator = uniques.iterator();
+                }
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+        }
+        generate();
+        stringSerde.serialize(aString, out);
+    }
+
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value--;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - cycle;
+                } else {
+                    value = START + cycle;
+                }
+                break;
+            case INCREASING:
+                value++;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (iterator.hasNext()) {
+                        value = iterator.next();
+                    } else {
+                        // generate next patch
+                        cycle++;
+                        int lowerBound;
+                        if (cycle % 2 == 0) {
+                            // even
+                            lowerBound = START + ((cycle / 2) * BATCH_SIZE);
+                        } else {
+                            // odd
+                            lowerBound = START - ((cycle / 2 + 1) * BATCH_SIZE);
+                        }
+                        int upperBound = lowerBound + BATCH_SIZE;
+                        uniques.clear();
+                        while (lowerBound < upperBound) {
+                            uniques.add(lowerBound);
+                            lowerBound++;
+                        }
+                        Collections.shuffle(uniques);
+                        iterator = uniques.iterator();
+                        value = iterator.next();
+                    }
+                } else {
+                    value = rand.nextInt();
+                }
+                break;
+            default:
+                break;
+        }
+        aString = PREFIX + String.format("%08d", value);
+    }
+
+    @Override
+    public String next() throws IOException {
+        generate();
+        return aString;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+        }
+        stringSerde.serialize(aString, out);
+    }
+
+    @Override
+    public String get() throws IOException {
+        return aString;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/IAsterixFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/IAsterixFieldValueGenerator.java
new file mode 100644
index 0000000..17bdcf8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/IAsterixFieldValueGenerator.java
@@ -0,0 +1,48 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public interface IAsterixFieldValueGenerator<T> {
+    /**
+     * @param out
+     * @throws IOException
+     */
+    public void next(DataOutput out) throws IOException;
+
+    /**
+     * @return
+     * @throws IOException
+     */
+    public T next() throws IOException;
+
+    /**
+     * @param out
+     * @throws IOException
+     */
+    public void get(DataOutput out) throws IOException;
+
+    /**
+     * @return
+     * @throws IOException
+     */
+    public T get() throws IOException;
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TestTupleCounterFrameWriter.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TestTupleCounterFrameWriter.java
new file mode 100644
index 0000000..ee3de51
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TestTupleCounterFrameWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.data.gen;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class TestTupleCounterFrameWriter extends TestFrameWriter {
+
+    private final FrameTupleAccessor accessor;
+    private int count = 0;
+
+    public TestTupleCounterFrameWriter(RecordDescriptor recordDescriptor, CountAnswer openAnswer,
+            CountAnswer nextAnswer, CountAnswer flushAnswer, CountAnswer failAnswer, CountAnswer closeAnswer,
+            boolean deepCopyInputFrames) {
+        super(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer, deepCopyInputFrames);
+        accessor = new FrameTupleAccessor(recordDescriptor);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        super.nextFrame(buffer);
+        accessor.reset(buffer);
+        count += accessor.getTupleCount();
+    }
+
+    public int getCount() {
+        return count;
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java
new file mode 100644
index 0000000..98c57a0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java
@@ -0,0 +1,126 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.IOException;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class TupleGenerator {
+
+    private final int[] keyIndexes;
+    private final int[] keyIndicators;
+    private final ARecordValueGenerator recordGenerator;
+    private final ARecordValueGenerator metaGenerator;
+    private final TestTupleReference tuple;
+
+    public enum GenerationFunction {
+        RANDOM,
+        DETERMINISTIC,
+        INCREASING,
+        DECREASING
+    }
+
+    /**
+     * @param recordType
+     * @param metaType
+     * @param key
+     * @param keyIndexes
+     * @param keyIndicators
+     * @param recordGeneration
+     * @param uniqueRecordFields
+     * @param metaGeneration
+     * @param uniqueMetaFields
+     */
+    public TupleGenerator(ARecordType recordType, ARecordType metaType, int[] keyIndexes, int[] keyIndicators,
+            GenerationFunction[] recordGeneration, boolean[] uniqueRecordFields, GenerationFunction[] metaGeneration,
+            boolean[] uniqueMetaFields) {
+        this.keyIndexes = keyIndexes;
+        this.keyIndicators = keyIndicators;
+        for (IAType field : recordType.getFieldTypes()) {
+            validate(field);
+        }
+        recordGenerator = new ARecordValueGenerator(recordGeneration, recordType, uniqueRecordFields, true);
+        if (metaType != null) {
+            for (IAType field : metaType.getFieldTypes()) {
+                validate(field);
+            }
+            metaGenerator = new ARecordValueGenerator(metaGeneration, metaType, uniqueMetaFields, true);
+        } else {
+            metaGenerator = null;
+        }
+        int numOfFields = keyIndexes.length + 1 + ((metaType != null) ? 1 : 0);
+        tuple = new TestTupleReference(numOfFields);
+        boolean atLeastOneKeyFieldIsNotRandomAndNotBoolean = false;
+        for (int i = 0; i < keyIndexes.length; i++) {
+            if (keyIndicators[i] < 0 || keyIndicators[i] > 1) {
+                throw new IllegalArgumentException("key field indicator must be either 0 or 1");
+            }
+            atLeastOneKeyFieldIsNotRandomAndNotBoolean = atLeastOneKeyFieldIsNotRandomAndNotBoolean
+                    || validateKey(keyIndexes[i], keyIndicators[i] == 0 ? recordType : metaType,
+                            keyIndicators[i] == 0 ? uniqueRecordFields[i] : uniqueMetaFields[i]);
+        }
+        if (!atLeastOneKeyFieldIsNotRandomAndNotBoolean) {
+            throw new IllegalArgumentException("at least one key field must be unique and not boolean");
+        }
+        if (keyIndexes.length != keyIndicators.length) {
+            throw new IllegalArgumentException("number of key indexes must equals number of key indicators");
+        }
+    }
+
+    private boolean validateKey(int i, ARecordType type, boolean unique) {
+        if (type.getFieldNames().length <= i) {
+            throw new IllegalArgumentException("key index must be less than number of fields");
+        }
+        return unique && type.getFieldTypes()[i].getTypeTag() != ATypeTag.BOOLEAN;
+    }
+
+    public ITupleReference next() throws IOException {
+        tuple.reset();
+        recordGenerator.next(tuple.getFields()[keyIndexes.length].getDataOutput());
+        if (metaGenerator != null) {
+            recordGenerator.next(tuple.getFields()[keyIndexes.length + 1].getDataOutput());
+        }
+        for (int i = 0; i < keyIndexes.length; i++) {
+            if (keyIndicators[i] == 0) {
+                recordGenerator.get(keyIndexes[i], tuple.getFields()[i].getDataOutput());
+            } else {
+                metaGenerator.get(keyIndexes[i], tuple.getFields()[i].getDataOutput());
+            }
+        }
+        return tuple;
+    }
+
+    private void validate(IAType field) {
+        switch (field.getTypeTag()) {
+            case BOOLEAN:
+            case DOUBLE:
+            case INT32:
+            case INT64:
+            case STRING:
+                break;
+            default:
+                throw new IllegalArgumentException("Generating data of type " + field + " is not supported");
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
index e267cc7..54bdb1b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
@@ -96,33 +96,4 @@
         }
         return tuple;
     }
-
-    private class TestTupleReference implements ITupleReference {
-        private final GrowableArray[] fields;
-
-        private TestTupleReference(GrowableArray[] fields) {
-            this.fields = fields;
-        }
-
-        @Override
-        public int getFieldCount() {
-            return fields.length;
-        }
-
-        @Override
-        public byte[] getFieldData(int fIdx) {
-
-            return fields[fIdx].getByteArray();
-        }
-
-        @Override
-        public int getFieldStart(int fIdx) {
-            return 0;
-        }
-
-        @Override
-        public int getFieldLength(int fIdx) {
-            return fields[fIdx].getLength();
-        }
-    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java
new file mode 100644
index 0000000..b676dfd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.common;
+
+import org.apache.hyracks.data.std.util.GrowableArray;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class TestTupleReference implements ITupleReference {
+    private GrowableArray[] fields;
+    private int[] offsets;
+
+    public TestTupleReference(GrowableArray[] fields) {
+        this.fields = fields;
+        offsets = new int[fields.length];
+    }
+
+    public TestTupleReference(int numfields) {
+        this.fields = new GrowableArray[numfields];
+        for (int i = 0; i < numfields; i++) {
+            fields[i] = new GrowableArray();
+        }
+        offsets = new int[fields.length];
+    }
+
+    public GrowableArray[] getFields() {
+        return fields;
+    }
+
+    public void setFields(GrowableArray[] fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return fields.length;
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return fields[fIdx].getByteArray();
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        return offsets[fIdx];
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        return fields[fIdx].getLength();
+    }
+
+    public void reset() {
+        for (GrowableArray field : fields) {
+            field.reset();
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index 2f712cc..a253ac0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -44,6 +45,7 @@
 import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.PartitionWithMessageDataWriter;
 import org.apache.hyracks.test.support.TestUtils;
@@ -70,7 +72,7 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
             message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();
@@ -144,8 +146,8 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
-            writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE + 1);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+            writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE + 1);
             ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
                     Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
                     BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
@@ -165,7 +167,7 @@
                 fta.reset(writer.getLastFrame());
                 Assert.assertEquals(fta.getTupleCount(), 1);
                 FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+                Assert.assertEquals(MessagingFrameTupleAppender.MARKER_MESSAGE,
                         MessagingFrameTupleAppender.getMessageType(tempBuffer));
             }
             message.getBuffer().clear();
@@ -228,9 +230,9 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
-            writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE);
+            writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE);
             ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
                     Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
                     BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
@@ -264,7 +266,7 @@
                 fta.reset(writer.getLastFrame());
                 Assert.assertEquals(fta.getTupleCount(), 1);
                 FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+                Assert.assertEquals(MessagingFrameTupleAppender.MARKER_MESSAGE,
                         MessagingFrameTupleAppender.getMessageType(tempBuffer));
             }
             partitioner.close();
@@ -286,7 +288,7 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
             message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
             message.getBuffer().flip();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
new file mode 100644
index 0000000..a0ef31e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.transaction.management.service.logging.LogReader;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelper;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogMarkerTest {
+
+    private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+    private static final ARecordType META_TYPE = null;
+    private static final GenerationFunction[] META_GEN_FUNCTION = null;
+    private static final boolean[] UNIQUE_META_FIELDS = null;
+    private static final int[] KEY_INDEXES = { 0 };
+    private static final int[] KEY_INDICATORS = { 0 };
+    private static final int NUM_OF_RECORDS = 100000;
+    private static final int SNAPSHOT_SIZE = 1000;
+    private static final int DATASET_ID = 101;
+    private static final String SPILL_AREA = "target" + File.separator + "spill_area";
+    private static final String DATAVERSE_NAME = "TestDV";
+    private static final String DATASET_NAME = "TestDS";
+    private static final String DATA_TYPE_NAME = "DUMMY";
+    private static final String NODE_GROUP_NAME = "DEFAULT";
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        System.out.println("SetUp: ");
+        File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        System.out.println("TearDown");
+        File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+    }
+
+    @Test
+    public void testInsertWithSnapshot() {
+        try {
+            TestNodeController nc = new TestNodeController();
+            nc.init();
+            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
+                    NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                            Collections.emptyList(), null, null, null, false, null, false),
+                    null, DatasetType.INTERNAL, DATASET_ID, 0);
+            try {
+                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
+                        null);
+                IHyracksTaskContext ctx = nc.createTestContext();
+                nc.newJobId();
+                ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+                AsterixLSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null);
+                insertOp.open();
+                TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                        RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+                VSizeFrame frame = new VSizeFrame(ctx);
+                VSizeFrame marker = new VSizeFrame(ctx);
+                FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+                long markerId = 0L;
+                for (int j = 0; j < NUM_OF_RECORDS; j++) {
+                    if (j % SNAPSHOT_SIZE == 0) {
+                        marker.reset();
+                        marker.getBuffer().put(MessagingFrameTupleAppender.MARKER_MESSAGE);
+                        marker.getBuffer().putLong(markerId);
+                        marker.getBuffer().flip();
+                        markerId++;
+                        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, marker, ctx);
+                        tupleAppender.flush(insertOp);
+                    }
+                    ITupleReference tuple = tupleGenerator.next();
+                    DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+                }
+                if (tupleAppender.getTupleCount() > 0) {
+                    tupleAppender.write(insertOp, true);
+                }
+                insertOp.close();
+                nc.getTransactionManager().completedTransaction(txnCtx, new DatasetId(-1), -1, true);
+                LSMBTreeDataflowHelper dataflowHelper = nc.getPrimaryIndexDataflowHelper(dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null);
+                dataflowHelper.open();
+                LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance();
+                long lsn = btree.getMostRecentMarkerLSN();
+                int numOfMarkers = 0;
+                LogReader logReader = (LogReader) nc.getTransactionSubsystem().getLogManager().getLogReader(false);
+                long expectedMarkerId = markerId - 1;
+                while (lsn >= 0) {
+                    numOfMarkers++;
+                    ILogRecord logRecord = logReader.read(lsn);
+                    lsn = logRecord.getPreviousMarkerLSN();
+                    long logMarkerId = logRecord.getMarker().getLong();
+                    Assert.assertEquals(expectedMarkerId, logMarkerId);
+                    expectedMarkerId--;
+                }
+                logReader.close();
+                dataflowHelper.close();
+                Assert.assertEquals(markerId, numOfMarkers);
+                nc.newJobId();
+                TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+                        Collections.emptyList(), Collections.emptyList(), false);
+                IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE,
+                        META_TYPE, new NoMergePolicyFactory(), null, null);
+                emptyTupleOp.open();
+                emptyTupleOp.close();
+                Assert.assertEquals(NUM_OF_RECORDS, countOp.getCount());
+            } finally {
+                nc.deInit();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+
+    }
+
+    public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
+            Collection<FrameWriterOperation> exceptionThrowingOperations,
+            Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
+        CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
+                exceptionThrowingOperations, errorThrowingOperations);
+        return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
+                closeAnswer, deepCopyInputFrames);
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
deleted file mode 100644
index 536bf3a..0000000
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.test.dataflow;
-
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class TestRecordDescriptorFactory {
-    public RecordDescriptor createRecordDescriptor(ISerializerDeserializer<?>... serdes) {
-        return null;
-    }
-}
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index fc1c221..f6c0c99 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -252,6 +252,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.17</version>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b3eb281..71c30d5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -79,7 +79,7 @@
     @Override
     public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType,
             ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
             decrementNumActiveOperations(modificationCallback);
             if (numActiveOperations.get() == 0) {
@@ -148,12 +148,12 @@
         for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
 
             //get resource
-            ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            ILSMIndexAccessor accessor =
+                    lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 
             //update resource lsn
-            AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
-                    .getIOOperationCallback();
+            AbstractLSMIOOperationCallback ioOpCallback =
+                    (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
             ioOpCallback.updateLastLSN(logRecord.getLSN());
 
             //schedule flush after update
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 9a76b40..cf66d30 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -22,6 +22,8 @@
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.FrameDataException;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
+import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -31,6 +33,7 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -41,7 +44,9 @@
 
 public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
+    public static final String KEY_INDEX = "Index";
     private final boolean isPrimary;
+    // This class has both lsmIndex and index (in super class) pointing to the same object
     private AbstractLSMIndex lsmIndex;
     private int i = 0;
 
@@ -59,10 +64,6 @@
     private int currentTupleIdx;
     private int lastFlushedTupleIdx;
 
-    public boolean isPrimary() {
-        return isPrimary;
-    }
-
     public AsterixLSMInsertDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op,
             boolean isPrimary) {
@@ -79,6 +80,10 @@
         indexHelper.open();
         lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
         try {
+            if (isPrimary && ctx.getSharedObject() != null) {
+                PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback(lsmIndex);
+                TaskUtils.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+            }
             writer.open();
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
@@ -185,4 +190,8 @@
             writer.fail();
         }
     }
+
+    public boolean isPrimary() {
+        return isPrimary;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java
new file mode 100644
index 0000000..11d649b
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This interface provide callback mechanism for adding marker logs to the transaction log file
+ */
+public interface ILogMarkerCallback {
+
+    String KEY_MARKER_CALLBACK = "MARKER_CALLBACK";
+
+    /**
+     * Called before writing the marker log allowing addition of specific information to the log record
+     *
+     * @param buffer:
+     *            the log buffer to write to
+     */
+    void before(ByteBuffer buffer);
+
+    /**
+     * Called after the log's been appended to the log tail passing the position of the log used for random access
+     *
+     * @param lsn
+     */
+    void after(long lsn);
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index cd05ba0..29af931 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -32,11 +32,38 @@
         LARGE_RECORD
     }
 
-    public static final int JOB_TERMINATE_LOG_SIZE = 14; //JOB_COMMIT or ABORT log type
-    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30;
-    public static final int UPDATE_LOG_BASE_SIZE = 51;
-    public static final int FLUSH_LOG_SIZE = 18;
-    public static final int WAIT_LOG_SIZE = 14;
+    public static final int CHKSUM_LEN = Long.BYTES;
+    public static final int FLDCNT_LEN = Integer.BYTES;
+    public static final int DS_LEN = Integer.BYTES;
+    public static final int LOG_SOURCE_LEN = Byte.BYTES;
+    public static final int LOGRCD_SZ_LEN = Integer.BYTES;
+    public static final int NEWOP_LEN = Byte.BYTES;
+    public static final int NEWVALSZ_LEN = Integer.BYTES;
+    public static final int PKHASH_LEN = Integer.BYTES;
+    public static final int PKSZ_LEN = Integer.BYTES;
+    public static final int PRVLSN_LEN = Long.BYTES;
+    public static final int RS_PARTITION_LEN = Integer.BYTES;
+    public static final int RSID_LEN = Long.BYTES;
+    public static final int SEQ_NUM_LEN = Long.BYTES;
+    public static final int TYPE_LEN = Byte.BYTES;
+    public static final int UUID_LEN = Long.BYTES;
+    public static final int VBUCKET_ID_LEN = Short.BYTES;
+
+    public static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
+    public static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
+    public static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+    public static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
+    // What are these fields? vvvvv
+    public static final int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES;
+
+    // How are the following computed?
+    public static final int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+    public static final int UPDATE_LOG_BASE_SIZE = 51; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+    public static final int FLUSH_LOG_SIZE = 18; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+    public static final int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+    public static final int MARKER_BASE_LOG_SIZE =
+            ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN;
 
     public RecordReadStatus readLogRecord(ByteBuffer buffer);
 
@@ -135,7 +162,15 @@
 
     public ITupleReference getOldValue();
 
-    public void setOldValue(ITupleReference oldValue);
+    public void setOldValue(ITupleReference tupleBefore);
 
-    public void setOldValueSize(int oldValueSize);
+    public void setOldValueSize(int beforeSize);
+
+    public boolean isMarker();
+
+    public ByteBuffer getMarker();
+
+    public void logAppended(long lsn);
+
+    public long getPreviousMarkerLSN();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 23fdd0f..306b888 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -71,24 +71,6 @@
 
 public class LogRecord implements ILogRecord {
 
-    private static final int LOG_SOURCE_LEN = Byte.BYTES;
-    private static final int TYPE_LEN = Byte.BYTES;
-    public static final int PKHASH_LEN = Integer.BYTES;
-    public static final int PKSZ_LEN = Integer.BYTES;
-    private static final int RS_PARTITION_LEN = Integer.BYTES;
-    private static final int RSID_LEN = Long.BYTES;
-    private static final int LOGRCD_SZ_LEN = Integer.BYTES;
-    private static final int FLDCNT_LEN = Integer.BYTES;
-    private static final int NEWOP_LEN = Byte.BYTES;
-    private static final int NEWVALSZ_LEN = Integer.BYTES;
-    private static final int CHKSUM_LEN = Long.BYTES;
-
-    private static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
-    private static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
-            + PKSZ_LEN;
-    private static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
-    private static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
-
     // ------------- fields in a log record (begin) ------------//
     private byte logSource;
     private byte logType;
@@ -108,8 +90,10 @@
     private ITupleReference oldValue;
     private int oldValueFieldCount;
     private long checksum;
+    private long prevMarkerLSN;
+    private ByteBuffer marker;
     // ------------- fields in a log record (end) --------------//
-
+    private final ILogMarkerCallback callback; // A callback for log mark operations
     private int PKFieldCnt;
     private ITransactionContext txnCtx;
     private long LSN;
@@ -129,7 +113,8 @@
     private String nodeId;
     private boolean replicated = false;
 
-    public LogRecord() {
+    public LogRecord(ILogMarkerCallback callback) {
+        this.callback = callback;
         isFlushed = new AtomicBoolean(false);
         readPKValue = new PrimaryKeyTupleReference();
         readNewValue = SimpleTupleWriter.INSTANCE.createTupleReference();
@@ -138,49 +123,70 @@
         logSource = LogSource.LOCAL;
     }
 
-    private void writeLogRecordCommonFields(ByteBuffer buffer) {
+    public LogRecord() {
+        this(null);
+    }
+
+    private void doWriteLogRecord(ByteBuffer buffer) {
         buffer.put(logSource);
         buffer.put(logType);
         buffer.putInt(jobId);
-        if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT || logType == LogType.UPSERT_ENTITY_COMMIT) {
-            buffer.putInt(resourcePartition);
-            buffer.putInt(datasetId);
-            buffer.putInt(PKHashValue);
-            if (PKValueSize <= 0) {
-                throw new IllegalStateException("Primary Key Size is less than or equal to 0");
-            }
-            buffer.putInt(PKValueSize);
-            writePKValue(buffer);
+        switch (logType) {
+            case LogType.ENTITY_COMMIT:
+            case LogType.UPSERT_ENTITY_COMMIT:
+                writeEntityInfo(buffer);
+                break;
+            case LogType.UPDATE:
+                writeEntityInfo(buffer);
+                buffer.putLong(resourceId);
+                buffer.putInt(logSize);
+                buffer.putInt(newValueFieldCount);
+                buffer.put(newOp);
+                buffer.putInt(newValueSize);
+                writeTuple(buffer, newValue, newValueSize);
+                if (oldValueSize > 0) {
+                    buffer.putInt(oldValueSize);
+                    buffer.putInt(oldValueFieldCount);
+                    writeTuple(buffer, oldValue, oldValueSize);
+                }
+                break;
+            case LogType.FLUSH:
+                buffer.putInt(datasetId);
+                break;
+            case LogType.MARKER:
+                buffer.putInt(datasetId);
+                buffer.putInt(resourcePartition);
+                callback.before(buffer);
+                buffer.putInt(logSize);
+                buffer.put(marker);
+                break;
+            default:
+                // Do nothing
         }
-        if (logType == LogType.UPDATE) {
-            buffer.putLong(resourceId);
-            buffer.putInt(logSize);
-            buffer.putInt(newValueFieldCount);
-            buffer.put(newOp);
-            buffer.putInt(newValueSize);
-            writeTuple(buffer, newValue, newValueSize);
-            if (oldValueSize > 0) {
-                buffer.putInt(oldValueSize);
-                buffer.putInt(oldValueFieldCount);
-                writeTuple(buffer, oldValue, oldValueSize);
-            }
+    }
+
+    private void writeEntityInfo(ByteBuffer buffer) {
+        buffer.putInt(resourcePartition);
+        buffer.putInt(datasetId);
+        buffer.putInt(PKHashValue);
+        if (PKValueSize <= 0) {
+            throw new IllegalStateException("Primary Key Size is less than or equal to 0");
         }
-        if (logType == LogType.FLUSH) {
-            buffer.putInt(datasetId);
-        }
+        buffer.putInt(PKValueSize);
+        writePKValue(buffer);
     }
 
     @Override
     public void writeLogRecord(ByteBuffer buffer) {
         int beginOffset = buffer.position();
-        writeLogRecordCommonFields(buffer);
+        doWriteLogRecord(buffer);
         checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
         buffer.putLong(checksum);
     }
 
     @Override
     public void writeRemoteLogRecord(ByteBuffer buffer) {
-        writeLogRecordCommonFields(buffer);
+        doWriteLogRecord(buffer);
         if (logType == LogType.FLUSH) {
             buffer.putLong(LSN);
             buffer.putInt(numOfFlushedIndexes);
@@ -222,7 +228,7 @@
         int beginOffset = buffer.position();
 
         //read common fields
-        RecordReadStatus status = readLogCommonFields(buffer);
+        RecordReadStatus status = doReadLogRecord(buffer);
         if (status != RecordReadStatus.OK) {
             buffer.position(beginOffset);
             return status;
@@ -241,7 +247,7 @@
         return RecordReadStatus.OK;
     }
 
-    private RecordReadStatus readLogCommonFields(ByteBuffer buffer) {
+    private RecordReadStatus doReadLogRecord(ByteBuffer buffer) {
         //first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
         if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
             return RecordReadStatus.TRUNCATED;
@@ -255,64 +261,88 @@
                     return RecordReadStatus.TRUNCATED;
                 }
                 datasetId = buffer.getInt();
-                resourceId = 0L;
+                resourceId = 0l;
+                // fall throuh
+            case LogType.WAIT:
+                computeAndSetLogSize();
                 break;
-            case LogType.ABORT:
             case LogType.JOB_COMMIT:
+            case LogType.ABORT:
                 datasetId = -1;
                 PKHashValue = -1;
+                computeAndSetLogSize();
                 break;
             case LogType.ENTITY_COMMIT:
             case LogType.UPSERT_ENTITY_COMMIT:
-                if (!readEntityInfo(buffer)) {
+                if (readEntityInfo(buffer)) {
+                    computeAndSetLogSize();
+                } else {
                     return RecordReadStatus.TRUNCATED;
                 }
                 break;
             case LogType.UPDATE:
-                if (!readEntityInfo(buffer)) {
+                if (readEntityInfo(buffer)) {
+                    if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
+                        return RecordReadStatus.TRUNCATED;
+                    }
+                    resourceId = buffer.getLong();
+                    logSize = buffer.getInt();
+                    newValueFieldCount = buffer.getInt();
+                    newOp = buffer.get();
+                    newValueSize = buffer.getInt();
+                    if (buffer.remaining() < newValueSize) {
+                        if (logSize > buffer.capacity()) {
+                            return RecordReadStatus.LARGE_RECORD;
+                        }
+                        return RecordReadStatus.TRUNCATED;
+                    }
+                    newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
+                    if (logSize > getUpdateLogSizeWithoutOldValue()) {
+                        // Prev Image exists
+                        if (buffer.remaining() < Integer.BYTES) {
+                            return RecordReadStatus.TRUNCATED;
+                        }
+                        oldValueSize = buffer.getInt();
+                        if (buffer.remaining() < Integer.BYTES) {
+                            return RecordReadStatus.TRUNCATED;
+                        }
+                        oldValueFieldCount = buffer.getInt();
+                        if (buffer.remaining() < oldValueSize) {
+                            return RecordReadStatus.TRUNCATED;
+                        }
+                        oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
+                    } else {
+                        oldValueSize = 0;
+                    }
+                } else {
                     return RecordReadStatus.TRUNCATED;
                 }
-                if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
+                break;
+            case LogType.MARKER:
+                if (buffer.remaining() < DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN) {
                     return RecordReadStatus.TRUNCATED;
                 }
-                resourceId = buffer.getLong();
+                datasetId = buffer.getInt();
+                resourcePartition = buffer.getInt();
+                prevMarkerLSN = buffer.getLong();
                 logSize = buffer.getInt();
-                newValueFieldCount = buffer.getInt();
-                newOp = buffer.get();
-                newValueSize = buffer.getInt();
-                return readEntity(buffer);
+                int lenRemaining = logSize - MARKER_BASE_LOG_SIZE;
+                if (buffer.remaining() < lenRemaining) {
+                    return RecordReadStatus.TRUNCATED;
+                }
+
+                if (marker == null || marker.capacity() < lenRemaining) {
+                    // TODO(amoudi): account for memory allocation
+                    marker = ByteBuffer.allocate(lenRemaining + Short.BYTES);
+                }
+                marker.clear();
+                buffer.get(marker.array(), 0, lenRemaining);
+                marker.position(lenRemaining);
+                marker.flip();
+                break;
             default:
                 break;
         }
-        computeAndSetLogSize();
-        return RecordReadStatus.OK;
-    }
-
-    private RecordReadStatus readEntity(ByteBuffer buffer) {
-        if (buffer.remaining() < newValueSize) {
-            if (logSize > buffer.capacity()) {
-                return RecordReadStatus.LARGE_RECORD;
-            }
-            return RecordReadStatus.TRUNCATED;
-        }
-        newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
-        if (logSize > getUpdateLogSizeWithoutOldValue()) {
-            // Prev Image exists
-            if (buffer.remaining() < Integer.BYTES) {
-                return RecordReadStatus.TRUNCATED;
-            }
-            oldValueSize = buffer.getInt();
-            if (buffer.remaining() < Integer.BYTES) {
-                return RecordReadStatus.TRUNCATED;
-            }
-            oldValueFieldCount = buffer.getInt();
-            if (buffer.remaining() < oldValueSize) {
-                return RecordReadStatus.TRUNCATED;
-            }
-            oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
-        } else {
-            oldValueSize = 0;
-        }
         return RecordReadStatus.OK;
     }
 
@@ -339,7 +369,7 @@
     @Override
     public void readRemoteLog(ByteBuffer buffer) {
         //read common fields
-        readLogCommonFields(buffer);
+        doReadLogRecord(buffer);
 
         if (logType == LogType.FLUSH) {
             LSN = buffer.getLong();
@@ -412,11 +442,18 @@
             case LogType.WAIT:
                 logSize = WAIT_LOG_SIZE;
                 break;
+            case LogType.MARKER:
+                setMarkerLogSize();
+                break;
             default:
                 throw new IllegalStateException("Unsupported Log Type");
         }
     }
 
+    private void setMarkerLogSize() {
+        logSize = MARKER_BASE_LOG_SIZE + marker.remaining();
+    }
+
     @Override
     public String getLogRecordForDisplay() {
         StringBuilder builder = new StringBuilder();
@@ -688,4 +725,28 @@
     public void setOldValueSize(int oldValueSize) {
         this.oldValueSize = oldValueSize;
     }
+
+    public void setMarker(ByteBuffer marker) {
+        this.marker = marker;
+    }
+
+    @Override
+    public boolean isMarker() {
+        return logType == LogType.MARKER;
+    }
+
+    @Override
+    public void logAppended(long lsn) {
+        callback.after(lsn);
+    }
+
+    @Override
+    public long getPreviousMarkerLSN() {
+        return prevMarkerLSN;
+    }
+
+    @Override
+    public ByteBuffer getMarker() {
+        return marker;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index 714b8f7..269e4b9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -27,6 +27,7 @@
     public static final byte FLUSH = 4;
     public static final byte UPSERT_ENTITY_COMMIT = 5;
     public static final byte WAIT = 6;
+    public static final byte MARKER = 7;
 
     private static final String STRING_UPDATE = "UPDATE";
     private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
@@ -35,8 +36,8 @@
     private static final String STRING_FLUSH = "FLUSH";
     private static final String STRING_UPSERT_ENTITY_COMMIT = "UPSERT_ENTITY_COMMIT";
     private static final String STRING_WAIT = "WAIT";
-
-    private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
+    private static final String STRING_MARKER = "MARKER";
+    private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
 
     public static String toString(byte logType) {
         switch (logType) {
@@ -54,8 +55,10 @@
                 return STRING_UPSERT_ENTITY_COMMIT;
             case LogType.WAIT:
                 return STRING_WAIT;
+            case LogType.MARKER:
+                return STRING_MARKER;
             default:
-                return STRING_INVALID_LOG_TYPE;
+                return STRING_UNKNOWN_LOG_TYPE;
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
new file mode 100644
index 0000000..7dae65f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+
+/**
+ * A basic callback used to write marker to transaction logs
+ */
+public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback {
+
+    private AbstractLSMIndex index;
+
+    /**
+     * @param index:
+     *            a pointer to the primary index used to store marker log info
+     * @throws HyracksDataException
+     */
+    public PrimaryIndexLogMarkerCallback(AbstractLSMIndex index) throws HyracksDataException {
+        this.index = index;
+    }
+
+    @Override
+    public void before(ByteBuffer buffer) {
+        buffer.putLong(index.getCurrentMemoryComponent().getMostRecentMarkerLSN());
+    }
+
+    @Override
+    public void after(long lsn) {
+        index.getCurrentMemoryComponent().setMostRecentMarkerLSN(lsn);
+    }
+}
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
similarity index 79%
copy from asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
copy to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
index 11a2510..4bca216 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
@@ -16,10 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.event.util;
+package org.apache.asterix.common.utils;
 
+/**
+ * A static class that stores asterix constants
+ */
 public class AsterixConstants {
+    public static final String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
 
-    public static String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
+    private AsterixConstants() {
+    }
 
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java
new file mode 100644
index 0000000..8c83687
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Not thread safe stack that is used to store fixed size buffers in memory
+ * Once memory is consumed, it uses disk to store buffers
+ */
+public class FrameStack implements Closeable {
+    private static final AtomicInteger stackIdGenerator = new AtomicInteger(0);
+    private static final String STACK_FILE_NAME = "stack";
+    private final int stackId;
+    private final int frameSize;
+    private final int numOfMemoryFrames;
+    private final ArrayDeque<ByteBuffer> fullBuffers;
+    private final ArrayDeque<ByteBuffer> emptyBuffers;
+    private int totalWriteCount = 0;
+    private int totalReadCount = 0;
+    private final File file;
+    private final RandomAccessFile iostream;
+    private final byte[] frame;
+
+    /**
+     * Create a hybrid of memory and disk stack of byte buffers
+     *
+     * @param dir
+     * @param frameSize
+     * @param numOfMemoryFrames
+     * @throws HyracksDataException
+     * @throws FileNotFoundException
+     */
+    public FrameStack(String dir, int frameSize, int numOfMemoryFrames)
+            throws HyracksDataException, FileNotFoundException {
+        this.stackId = stackIdGenerator.getAndIncrement();
+        this.frameSize = frameSize;
+        this.numOfMemoryFrames = numOfMemoryFrames;
+        this.fullBuffers = numOfMemoryFrames <= 0 ? null : new ArrayDeque<>();
+        this.emptyBuffers = numOfMemoryFrames <= 0 ? null : new ArrayDeque<>();
+        this.file = StoragePathUtil.createFile(
+                ((dir == null) ? "" : (dir.endsWith(File.separator) ? dir : (dir + File.separator))) + STACK_FILE_NAME,
+                stackId);
+        this.iostream = new RandomAccessFile(file, "rw");
+        this.frame = new byte[frameSize];
+    }
+
+    /**
+     * @return the number of remaining frames to be read in the stack
+     */
+    public int remaining() {
+        return totalWriteCount - totalReadCount;
+    }
+
+    /**
+     * copy content of buffer into the stack
+     *
+     * @param buffer
+     * @throws IOException
+     */
+    public synchronized void push(ByteBuffer buffer) throws IOException {
+        int diff = totalWriteCount - totalReadCount;
+        if (diff < numOfMemoryFrames) {
+            ByteBuffer aBuffer = allocate();
+            aBuffer.put(buffer.array());
+            aBuffer.flip();
+            fullBuffers.push(aBuffer);
+        } else {
+            long position = (long) (diff - numOfMemoryFrames) * frameSize;
+            if (position != iostream.getFilePointer()) {
+                iostream.seek(position);
+            }
+            iostream.write(buffer.array());
+        }
+        totalWriteCount++;
+    }
+
+    private ByteBuffer allocate() {
+        ByteBuffer aBuffer = emptyBuffers.poll();
+        if (aBuffer == null) {
+            aBuffer = ByteBuffer.allocate(frameSize);
+        }
+        aBuffer.clear();
+        return aBuffer;
+    }
+
+    /**
+     * Free a frame off of the stack and copy it into dest
+     *
+     * @param dest
+     * @throws IOException
+     */
+    public synchronized void pop(ByteBuffer dest) throws IOException {
+        dest.clear();
+        int diff = totalWriteCount - totalReadCount - 1;
+        if (diff >= 0) {
+            if (diff < numOfMemoryFrames) {
+                totalReadCount++;
+                ByteBuffer aBuffer = fullBuffers.pop();
+                emptyBuffers.push(aBuffer);
+                dest.put(aBuffer.array());
+            } else {
+                long position = (long) (diff - numOfMemoryFrames) * frameSize;
+                iostream.seek(position);
+                iostream.readFully(frame);
+                dest.put(frame);
+            }
+        }
+        dest.flip();
+    }
+
+    /**
+     * Closing this stack will result in the data being deleted
+     *
+     * @throws IOException
+     */
+    @Override
+    public void close() throws IOException {
+        iostream.close();
+        Files.delete(file.toPath());
+    }
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 78b06fb..615e8af 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -24,12 +24,16 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 
 public class StoragePathUtil {
+    private static final Logger LOGGER = Logger.getLogger(StoragePathUtil.class.getName());
     public static final String PARTITION_DIR_PREFIX = "partition_";
     public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
     public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
@@ -70,4 +74,39 @@
     public static int getPartitionNumFromName(String name) {
         return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length()));
     }
+
+    /**
+     * Create a file
+     * Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when
+     * creating files simultaneously
+     *
+     * @param name
+     * @param count
+     * @return
+     * @throws HyracksDataException
+     */
+    public static File createFile(String name, int count) throws HyracksDataException {
+        try {
+            String fileName = name + "_" + count;
+            File file = new File(fileName);
+            if (file.getParentFile() != null) {
+                file.getParentFile().mkdirs();
+            }
+            if (!file.exists()) {
+                boolean success = file.createNewFile();
+                if (!success) {
+                    throw new HyracksDataException("Unable to create spill file " + fileName);
+                } else {
+                    if (LOGGER.isEnabledFor(Level.INFO)) {
+                        LOGGER.info("Created spill file " + file.getAbsolutePath());
+                    }
+                }
+            } else {
+                throw new HyracksDataException("spill file " + fileName + " already exists");
+            }
+            return file;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
index 1d5b15e..2878d5a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
@@ -18,9 +18,12 @@
  */
 package org.apache.asterix.common.utils;
 
+import java.nio.ByteBuffer;
+
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
@@ -64,4 +67,17 @@
         logRecord.computeAndSetPKValueSize();
         logRecord.computeAndSetLogSize();
     }
+
+    public static void formMarkerLogRecord(LogRecord logRecord, ITransactionContext txnCtx, int datasetId,
+            int resourcePartition, ByteBuffer marker) {
+        logRecord.setTxnCtx(txnCtx);
+        logRecord.setLogSource(LogSource.LOCAL);
+        logRecord.setLogType(LogType.MARKER);
+        logRecord.setJobId(txnCtx.getJobId().getId());
+        logRecord.setDatasetId(datasetId);
+        logRecord.setResourcePartition(resourcePartition);
+        marker.get(); // read the first byte since it is not part of the marker object
+        logRecord.setMarker(marker);
+        logRecord.computeAndSetLogSize();
+    }
 }
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index b19a722..1780a51 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.utils.AsterixConstants;
 import org.apache.asterix.event.driver.EventDriver;
 import org.apache.asterix.event.error.VerificationUtil;
 import org.apache.asterix.event.model.AsterixInstance;
@@ -72,8 +73,8 @@
 
         for (Node node : cluster.getNode()) {
             if (copyHyracksToNC) {
-                Pattern copyHyracksForNC = createCopyHyracksPattern(asterixInstanceName, cluster, node.getClusterIp(),
-                        destDir);
+                Pattern copyHyracksForNC =
+                        createCopyHyracksPattern(asterixInstanceName, cluster, node.getClusterIp(), destDir);
                 ps.add(copyHyracksForNC);
             }
         }
@@ -389,8 +390,8 @@
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
         String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
         String workingDir = cluster.getWorkingDir().getDir();
-        String destDir = workingDir + File.separator + "library" + File.separator + dataverse + File.separator
-                + libraryName;
+        String destDir =
+                workingDir + File.separator + "library" + File.separator + dataverse + File.separator + libraryName;
         String fileToTransfer = new File(libraryPath).getAbsolutePath();
 
         Iterator<Node> installTargets = cluster.getNode().iterator();
@@ -434,8 +435,8 @@
         patternList.add(p);
 
         Iterator<Node> uninstallTargets = cluster.getNode().iterator();
-        String libDir = workingDir + File.separator + "library" + File.separator + dataverse + File.separator
-                + libraryName;
+        String libDir =
+                workingDir + File.separator + "library" + File.separator + dataverse + File.separator + libraryName;
         Node uninstallNode = uninstallTargets.next();
         nodeid = new Nodeid(new Value(null, uninstallNode.getId()));
         event = new Event("file_delete", nodeid, libDir);
@@ -606,8 +607,8 @@
         String username = cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername();
         String srcHost = cluster.getMasterNode().getClientIp();
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
-        String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir()
-                : cluster.getMasterNode().getLogDir();
+        String srcDir =
+                cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode().getLogDir();
         String destDir = outputDir + File.separator + "cc";
         String pargs = username + " " + srcHost + " " + srcDir + " " + destDir;
         Event event = new Event("directory_copy", nodeid, pargs);
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
similarity index 66%
copy from asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
index 11a2510..487b47d 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
@@ -16,10 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.event.util;
+package org.apache.asterix.external.api;
 
-public class AsterixConstants {
+import org.apache.hyracks.api.comm.VSizeFrame;
 
-    public static String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
+public interface IFeedMarker {
+
+    /**
+     * Mark the frame with a mark denoting the progress of the feed
+     * The mark will be eventually written to the transaction log
+     *
+     * @param mark
+     *            a frame to write the progress mark in
+     * @return
+     */
+    public boolean mark(VSizeFrame mark);
 
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
index 0f5ada4..9d9ff28 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 
+@FunctionalInterface
 public interface IRecordConverter<I, O> {
 
     public O convert(IRawRecord<? extends I> input) throws IOException;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 9cce1c9..08ffe18 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -73,4 +73,8 @@
      * gives the record reader a chance to recover from IO errors during feed intake
      */
     public boolean handleException(Throwable th);
+
+    public default IFeedMarker getProgressReporter() {
+        return null;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index a301ac9..7806489 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -34,9 +34,9 @@
 
     public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
+            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
             throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
         this.dataParser = dataParser;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index b47d278..7d65c52 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -32,9 +32,9 @@
 
     public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader)
-                    throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+            final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
+            throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 87daffa..be9056b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -19,10 +19,15 @@
 package org.apache.asterix.external.dataflow;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nonnull;
 
+import org.apache.asterix.external.api.IFeedMarker;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
@@ -30,9 +35,13 @@
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.log4j.Logger;
 
 public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
@@ -40,38 +49,52 @@
     protected final IRecordDataParser<T> dataParser;
     protected final IRecordReader<? extends T> recordReader;
     protected final AtomicBoolean closed = new AtomicBoolean(false);
-    protected final long interval = 1000;
+    protected static final long INTERVAL = 1000;
+    protected final Object mutex = new Object();
+    protected final boolean sendMarker;
     protected boolean failed = false;
 
     public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
-            @Nonnull IRecordReader<T> recordReader) throws HyracksDataException {
+            @Nonnull IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
+        this.sendMarker = sendMarker;
         recordReader.setFeedLogManager(feedLogManager);
         recordReader.setController(this);
     }
 
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
+        ExecutorService executorService = sendMarker ? Executors.newSingleThreadExecutor() : null;
+        Future<?> result = null;
+        if (sendMarker) {
+            DataflowMarker dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(),
+                    TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx));
+            result = executorService.submit(dataflowMarker);
+        }
         HyracksDataException hde = null;
         try {
             failed = false;
             tupleForwarder.initialize(ctx, writer);
             while (recordReader.hasNext()) {
-                IRawRecord<? extends T> record = recordReader.next();
-                if (record == null) {
-                    flush();
-                    Thread.sleep(interval);
-                    continue;
+                // synchronized on mutex before we call next() so we don't a marker before its record
+                synchronized (mutex) {
+                    IRawRecord<? extends T> record = recordReader.next();
+                    if (record == null) {
+                        flush();
+                        wait(INTERVAL);
+                        continue;
+                    }
+                    tb.reset();
+                    parseAndForward(record);
                 }
-                tb.reset();
-                parseAndForward(record);
             }
         } catch (InterruptedException e) {
             //TODO: Find out what could cause an interrupted exception beside termination of a job/feed
-            LOGGER.warn("Feed has been interrupted. Closing the feed");
+            LOGGER.warn("Feed has been interrupted. Closing the feed", e);
+            Thread.currentThread().interrupt();
         } catch (Exception e) {
             failed = true;
             tupleForwarder.flush();
@@ -90,10 +113,13 @@
             hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
         } finally {
             closeSignal();
-            if (hde != null) {
-                throw hde;
+            if (sendMarker && result != null) {
+                result.cancel(true);
             }
         }
+        if (hde != null) {
+            throw hde;
+        }
     }
 
     private void parseAndForward(IRawRecord<? extends T> record) throws IOException {
@@ -170,4 +196,53 @@
         // This is not a parser record. most likely, this error happened in the record reader.
         return recordReader.handleException(th);
     }
+
+    private class DataflowMarker implements Runnable {
+        private final IFeedMarker marker;
+        private final VSizeFrame mark;
+        private volatile boolean stopped = false;
+
+        public DataflowMarker(IFeedMarker marker, VSizeFrame mark) {
+            this.marker = marker;
+            this.mark = mark;
+        }
+
+        public synchronized void stop() {
+            stopped = true;
+            notify();
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    synchronized (this) {
+                        if (!stopped) {
+                            // TODO (amoudi): find a better reactive way to do this
+                            // sleep for two seconds
+                            wait(TimeUnit.SECONDS.toMillis(2));
+                        } else {
+                            break;
+                        }
+                    }
+                    synchronized (mutex) {
+                        if (marker.mark(mark)) {
+                            // broadcast
+                            tupleForwarder.flush();
+                            // clear
+                            mark.getBuffer().clear();
+                            mark.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+                            mark.getBuffer().flip();
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                LOGGER.warn("Marker stopped", e);
+                Thread.currentThread().interrupt();
+                return;
+            } catch (Exception e) {
+                LOGGER.warn("Marker stopped", e);
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index f1eb870..36c6c2f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -30,10 +30,12 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 
 public class FeedTupleForwarder implements ITupleForwarder {
 
@@ -59,7 +61,7 @@
             this.writer = writer;
             this.appender = new FrameTupleAppender(frame);
             // Set null feed message
-            VSizeFrame message = (VSizeFrame) ctx.getSharedObject();
+            VSizeFrame message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
             // a null message
             message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 44aac60..790289a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -32,8 +32,8 @@
 
     public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser,
-            IRecordReader<T> recordReader) throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+            IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 2e1c83f..6159908 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -72,11 +72,12 @@
             IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
             throws HyracksDataException {
         this.writer = writer;
+
         this.spiller =
-                new FrameSpiller(ctx,
+                fpa.spillToDiskOnCongestion() ? new FrameSpiller(ctx,
                         connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
                                 + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition(),
-                        fpa.getMaxSpillOnDisk());
+                        fpa.getMaxSpillOnDisk()) : null;
         this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
         this.fpa = fpa;
         this.framePool = framePool;
@@ -122,7 +123,9 @@
             LOGGER.log(Level.WARNING, th.getMessage(), th);
         }
         try {
-            spiller.close();
+            if (spiller != null) {
+                spiller.close();
+            }
         } catch (Throwable th) {
             LOGGER.log(Level.WARNING, th.getMessage(), th);
         }
@@ -459,34 +462,34 @@
                     frame = inbox.poll();
                     if (frame == null) {
                         // Memory queue is empty. Check spill
-                        frame = spiller.next();
-                        while (frame != null) {
-                            if (consume(frame) != null) {
-                                // We don't release the frame since this is a spill frame that we didn't get from memory
-                                // manager
-                                return;
-                            }
+                        if (spiller != null) {
                             frame = spiller.next();
+                            while (frame != null) {
+                                if (consume(frame) != null) {
+                                    // We don't release the frame since this is a spill frame that we didn't get from memory
+                                    // manager
+                                    return;
+                                }
+                                frame = spiller.next();
+                            }
                         }
                         writer.flush();
                         // At this point. We consumed all memory and spilled
                         // We can't assume the next will be in memory. what if there is 0 memory?
                         synchronized (mutex) {
                             frame = inbox.poll();
-                            if (frame == null) {
-                                // Nothing in memory
-                                if (spiller.switchToMemory()) {
-                                    if (poisoned) {
-                                        break;
-                                    }
-                                    if (DEBUG) {
-                                        LOGGER.info("Consumer is going to sleep");
-                                    }
-                                    // Nothing in disk
-                                    mutex.wait();
-                                    if (DEBUG) {
-                                        LOGGER.info("Consumer is waking up");
-                                    }
+                            // Nothing in memory
+                            if (frame == null && (spiller == null || spiller.switchToMemory())) {
+                                if (poisoned) {
+                                    break;
+                                }
+                                if (DEBUG) {
+                                    LOGGER.info("Consumer is going to sleep");
+                                }
+                                // Nothing in disk
+                                mutex.wait();
+                                if (DEBUG) {
+                                    LOGGER.info("Consumer is waking up");
                                 }
                             }
                         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
index a2f19bb..09e03a3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
@@ -28,10 +28,10 @@
 import java.nio.file.Files;
 import java.util.ArrayDeque;
 
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
@@ -49,28 +49,30 @@
     private final String fileNamePrefix;
     private final ArrayDeque<File> files = new ArrayDeque<>();
     private final VSizeFrame frame;
-    private final int budget;           // Max current frames in disk allowed
-    private BufferedOutputStream bos;   // Current output stream
-    private BufferedInputStream bis;    // Current input stream
-    private File currentWriteFile;      // Current write file
-    private File currentReadFile;       // Current read file
-    private int currentWriteCount = 0;  // Current file write count
-    private int currentReadCount = 0;   // Current file read count
-    private int totalWriteCount = 0;    // Total frames spilled
-    private int totalReadCount = 0;     // Total frames read
-    private int fileCount = 0;          // How many spill files?
+    private final int budget; // Max current frames in disk allowed
+    private BufferedOutputStream bos; // Current output stream
+    private BufferedInputStream bis; // Current input stream
+    private File currentWriteFile; // Current write file
+    private File currentReadFile; // Current read file
+    private int currentWriteCount = 0; // Current file write count
+    private int currentReadCount = 0; // Current file read count
+    private int totalWriteCount = 0; // Total frames spilled
+    private int totalReadCount = 0; // Total frames read
+    private int fileCount = 0; // How many spill files?
 
     public FrameSpiller(IHyracksTaskContext ctx, String fileNamePrefix, long budgetInBytes)
             throws HyracksDataException {
         this.frame = new VSizeFrame(ctx);
         this.fileNamePrefix = fileNamePrefix;
-        this.budget = (int) (budgetInBytes / ctx.getInitialFrameSize());
-
+        this.budget = (int) Math.min(budgetInBytes / ctx.getInitialFrameSize(), Integer.MAX_VALUE);
+        if (budget <= 0) {
+            throw new HyracksDataException("Invalid budget " + budgetInBytes + ". Budget must be larger than 0");
+        }
     }
 
     public void open() throws HyracksDataException {
         try {
-            this.currentWriteFile = createFile();
+            this.currentWriteFile = StoragePathUtil.createFile(fileNamePrefix, fileCount++);
             this.currentReadFile = currentWriteFile;
             this.bos = new BufferedOutputStream(new FileOutputStream(currentWriteFile));
             this.bis = new BufferedInputStream(new FileInputStream(currentReadFile));
@@ -135,7 +137,7 @@
     }
 
     public double usedBudget() {
-        return ((double) (totalWriteCount - totalReadCount) / (double) budget);
+        return (double) (totalWriteCount - totalReadCount) / (double) budget;
     }
 
     public synchronized boolean spill(ByteBuffer frame) throws HyracksDataException {
@@ -150,7 +152,7 @@
             if (currentWriteCount >= FRAMES_PER_FILE) {
                 bos.close();
                 currentWriteCount = 0;
-                currentWriteFile = createFile();
+                currentWriteFile = StoragePathUtil.createFile(fileNamePrefix, fileCount++);
                 files.add(currentWriteFile);
                 bos = new BufferedOutputStream(new FileOutputStream(currentWriteFile));
             }
@@ -161,26 +163,6 @@
         }
     }
 
-    private File createFile() throws HyracksDataException {
-        try {
-            String fileName = fileNamePrefix + "_" + fileCount++;
-            File file = new File(fileName);
-            if (!file.exists()) {
-                boolean success = file.createNewFile();
-                if (!success) {
-                    throw new HyracksDataException("Unable to create spill file " + fileName);
-                } else {
-                    if (LOGGER.isEnabledFor(Level.INFO)) {
-                        LOGGER.info("Created spill file " + file.getAbsolutePath());
-                    }
-                }
-            }
-            return file;
-        } catch (Throwable th) {
-            throw new HyracksDataException(th);
-        }
-    }
-
     public synchronized void close() {
         // Do proper cleanup
         if (bos != null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 8ee3e2b..9661890 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -28,6 +28,8 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 
 public class IngestionRuntime extends SubscribableRuntime {
 
@@ -48,8 +50,9 @@
         dWriter.subscribe(collector);
         subscribers.add(collectionRuntime);
         if (numSubscribers == 0) {
-            ctx.setSharedObject(new VSizeFrame(ctx));
-            collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE,
+                    TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx), collectionRuntime.getCtx());
             adapterRuntimeManager.start();
         }
         numSubscribers++;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 54e17ef..37a42a7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -40,7 +40,9 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 /*
@@ -112,7 +114,7 @@
         this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
         this.message = new VSizeFrame(ctx);
-        ctx.setSharedObject(message);
+        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
         this.opDesc = feedMetaOperatorDescriptor;
         this.recordDescProvider = recordDescProvider;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 6f679f7..95bebad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -41,7 +41,9 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -107,7 +109,7 @@
                 .getApplicationObject()).getFeedManager();
         this.targetId = targetId;
         this.message = new VSizeFrame(ctx);
-        ctx.setSharedObject(message);
+        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
         this.recordDescProvider = recordDescProvider;
         this.opDesc = feedMetaOperatorDescriptor;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 4ad08b3..98cb4b0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -59,7 +59,7 @@
     public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
             int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
             Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         try {
             switch (dataSourceFactory.getDataSourceType()) {
                 case RECORDS:
@@ -67,6 +67,7 @@
                     IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
                     IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
                     IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
+                    boolean sendMarker = ExternalDataUtils.isSendMarker(configuration);
                     if (indexingOp) {
                         return new IndexingDataFlowController(ctx,
                                 DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
@@ -80,18 +81,19 @@
                             if (isChangeFeed) {
                                 int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
                                 return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
-                                        numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader);
+                                        numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader,
+                                        sendMarker);
                             } else {
                                 return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
-                                        (IRecordWithMetadataParser) dataParser, recordReader);
+                                        (IRecordWithMetadataParser) dataParser, recordReader, sendMarker);
                             }
                         } else if (isChangeFeed) {
                             int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
                             return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
-                                    (IRecordWithPKDataParser) dataParser, recordReader);
+                                    (IRecordWithPKDataParser) dataParser, recordReader, sendMarker);
                         } else {
                             return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
-                                    recordReader);
+                                    recordReader, sendMarker);
                         }
                     } else {
                         return new RecordDataFlowController(ctx,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index ad945f2..ed811ad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public class DataflowUtils {
     public static void addTupleToFrame(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
@@ -67,4 +68,14 @@
                 throw new HyracksDataException("Unknown tuple forward policy");
         }
     }
+
+    public static void addTupleToFrame(FrameTupleAppender appender, ITupleReference tuple, IFrameWriter writer)
+            throws HyracksDataException {
+        if (!appender.append(tuple)) {
+            appender.write(writer, true);
+            if (!appender.append(tuple)) {
+                throw new HyracksDataException("Tuple is too large for a frame");
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 55dee04..e251f32 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -46,6 +46,8 @@
     public static final String KEY_FILESYSTEM = "fs";
     // specifies the address of the HDFS name node
     public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
+    // specifies whether a feed sends progress markers or not
+    public static final String KEY_SEND_MARKER = "send-marker";
     // specifies the class implementation of the accessed instance of HDFS
     public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
     public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 19781f9..23cd39c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -332,4 +332,8 @@
         }
         return intIndicators;
     }
+
+    public static boolean isSendMarker(Map<String, String> configuration) {
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_SEND_MARKER));
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 6b7eb31..6e1b9e8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -64,6 +64,9 @@
         SPILL,              // Memory budget has been consumed. Now we're writing to disk
         DISCARD             // Memory and Disk space budgets have been consumed. Now we're discarding
     }
+    
+    private FeedUtils() {
+    }
 
     private static String prepareDataverseFeedName(String dataverseName, String feedName) {
         return dataverseName + File.separator + feedName;
@@ -87,7 +90,7 @@
             throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
         }
         String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
-        List<FileSplit> splits = new ArrayList<FileSplit>();
+        List<FileSplit> splits = new ArrayList<>();
         for (String nd : locations) {
             splits.add(splitsForAdapter(dataverseName, feedName, nd,
                     AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0]));
@@ -120,6 +123,7 @@
         int offset = fta.getTupleStartOffset(tc);
         int len = fta.getTupleLength(tc);
         int newSize = FrameHelper.calcAlignedFrameSizeToStore(1, len, message.getMinSize());
+        message.reset();
         message.ensureFrameSize(newSize);
         message.getBuffer().clear();
         message.getBuffer().put(input.array(), offset, len);
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
index 820ae5f..7a5b722 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
@@ -22,6 +22,7 @@
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 
 public class CreateDataverseStatement extends Statement {
 
@@ -31,11 +32,7 @@
 
     public CreateDataverseStatement(Identifier dataverseName, String format, boolean ifNotExists) {
         this.dataverseName = dataverseName;
-        if (format == null) {
-            this.format = "org.apache.asterix.runtime.formats.NonTaggedDataFormat";
-        } else {
-            this.format = format;
-        }
+        this.format = (format == null) ? NonTaggedDataFormat.class.getName() : format;
         this.ifNotExists = ifNotExists;
     }
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
index 8738c97..000339f7 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
@@ -58,7 +58,7 @@
         return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
                 + (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
                 + (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
-                + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+                + (((long) (bytes[offset + 6] & 0xff)) << 8) + (bytes[offset + 7] & 0xff);
     }
 
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index cabfc77..d247490 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -119,8 +119,9 @@
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
         replicationNotifier = new ReplicationNotifier();
         replicationThreads = Executors.newCachedThreadPool(appContext.getThreadFactory());
-        Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
-                .getAppContext()).getMetadataProperties().getNodePartitions();
+        Map<String, ClusterPartition[]> nodePartitions =
+                ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider.getAppContext()).getMetadataProperties()
+                        .getNodePartitions();
         Set<String> nodeReplicationClients = replicationProperties.getNodeReplicationClients(nodeId);
         List<Integer> clientsPartitions = new ArrayList<>();
         for (String clientId : nodeReplicationClients) {
@@ -141,8 +142,8 @@
         try {
             serverSocketChannel = ServerSocketChannel.open();
             serverSocketChannel.configureBlocking(true);
-            InetSocketAddress replicationChannelAddress = new InetSocketAddress(InetAddress.getByName(nodeIP),
-                    dataPort);
+            InetSocketAddress replicationChannelAddress =
+                    new InetSocketAddress(InetAddress.getByName(nodeIP), dataPort);
             serverSocketChannel.socket().bind(replicationChannelAddress);
             lsmComponentLSNMappingService.start();
             replicationNotifier.start();
@@ -169,8 +170,9 @@
         if (remainingFile == 0) {
             if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null
                     && replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
-                int remainingIndexes = replicaUniqueLSN2RemoteMapping
-                        .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
+                int remainingIndexes =
+                        replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes
+                                .decrementAndGet();
                 if (remainingIndexes == 0) {
                     /**
                      * Note: there is a chance that this will never be removed because some
@@ -216,8 +218,8 @@
         public void run() {
             Thread.currentThread().setName("Replication Thread");
             try {
-                ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel,
-                        inBuffer);
+                ReplicationRequestType replicationFunction =
+                        ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                 while (replicationFunction != ReplicationRequestType.GOODBYE) {
                     switch (replicationFunction) {
                         case REPLICATE_LOG:
@@ -281,8 +283,8 @@
             Set<Integer> datasetsToForceFlush = new HashSet<>();
             for (IndexInfo iInfo : openIndexesInfo) {
                 if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) {
-                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
-                            .getIOOperationCallback();
+                    AbstractLSMIOOperationCallback ioCallback =
+                            (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
                     //if an index has a pending flush, then the request to flush it will succeed.
                     if (ioCallback.hasPendingFlush()) {
                         //remove index to indicate that it will be flushed
@@ -373,8 +375,9 @@
             List<String> filesList;
             Set<String> replicaIds = request.getReplicaIds();
             Set<String> requesterExistingFiles = request.getExistingFiles();
-            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) appContextProvider
-                    .getAppContext()).getMetadataProperties().getNodePartitions();
+            Map<String, ClusterPartition[]> nodePartitions =
+                    ((IAsterixPropertiesProvider) appContextProvider.getAppContext()).getMetadataProperties()
+                            .getNodePartitions();
             for (String replicaId : replicaIds) {
                 //get replica partitions
                 ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 857f1e2..afd6019 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -25,6 +25,8 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
+import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -41,6 +43,7 @@
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
@@ -114,8 +117,11 @@
         writer.open();
         indexHelper.open();
         index = indexHelper.getIndexInstance();
-
         try {
+            if (ctx.getSharedObject() != null) {
+                PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index);
+                TaskUtils.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+            }
             missingTupleBuilder = new ArrayTupleBuilder(1);
             DataOutput out = missingTupleBuilder.getDataOutput();
             try {
@@ -135,8 +141,8 @@
                     .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
-            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                    .getApplicationContext().getApplicationObject();
+            IAsterixAppRuntimeContext runtimeCtx =
+                    (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
             AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                     runtimeCtx.getTransactionSubsystem().getLogManager());
         } catch (Exception e) {
@@ -241,10 +247,7 @@
                 writeOutput(i, recordWasInserted);
                 i++;
             }
-            if (tupleCount > 0) {
-                // All tuples has to move forward to maintain the correctness of the transaction pipeline
-                appender.write(writer, true);
-            }
+            appender.write(writer, true);
         } catch (IndexException | IOException | AsterixException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 94d2a8c..9a66aa5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -80,8 +80,8 @@
 
     public LogManager(TransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
-        logManagerProperties = new LogManagerProperties(this.txnSubsystem.getTransactionProperties(),
-                this.txnSubsystem.getId());
+        logManagerProperties =
+                new LogManagerProperties(this.txnSubsystem.getTransactionProperties(), this.txnSubsystem.getId());
         logFileSize = logManagerProperties.getLogPartitionSize();
         logPageSize = logManagerProperties.getLogPageSize();
         numLogPages = logManagerProperties.getNumLogPages();
@@ -172,6 +172,9 @@
         if (logRecord.getLogType() == LogType.FLUSH) {
             logRecord.setLSN(appendLSN.get());
         }
+        if (logRecord.isMarker()) {
+            logRecord.logAppended(appendLSN.get());
+        }
         appendLSN.addAndGet(logRecord.getLogSize());
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index afb926b..0183b29 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -92,7 +92,7 @@
     private final long SHARP_CHECKPOINT_LSN = -1;
     private final boolean replicationEnabled;
     public static final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
-    private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp";
+    private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp";
     private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
     private final long cachedEntityCommitsPerJobSize;
     private final PersistentLocalResourceRepository localResourceRepository;
@@ -108,8 +108,8 @@
         this.txnSubsystem = txnSubsystem;
         logMgr = (LogManager) txnSubsystem.getLogManager();
         checkpointHistory = txnSubsystem.getTransactionProperties().getCheckpointHistory();
-        IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) txnSubsystem
-                .getAsterixAppRuntimeContextProvider().getAppContext();
+        IAsterixPropertiesProvider propertiesProvider =
+                (IAsterixPropertiesProvider) txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext();
         replicationEnabled = propertiesProvider.getReplicationProperties().isReplicationEnabled();
         localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getLocalResourceRepository();
@@ -271,6 +271,7 @@
                     break;
                 case LogType.FLUSH:
                 case LogType.WAIT:
+                case LogType.MARKER:
                     break;
                 default:
                     throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
@@ -361,10 +362,10 @@
                                 //if index is not registered into IndexLifeCycleManager,
                                 //create the index using LocalMetadata stored in LocalResourceRepository
                                 //get partition path in this node
-                                String partitionIODevicePath = localResourceRepository
-                                        .getPartitionPath(localResource.getPartition());
-                                String resourceAbsolutePath = partitionIODevicePath + File.separator
-                                        + localResource.getResourceName();
+                                String partitionIODevicePath =
+                                        localResourceRepository.getPartitionPath(localResource.getPartition());
+                                String resourceAbsolutePath =
+                                        partitionIODevicePath + File.separator + localResource.getResourceName();
                                 localResource.setResourcePath(resourceAbsolutePath);
                                 index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
                                 if (index == null) {
@@ -379,8 +380,8 @@
                                     //#. get maxDiskLastLSN
                                     ILSMIndex lsmIndex = index;
                                     try {
-                                        maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
-                                                .getIOOperationCallback())
+                                        maxDiskLastLsn =
+                                                ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
                                                         .getComponentLSN(lsmIndex.getImmutableComponents());
                                     } catch (HyracksDataException e) {
                                         datasetLifecycleManager.close(resourceAbsolutePath);
@@ -405,6 +406,7 @@
                     case LogType.ABORT:
                     case LogType.FLUSH:
                     case LogType.UPSERT_ENTITY_COMMIT:
+                    case LogType.MARKER:
                         //do nothing
                         break;
                     default:
@@ -443,8 +445,8 @@
         //right after the new checkpoint file is written.
         File[] prevCheckpointFiles = getPreviousCheckpointFiles();
 
-        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IDatasetLifecycleManager datasetLifecycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         //flush all in-memory components if it is the sharp checkpoint
         if (isSharpCheckpoint) {
             datasetLifecycleManager.flushAllDatasets();
@@ -467,8 +469,8 @@
                         Set<Integer> deadReplicasPartitions = new HashSet<>();
                         //get partitions of the dead replicas that are not active on this node
                         for (String deadReplicaId : deadReplicaIds) {
-                            ClusterPartition[] nodePartitons = metadataProperties.getNodePartitions()
-                                    .get(deadReplicaId);
+                            ClusterPartition[] nodePartitons =
+                                    metadataProperties.getNodePartitions().get(deadReplicaId);
                             for (ClusterPartition partition : nodePartitons) {
                                 if (!localResourceRepository.getActivePartitions()
                                         .contains(partition.getPartitionId())) {
@@ -492,8 +494,8 @@
                 datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
                 if (replicationEnabled) {
                     //request remote replicas to flush lagging indexes
-                    IReplicationManager replicationManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                            .getAppContext().getReplicationManager();
+                    IReplicationManager replicationManager =
+                            txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationManager();
                     try {
                         replicationManager.requestFlushLaggingReplicaIndexes(nonSharpCheckpointTargetLSN);
                     } catch (IOException e) {
@@ -564,16 +566,16 @@
 
     @Override
     public long getLocalMinFirstLSN() throws HyracksDataException {
-        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IDatasetLifecycleManager datasetLifecycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         List<IIndex> openIndexList = datasetLifecycleManager.getOpenIndexes();
         long firstLSN;
         //the min first lsn can only be the current append or smaller
         long minFirstLSN = logMgr.getAppendLSN();
         if (openIndexList.size() > 0) {
             for (IIndex index : openIndexList) {
-                AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index)
-                        .getIOOperationCallback();
+                AbstractLSMIOOperationCallback ioCallback =
+                        (AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback();
 
                 if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
                     firstLSN = ioCallback.getFirstLSN();
@@ -585,8 +587,8 @@
     }
 
     private long getRemoteMinFirstLSN() {
-        IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getAppContext().getReplicaResourcesManager();
+        IReplicaResourcesManager remoteResourcesManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
         long minRemoteLSN = remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
         return minRemoteLSN;
     }
@@ -783,6 +785,7 @@
                     case LogType.ABORT:
                     case LogType.FLUSH:
                     case LogType.WAIT:
+                    case LogType.MARKER:
                         //ignore
                         break;
                     default:
@@ -798,8 +801,8 @@
             //undo loserTxn's effect
             LOGGER.log(Level.INFO, "undoing loser transaction's effect");
 
-            IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                    .getDatasetLifecycleManager();
+            IDatasetLifecycleManager datasetLifecycleManager =
+                    txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
             //TODO sort loser entities by smallest LSN to undo in one pass.
             Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
             int undoCount = 0;
@@ -855,10 +858,10 @@
 
     private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
         try {
-            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
-                    logRecord.getResourceId());
-            ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            ILSMIndex index =
+                    (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+            ILSMIndexAccessor indexAccessor =
+                    index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
                 indexAccessor.forceDelete(logRecord.getNewValue());
             } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
@@ -873,10 +876,10 @@
 
     private static void redo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
         try {
-            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
-                    logRecord.getResourceId());
-            ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            ILSMIndex index =
+                    (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+            ILSMIndexAccessor indexAccessor =
+                    index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
                 indexAccessor.forceInsert(logRecord.getNewValue());
             } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
index d94f933..19d7afb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
@@ -64,6 +64,7 @@
 
     /**
      * Provide data to the stream of this {@link IFrameWriter}.
+     *
      * @param buffer
      *            - Buffer containing data.
      * @throws HyracksDataException
@@ -72,21 +73,24 @@
 
     /**
      * request the frame to push its content forward and flush its consumers
+     *
      * @throws HyracksDataException
      */
     public default void flush() throws HyracksDataException {
-        throw new HyracksDataException("flush() is not supported in this IFrameWriter");
+        // No Op
     }
 
     /**
      * Indicate that a failure was encountered and the current stream is to be
      * aborted.
+     *
      * @throws HyracksDataException
      */
     public void fail() throws HyracksDataException;
 
     /**
      * Close this {@link IFrameWriter} and give up all resources.
+     *
      * @throws HyracksDataException
      */
     public void close() throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 3781489..4eb3ebf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -45,7 +45,7 @@
 
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception;
 
-    public void setSharedObject(Object sharedObject);
+    public void setSharedObject(Object object);
 
     public Object getSharedObject();
 }
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
similarity index 82%
rename from asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
index 11a2510..8d55235 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.event.util;
+package org.apache.hyracks.api.util;
 
-public class AsterixConstants {
+public class HyracksConstants {
+    public static final String KEY_MESSAGE = "HYX:MSG";
 
-    public static String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
-
+    private HyracksConstants() {
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2f8def1..43cac74 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -260,7 +260,8 @@
         init();
 
         datasetNetworkManager.start();
-        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
+        IIPCHandle ccIPCHandle =
+                ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
@@ -270,12 +271,11 @@
         // Use "public" versions of network addresses and ports
         NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress,
-                datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean
-                        .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
-                        .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
-                        .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema));
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
+                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
 
         synchronized (this) {
             while (registrationPending) {
@@ -490,7 +490,8 @@
             CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
             switch (fn.getFunctionId()) {
                 case SEND_APPLICATION_MESSAGE: {
-                    CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    CCNCFunctions.SendApplicationMessageFunction amf =
+                            (CCNCFunctions.SendApplicationMessageFunction) fn;
                     queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
                             amf.getDeploymentId(), amf.getNodeId()));
                     return;
@@ -515,7 +516,8 @@
                 }
 
                 case REPORT_PARTITION_AVAILABILITY: {
-                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
+                            (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
                     queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
                             rpaf.getPartitionId(), rpaf.getNetworkAddress()));
                     return;
@@ -528,7 +530,8 @@
                 }
 
                 case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
-                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf =
+                            (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
                     setNodeControllersInfo(gncirf.getNodeControllerInfos());
                     return;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 134154c..f463bfa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -394,8 +394,8 @@
     }
 
     @Override
-    public void setSharedObject(Object sharedObject) {
-        this.sharedObject = sharedObject;
+    public void setSharedObject(Object object) {
+        this.sharedObject = object;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 7d12296..7d90a8b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -106,20 +106,20 @@
                                 ByteBuffer buffer = ctx.allocateFrame();
                                 boolean fail = false;
                                 boolean done = false;
-                                boolean flush = false;
                                 while (!fail && !done) {
                                     synchronized (MaterializingPipelinedPartition.this) {
-                                        if (flushRequest) {
-                                            flushRequest = false;
-                                            flush = true;
-                                        }
-                                        while (offset >= size && !eos && !failed && !flush) {
+                                        while (offset >= size && !eos && !failed) {
+                                            if (flushRequest) {
+                                                flushRequest = false;
+                                                writer.flush();
+                                            }
                                             try {
                                                 MaterializingPipelinedPartition.this.wait();
                                             } catch (InterruptedException e) {
                                                 throw new HyracksDataException(e);
                                             }
                                         }
+                                        flushRequest = false;
                                         fail = failed;
                                         done = eos && offset >= size;
                                     }
@@ -134,10 +134,6 @@
                                         offset += readLen;
                                         buffer.flip();
                                         writer.nextFrame(buffer);
-                                        if (flush) {
-                                            writer.flush();
-                                            flush = false;
-                                        }
                                     }
                                 }
                             }
@@ -213,4 +209,4 @@
         flushRequest = true;
         notifyAll();
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index e7131d5..57f8072 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -89,9 +89,7 @@
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
         getBuffer().clear();
-        if (getTupleCount() > 0) {
-            outWriter.nextFrame(getBuffer());
-        }
+        outWriter.nextFrame(getBuffer());
         if (clearFrame) {
             frame.reset();
             reset(getBuffer(), true);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index ef11b5b..3ef8b28 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -45,13 +45,13 @@
      * append fieldSlots and bytes to the current frame
      */
     @Override
-    public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
-        if (canHoldNewTuple(fieldSlots.length, length)) {
-            for (int i = 0; i < fieldSlots.length; ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fieldSlots[i]);
+    public boolean append(int[] fieldEndOffsets, byte[] bytes, int offset, int length) throws HyracksDataException {
+        if (canHoldNewTuple(fieldEndOffsets.length, length)) {
+            for (int i = 0; i < fieldEndOffsets.length; ++i) {
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fieldEndOffsets[i]);
             }
-            System.arraycopy(bytes, offset, array, tupleDataEndOffset + fieldSlots.length * 4, length);
-            tupleDataEndOffset += fieldSlots.length * 4 + length;
+            System.arraycopy(bytes, offset, array, tupleDataEndOffset + fieldEndOffsets.length * 4, length);
+            tupleDataEndOffset += fieldEndOffsets.length * 4 + length;
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
@@ -63,19 +63,24 @@
     }
 
     public boolean append(ITupleReference tuple) throws HyracksDataException {
-        int tupleSize = 0;
+        int length = 0;
         for (int i = 0; i < tuple.getFieldCount(); i++) {
-            tupleSize += tuple.getFieldLength(i);
+            length += tuple.getFieldLength(i);
         }
-        if (canHoldNewTuple(tuple.getFieldCount(), tupleSize)) {
-            int offset = 0;
+
+        if (canHoldNewTuple(tuple.getFieldCount(), length)) {
+            length = 0;
             for (int i = 0; i < tuple.getFieldCount(); ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, offset);
-                System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), array,
-                        tupleDataEndOffset + tuple.getFieldCount() * 4, tuple.getFieldLength(i));
-                offset += tuple.getFieldLength(i);
+                length += tuple.getFieldLength(i);
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, length);
             }
-            tupleDataEndOffset += tuple.getFieldCount() * 4 + tupleSize;
+            length = 0;
+            for (int i = 0; i < tuple.getFieldCount(); ++i) {
+                System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), array,
+                        tupleDataEndOffset + tuple.getFieldCount() * 4 + length, tuple.getFieldLength(i));
+                length += tuple.getFieldLength(i);
+            }
+            tupleDataEndOffset += tuple.getFieldCount() * 4 + length;
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index cae659d..7f518cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -26,7 +26,9 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.util.IntSerDeUtils;
 
 /**
@@ -39,7 +41,9 @@
     private static final int NULL_MESSAGE_SIZE = 1;
     public static final byte NULL_FEED_MESSAGE = 0x01;
     public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
-    public static final byte SNAPSHOT_MESSAGE = 0x03;
+    public static final byte MARKER_MESSAGE = 0x03;
+    private boolean initialized = false;
+    private VSizeFrame message;
 
     public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
         this.ctx = ctx;
@@ -59,8 +63,8 @@
             case ACK_REQ_FEED_MESSAGE:
                 aString.append("Ack Request, ");
                 break;
-            case SNAPSHOT_MESSAGE:
-                aString.append("Snapshot, ");
+            case MARKER_MESSAGE:
+                aString.append("Marker, ");
                 break;
             default:
                 aString.append("Unknown, ");
@@ -78,8 +82,8 @@
                 return NULL_FEED_MESSAGE;
             case ACK_REQ_FEED_MESSAGE:
                 return ACK_REQ_FEED_MESSAGE;
-            case SNAPSHOT_MESSAGE:
-                return SNAPSHOT_MESSAGE;
+            case MARKER_MESSAGE:
+                return MARKER_MESSAGE;
             default:
                 throw new HyracksDataException("Unknown message type");
         }
@@ -101,24 +105,35 @@
 
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        if (!initialized) {
+            message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
+            initialized = true;
+        }
         // If message fits, we append it, otherwise, we append a null message, then send a message only
         // frame with the message
-        ByteBuffer message = ((VSizeFrame) ctx.getSharedObject()).getBuffer();
-        int messageSize = message.limit() - message.position();
-        if (hasEnoughSpace(1, messageSize)) {
-            appendMessage(message);
-            forward(outWriter);
-        } else {
+        if (message == null) {
             if (tupleCount > 0) {
                 appendNullMessage();
                 forward(outWriter);
             }
-            if (!hasEnoughSpace(1, messageSize)) {
-                frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
-                reset(frame.getBuffer(), true);
+        } else {
+            ByteBuffer buffer = message.getBuffer();
+            int messageSize = buffer.limit() - buffer.position();
+            if (hasEnoughSpace(1, messageSize)) {
+                appendMessage(buffer);
+                forward(outWriter);
+            } else {
+                if (tupleCount > 0) {
+                    appendNullMessage();
+                    forward(outWriter);
+                }
+                if (!hasEnoughSpace(1, messageSize)) {
+                    frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
+                    reset(frame.getBuffer(), true);
+                }
+                appendMessage(buffer);
+                forward(outWriter);
             }
-            appendMessage(message);
-            forward(outWriter);
         }
     }
 
@@ -130,8 +145,9 @@
     }
 
     private void appendMessage(ByteBuffer message) {
-        System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, message.limit());
-        tupleDataEndOffset += message.limit();
+        int messageLength = message.limit() - message.position();
+        System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, messageLength);
+        tupleDataEndOffset += messageLength;
         IntSerDeUtils.putInt(getBuffer().array(),
                 FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
         ++tupleCount;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java
new file mode 100644
index 0000000..4f27d79
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A Utility class for facilitating common operations used with a hyracks task
+ */
+public class TaskUtils {
+    private TaskUtils() {
+    }
+
+    /**
+     * get the shared object of a task as a Map<String,Object>
+     *
+     * @param ctx
+     *            the task context
+     * @param create
+     * @return the task shared map
+     */
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> getSharedMap(IHyracksTaskContext ctx, boolean create) {
+        if (ctx.getSharedObject() != null) {
+            return (Map<String, Object>) ctx.getSharedObject();
+        } else if (create) {
+            Map<String, Object> taskMap = new HashMap<>();
+            ctx.setSharedObject(taskMap);
+            return taskMap;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * put the key value pair in a map task object
+     *
+     * @param key
+     * @param ctx
+     * @param object
+     */
+    public static void putInSharedMap(String key, Object object, IHyracksTaskContext ctx) {
+        TaskUtils.getSharedMap(ctx, true).put(key, object);
+    }
+
+    /**
+     * get a <T> object from the shared map of the task
+     *
+     * @param key
+     * @param ctx
+     * @return the value associated with the key casted as T
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T get(String key, IHyracksTaskContext ctx) {
+        Map<String, Object> sharedMap = TaskUtils.getSharedMap(ctx, false);
+        return sharedMap == null ? null : (T) sharedMap.get(key);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 30ee3c0..b4e51be 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -65,8 +65,8 @@
             RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
         this.groupFields = groupFields;
         this.comparators = comparators;
-        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields,
-                writer);
+        this.aggregator =
+                aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer);
         this.aggregateState = aggregator.createAggregateStates();
         copyFrame = new VSizeFrame(ctx);
         inFrameAccessor = new FrameTupleAccessor(inRecordDesc);
@@ -91,29 +91,32 @@
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         inFrameAccessor.reset(buffer);
         int nTuples = inFrameAccessor.getTupleCount();
-        for (int i = 0; i < nTuples; ++i) {
-            if (first) {
+        if (nTuples != 0) {
+            for (int i = 0; i < nTuples; ++i) {
+                if (first) {
 
-                tupleBuilder.reset();
-                for (int j = 0; j < groupFields.length; j++) {
-                    tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
-                }
-                aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
+                    tupleBuilder.reset();
+                    for (int j = 0; j < groupFields.length; j++) {
+                        tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
+                    }
+                    aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
 
-                first = false;
+                    first = false;
 
-            } else {
-                if (i == 0) {
-                    switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
                 } else {
-                    switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
-                }
+                    if (i == 0) {
+                        switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor,
+                                i);
+                    } else {
+                        switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+                    }
 
+                }
             }
+            copyFrame.ensureFrameSize(buffer.capacity());
+            FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
+            copyFrameAccessor.reset(copyFrame.getBuffer());
         }
-        copyFrame.ensureFrameSize(buffer.capacity());
-        FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
-        copyFrameAccessor.reset(copyFrame.getBuffer());
     }
 
     private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
diff --git a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
index 7267cb7..7784199 100644
--- a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
@@ -33,4 +33,35 @@
     <root.dir>${basedir}/../../..</root.dir>
   </properties>
 
+  <build>
+      <pluginManagement>
+          <plugins>
+              <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+              <plugin>
+                  <groupId>org.eclipse.m2e</groupId>
+                  <artifactId>lifecycle-mapping</artifactId>
+                  <version>1.0.0</version>
+                  <configuration>
+                      <lifecycleMappingMetadata>
+                          <pluginExecutions>
+                              <pluginExecution>
+                                  <pluginExecutionFilter>
+                                      <groupId>org.apache.maven.plugins</groupId>
+                                      <artifactId>maven-plugin-plugin</artifactId>
+                                      <versionRange>[3.3,)</versionRange>
+                                      <goals>
+                                          <goal>descriptor</goal>
+                                      </goals>
+                                  </pluginExecutionFilter>
+                                  <action>
+                                      <ignore></ignore>
+                                  </action>
+                              </pluginExecution>
+                          </pluginExecutions>
+                      </lifecycleMappingMetadata>
+                  </configuration>
+              </plugin>
+          </plugins>
+      </pluginManagement>
+  </build>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetaDataPageManager.java
index 2550ab4..3982a3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetaDataPageManager.java
@@ -31,12 +31,15 @@
      */
     /**
      * Open an index file's metadata
-     * @param fileId The file which to open the metadata of
+     *
+     * @param fileId
+     *            The file which to open the metadata of
      */
     public void open(int fileId);
 
     /**
      * Close an index file's metadata.
+     *
      * @throws HyracksDataException
      */
 
@@ -44,7 +47,9 @@
 
     /**
      * Get the location of a free page to use for index operations
-     * @param metaFrame A metadata frame to use to wrap the raw page
+     *
+     * @param metaFrame
+     *            A metadata frame to use to wrap the raw page
      * @return A page location, or -1 if no free page could be found or allocated
      * @throws HyracksDataException
      */
@@ -53,7 +58,9 @@
 
     /**
      * Get the location of a block of free pages to use for index operations
-     * @param metaFrame A metadata frame to use to wrap the raw page
+     *
+     * @param metaFrame
+     *            A metadata frame to use to wrap the raw page
      * @return The starting page location, or -1 if a block of free pages could be found or allocated
      * @throws HyracksDataException
      */
@@ -62,8 +69,11 @@
 
     /**
      * Add a page back to the pool of free pages within an index file
-     * @param metaFrame A metadata frame to use to wrap the raw page
-     * @param freePage The page which to return to the free space
+     *
+     * @param metaFrame
+     *            A metadata frame to use to wrap the raw page
+     * @param freePage
+     *            The page which to return to the free space
      * @throws HyracksDataException
      */
 
@@ -74,7 +84,9 @@
 
     /**
      * Gets the highest page offset according to the metadata
-     * @param metaFrame A metadata frame to use to wrap the raw page
+     *
+     * @param metaFrame
+     *            A metadata frame to use to wrap the raw page
      * @return The locaiton of the highest offset page
      * @throws HyracksDataException
      */
@@ -83,8 +95,11 @@
 
     /**
      * Initializes the index metadata
-     * @param metaFrame A metadata farme to use to wrap the raw page
-     * @param currentMaxPage The highest page offset to consider valid
+     *
+     * @param metaFrame
+     *            A metadata farme to use to wrap the raw page
+     * @param currentMaxPage
+     *            The highest page offset to consider valid
      * @throws HyracksDataException
      */
 
@@ -105,6 +120,7 @@
 
     /**
      * Determines where the metadata page is located in an index file
+     *
      * @return The locaiton of the metadata page, or -1 if the file appears to be corrupt
      * @throws HyracksDataException
      */
@@ -113,7 +129,9 @@
 
     /**
      * Initializes the metadata manager on an open index file
-     * @param metaFrame A metadata frame used to wrap the raw page
+     *
+     * @param metaFrame
+     *            A metadata frame used to wrap the raw page
      * @throws HyracksDataException
      */
 
@@ -121,6 +139,7 @@
 
     /**
      * Locate the filter page in an index file
+     *
      * @return The offset of the filter page if it exists, or less than zero if no filter page exists yet
      * @throws HyracksDataException
      */
@@ -135,7 +154,9 @@
 
     /**
      * Set the cached page to manage for filter data
-     * @param page The page to manage
+     *
+     * @param page
+     *            The page to manage
      */
 
     void setFilterPage(ICachedPage page);
@@ -149,4 +170,6 @@
      * @throws HyracksDataException
      */
     long getLSNOffset() throws HyracksDataException;
+
+    public long getLastMarkerLSN() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
index e33b949..bdfa9e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
@@ -62,4 +62,9 @@
     public long getLSN();
 
     public void setLSN(long lsn);
+
+    // Special placeholder for LSN information of a marker log. used for rollback information
+    public long getLastMarkerLSN();
+
+    public void setLastMarkerLSN(long lsn);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 4f9e6c4..44778a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -199,7 +199,9 @@
         if (index != null) {
             // if index == null, then the index open was not successful
             try {
-                appender.write(writer, true);
+                if (appender.getTupleCount() > 0) {
+                    appender.write(writer, true);
+                }
             } catch (Throwable th) {
                 closeException = new HyracksDataException(th);
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
index 16fdecd..38b43c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
@@ -34,38 +34,43 @@
     // Arbitrarily chosen magic integer.
     protected static final int MAGIC_VALID_INT = 0x5bd1e995;
 
-    protected static final int tupleCountOff = 0; //0
-    protected static final int freeSpaceOff = tupleCountOff + 4; //4
-    protected static final int maxPageOff = freeSpaceOff + 4; //8
-    protected static final int levelOff = maxPageOff + 12; //20
-    protected static final int nextPageOff = levelOff + 1; // 21
-    protected static final int validOff = nextPageOff + 4; // 25
+    protected static final int TUPLE_COUNT_OFFSET = 0; //0
+    protected static final int FREE_SPACE_OFFSET = TUPLE_COUNT_OFFSET + 4; //4
+    protected static final int MAX_PAGE_OFFSET = FREE_SPACE_OFFSET + 4; //8
+    protected static final int LEVEL_OFFSET = MAX_PAGE_OFFSET + 12; //20
+    protected static final int NEXT_PAGE_OFFSET = LEVEL_OFFSET + 1; // 21
+    protected static final int VALID_OFFSET = NEXT_PAGE_OFFSET + 4; // 25
 
     // The additionalFilteringPageOff is used only for LSM indexes.
     // We store the page id that will be used to store the information of the the filter that is associated with a disk component.
     // It is only set in the first meta page other meta pages (i.e., with level -2) have junk in the max page field.
-    private static final int additionalFilteringPageOff = validOff + 4; // 29
-    public static final int lsnOff = additionalFilteringPageOff + 4; // 33
+    private static final int ADDITIONAL_FILTERING_PAGE_OFFSET = VALID_OFFSET + 4; // 29
+    public static final int LSN_OFFSET = ADDITIONAL_FILTERING_PAGE_OFFSET + 4; // 33
+    private static final int LAST_MARKER_LSN_OFFSET = LSN_OFFSET + 8; // 41
+    private static final int HEADER_END_OFFSET = LAST_MARKER_LSN_OFFSET + 8;
 
     protected ICachedPage page = null;
     protected ByteBuffer buf = null;
 
+    @Override
     public int getMaxPage() {
-        return buf.getInt(maxPageOff);
+        return buf.getInt(MAX_PAGE_OFFSET);
     }
 
+    @Override
     public void setMaxPage(int maxPage) {
-        buf.putInt(maxPageOff, maxPage);
+        buf.putInt(MAX_PAGE_OFFSET, maxPage);
     }
 
+    @Override
     public int getFreePage() {
-        int tupleCount = buf.getInt(tupleCountOff);
+        int tupleCount = buf.getInt(TUPLE_COUNT_OFFSET);
         if (tupleCount > 0) {
             // return the last page from the linked list of free pages
             // TODO: this is a dumb policy, but good enough for now
-            int lastPageOff = buf.getInt(freeSpaceOff) - 4;
-            buf.putInt(freeSpaceOff, lastPageOff);
-            buf.putInt(tupleCountOff, tupleCount - 1);
+            int lastPageOff = buf.getInt(FREE_SPACE_OFFSET) - 4;
+            buf.putInt(FREE_SPACE_OFFSET, lastPageOff);
+            buf.putInt(TUPLE_COUNT_OFFSET, tupleCount - 1);
             return buf.getInt(lastPageOff);
         } else {
             return -1;
@@ -75,26 +80,28 @@
     // must be checked before adding free page
     // user of this class is responsible for getting a free page as a new meta
     // page, latching it, etc. if there is no space on this page
+    @Override
     public boolean hasSpace() {
-        return buf.getInt(freeSpaceOff) + 4 < buf.capacity();
+        return buf.getInt(FREE_SPACE_OFFSET) + 4 < buf.capacity();
     }
 
     // no bounds checking is done, there must be free space
+    @Override
     public void addFreePage(int freePage) {
-        int freeSpace = buf.getInt(freeSpaceOff);
+        int freeSpace = buf.getInt(FREE_SPACE_OFFSET);
         buf.putInt(freeSpace, freePage);
-        buf.putInt(freeSpaceOff, freeSpace + 4);
-        buf.putInt(tupleCountOff, buf.getInt(tupleCountOff) + 1);
+        buf.putInt(FREE_SPACE_OFFSET, freeSpace + 4);
+        buf.putInt(TUPLE_COUNT_OFFSET, buf.getInt(TUPLE_COUNT_OFFSET) + 1);
     }
 
     @Override
     public byte getLevel() {
-        return buf.get(levelOff);
+        return buf.get(LEVEL_OFFSET);
     }
 
     @Override
     public void setLevel(byte level) {
-        buf.put(levelOff, level);
+        buf.put(LEVEL_OFFSET, level);
     }
 
     @Override
@@ -110,56 +117,67 @@
 
     @Override
     public void initBuffer(byte level) {
-        buf.putInt(tupleCountOff, 0);
-        buf.putInt(freeSpaceOff, lsnOff + 8);
-        buf.putInt(maxPageOff, 0);
-        buf.put(levelOff, level);
-        buf.putInt(nextPageOff, -1);
-        buf.putInt(additionalFilteringPageOff, -1);
+        buf.putInt(TUPLE_COUNT_OFFSET, 0);
+        buf.putInt(FREE_SPACE_OFFSET, HEADER_END_OFFSET);
+        buf.putInt(MAX_PAGE_OFFSET, 0);
+        buf.put(LEVEL_OFFSET, level);
+        buf.putInt(NEXT_PAGE_OFFSET, -1);
+        buf.putInt(ADDITIONAL_FILTERING_PAGE_OFFSET, -1);
+        buf.putLong(LAST_MARKER_LSN_OFFSET, -1L);
         setValid(false);
     }
 
     @Override
     public int getNextPage() {
-        return buf.getInt(nextPageOff);
+        return buf.getInt(NEXT_PAGE_OFFSET);
     }
 
     @Override
     public void setNextPage(int nextPage) {
-        buf.putInt(nextPageOff, nextPage);
+        buf.putInt(NEXT_PAGE_OFFSET, nextPage);
     }
 
     @Override
     public boolean isValid() {
-        return buf.getInt(validOff) == MAGIC_VALID_INT;
+        return buf.getInt(VALID_OFFSET) == MAGIC_VALID_INT;
     }
 
     @Override
     public void setValid(boolean isValid) {
         if (isValid) {
-            buf.putInt(validOff, MAGIC_VALID_INT);
+            buf.putInt(VALID_OFFSET, MAGIC_VALID_INT);
         } else {
-            buf.putInt(validOff, 0);
+            buf.putInt(VALID_OFFSET, 0);
         }
     }
 
     @Override
     public long getLSN() {
-        return buf.getLong(lsnOff);
+        return buf.getLong(LSN_OFFSET);
     }
 
     @Override
     public void setLSN(long lsn) {
-        buf.putLong(lsnOff, lsn);
+        buf.putLong(LSN_OFFSET, lsn);
+    }
+
+    @Override
+    public long getLastMarkerLSN() {
+        return buf.getLong(LAST_MARKER_LSN_OFFSET);
+    }
+
+    @Override
+    public void setLastMarkerLSN(long lsn) {
+        buf.putLong(LAST_MARKER_LSN_OFFSET, lsn);
     }
 
     @Override
     public int getLSMComponentFilterPageId() {
-        return buf.getInt(additionalFilteringPageOff);
+        return buf.getInt(ADDITIONAL_FILTERING_PAGE_OFFSET);
     }
 
     @Override
     public void setLSMComponentFilterPageId(int filterPage) {
-        buf.putInt(additionalFilteringPageOff, filterPage);
+        buf.putInt(ADDITIONAL_FILTERING_PAGE_OFFSET, filterPage);
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index 468654a..4d56a80 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -361,8 +361,9 @@
      */
     @Override
     public int getFirstMetadataPage() throws HyracksDataException {
-        if (headPage != IBufferCache.INVALID_PAGEID)
+        if (headPage != IBufferCache.INVALID_PAGEID) {
             return headPage;
+        }
 
         ITreeIndexMetaDataFrame metaFrame = metaDataFrameFactory.createFrame();
 
@@ -472,8 +473,29 @@
     public long getLSNOffset() throws HyracksDataException {
         int metadataPageNum = getFirstMetadataPage();
         if (metadataPageNum != IBufferCache.INVALID_PAGEID) {
-            return (metadataPageNum * bufferCache.getPageSize()) + LIFOMetaDataFrame.lsnOff;
+            return (metadataPageNum * (long) bufferCache.getPageSize()) + LIFOMetaDataFrame.LSN_OFFSET;
         }
         return IMetaDataPageManager.INVALID_LSN_OFFSET;
     }
+
+    @Override
+    public long getLastMarkerLSN() throws HyracksDataException {
+        ICachedPage metaNode;
+        if (!appendOnly || (appendOnly && confiscatedMetaNode == null)) {
+            metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getFirstMetadataPage()), false);
+        } else {
+            metaNode = confiscatedMetaNode;
+        }
+        ITreeIndexMetaDataFrame metaFrame = metaDataFrameFactory.createFrame();
+        metaNode.acquireReadLatch();
+        try {
+            metaFrame.setPage(metaNode);
+            return metaFrame.getLastMarkerLSN();
+        } finally {
+            metaNode.releaseReadLatch();
+            if (!appendOnly || (appendOnly && confiscatedMetaNode == null)) {
+                bufferCache.unpin(metaNode);
+            }
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 9e75360..6d673a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -129,8 +129,8 @@
             ++i;
         }
         componentFactory = new LSMBTreeDiskComponentFactory(diskBTreeFactory, bloomFilterFactory, filterFactory);
-        bulkLoadComponentFactory = new LSMBTreeDiskComponentFactory(bulkLoadBTreeFactory, bloomFilterFactory,
-                filterFactory);
+        bulkLoadComponentFactory =
+                new LSMBTreeDiskComponentFactory(bulkLoadBTreeFactory, bloomFilterFactory, filterFactory);
         this.needKeyDupCheck = needKeyDupCheck;
         this.btreeFields = btreeFields;
         this.hasBloomFilter = needKeyDupCheck;
@@ -184,9 +184,9 @@
         for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
             LSMBTreeDiskComponent component;
             try {
-                component = createDiskComponent(componentFactory,
-                        lsmComonentFileReference.getInsertIndexFileReference(),
-                        lsmComonentFileReference.getBloomFilterFileReference(), false);
+                component =
+                        createDiskComponent(componentFactory, lsmComonentFileReference.getInsertIndexFileReference(),
+                                lsmComonentFileReference.getBloomFilterFileReference(), false);
             } catch (IndexException e) {
                 throw new HyracksDataException(e);
             }
@@ -424,7 +424,7 @@
         ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent,
                 componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
-                callback, fileManager.getBaseDir()));
+                callback, fileManager.getBaseDir(), flushingComponent.getMostRecentMarkerLSN()));
     }
 
     @Override
@@ -489,7 +489,7 @@
             filterManager.updateFilterInfo(component.getLSMComponentFilter(), filterTuples);
             filterManager.writeFilterInfo(component.getLSMComponentFilter(), component.getBTree());
         }
-
+        component.setMostRecentMarkerLSN(flushOp.getPrevMarkerLSN());
         bulkLoader.end();
 
         return component;
@@ -511,8 +511,8 @@
         BTree lastBTree = ((LSMBTreeDiskComponent) mergingComponents.get(mergingComponents.size() - 1)).getBTree();
         FileReference firstFile = firstBTree.getFileReference();
         FileReference lastFile = lastBTree.getFileReference();
-        LSMComponentFileReferences relMergeFileRefs = fileManager
-                .getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName());
+        LSMComponentFileReferences relMergeFileRefs =
+                fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName());
         ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor,
                 relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
@@ -542,8 +542,8 @@
         LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
                 mergeOp.getBloomFilterMergeTarget(), true);
 
-        IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false,
-                true);
+        IIndexBulkLoader bulkLoader =
+                mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false, true);
         IIndexBulkLoader builder = null;
         if (hasBloomFilter) {
             builder = mergedComponent.getBloomFilter().createBuilder(numElements, bloomFilterSpec.getNumHashes(),
@@ -574,6 +574,8 @@
             filterManager.writeFilterInfo(mergedComponent.getLSMComponentFilter(), mergedComponent.getBTree());
         }
 
+        mergedComponent
+                .setMostRecentMarkerLSN(mergedComponents.get(mergedComponents.size() - 1).getMostRecentMarkerLSN());
         bulkLoader.end();
 
         return mergedComponent;
@@ -581,7 +583,7 @@
 
     protected LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory,
             FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean createComponent)
-                    throws HyracksDataException, IndexException {
+            throws HyracksDataException, IndexException {
         // Create new BTree instance.
         LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) factory
                 .createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
@@ -595,6 +597,11 @@
         if (component.getLSMComponentFilter() != null && !createComponent) {
             filterManager.readFilterInfo(component.getLSMComponentFilter(), component.getBTree());
         }
+
+        if (!createComponent) {
+            component.readMostRecentMarkerLSN(component.getBTree());
+        }
+
         return component;
     }
 
@@ -651,8 +658,8 @@
 
             if (hasBloomFilter) {
                 int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
-                BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
-                        bloomFilterFalsePositiveRate);
+                BloomFilterSpecification bloomFilterSpec =
+                        BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
                 builder = ((LSMBTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
                         bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
             } else {
@@ -727,7 +734,7 @@
                     filterManager.writeFilterInfo(component.getLSMComponentFilter(),
                             ((LSMBTreeDiskComponent) component).getBTree());
                 }
-
+                component.setMostRecentMarkerLSN(-1L);
                 bulkLoader.end();
 
                 if (isEmptyComponent) {
@@ -792,36 +799,36 @@
 
     @Override
     public ITreeIndexFrameFactory getInteriorFrameFactory() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getInteriorFrameFactory();
     }
 
     @Override
     public int getFieldCount() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getFieldCount();
     }
 
     @Override
     public int getFileId() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getFileId();
     }
 
     @Override
     public IMetaDataPageManager getMetaManager() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getMetaManager();
     }
 
     @Override
     public ITreeIndexFrameFactory getLeafFrameFactory() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getLeafFrameFactory();
     }
 
@@ -838,8 +845,8 @@
 
     @Override
     public int getRootPageId() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getRootPageId();
     }
 
@@ -892,11 +899,21 @@
         if (memoryComponentsAllocated) {
             return;
         }
+        long markerLSN = -1L;
+        if (!diskComponents.isEmpty()) {
+            markerLSN = diskComponents.get(diskComponents.size() - 1).getMostRecentMarkerLSN();
+        } else {
+            // Needed in case a marker was added before any record
+            if (memoryComponents != null && !memoryComponents.isEmpty()) {
+                markerLSN = memoryComponents.get(0).getMostRecentMarkerLSN();
+            }
+        }
         for (ILSMComponent c : memoryComponents) {
             LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
             ((IVirtualBufferCache) mutableComponent.getBTree().getBufferCache()).open();
             mutableComponent.getBTree().create();
             mutableComponent.getBTree().activate();
+            mutableComponent.setMostRecentMarkerLSN(markerLSN);
         }
         memoryComponentsAllocated = true;
     }
@@ -946,4 +963,14 @@
             memoryComponentsAllocated = false;
         }
     }
+
+    public synchronized long getMostRecentMarkerLSN() throws HyracksDataException {
+        if (!isPrimaryIndex()) {
+            throw new HyracksDataException("Markers are only supported for primary indexes");
+        }
+        LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        opCtx.setOperation(IndexOperation.SEARCH);
+        getOperationalComponents(opCtx);
+        return !opCtx.getComponentHolder().isEmpty() ? opCtx.getComponentHolder().get(0).getMostRecentMarkerLSN() : -1L;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index c43590b..424dfd3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -21,6 +21,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractDiskLSMComponent;
 
@@ -62,4 +63,9 @@
     public int getFileReferenceCount() {
         return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
     }
+
+    public void readMostRecentMarkerLSN(BTree treeIndex) throws HyracksDataException {
+        IMetaDataPageManager treeMetaManager = treeIndex.getMetaManager();
+        mostRecentMarkerLSN = treeMetaManager.getLastMarkerLSN();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index a30527d..f82a1b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -39,16 +39,18 @@
     private final FileReference bloomFilterFlushTarget;
     private final ILSMIOOperationCallback callback;
     private final String indexIdentifier;
+    private final long prevMarkerLSN;
 
     public LSMBTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
             FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback,
-            String indexIdentifier) {
+            String indexIdentifier, long prevMarkerLSN) {
         this.accessor = accessor;
         this.flushingComponent = flushingComponent;
         this.btreeFlushTarget = btreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
         this.callback = callback;
         this.indexIdentifier = indexIdentifier;
+        this.prevMarkerLSN = prevMarkerLSN;
     }
 
     @Override
@@ -107,4 +109,8 @@
     public int compareTo(LSMBTreeFlushOperation o) {
         return btreeFlushTarget.getFile().getName().compareTo(o.getBTreeFlushTarget().getFile().getName());
     }
+
+    public long getPrevMarkerLSN() {
+        return prevMarkerLSN;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
index dbf47a1..1a103f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
@@ -29,11 +29,16 @@
 
     private final BTree btree;
 
-    public LSMBTreeMemoryComponent(BTree btree, IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter) {
-        super(vbc, isActive, filter);
+    public LSMBTreeMemoryComponent(BTree btree, IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter,
+            long mostRecentMarkerLSN) {
+        super(vbc, isActive, filter, mostRecentMarkerLSN);
         this.btree = btree;
     }
 
+    public LSMBTreeMemoryComponent(BTree btree, IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter) {
+        this(btree, vbc, isActive, filter, -1L);
+    }
+
     public BTree getBTree() {
         return btree;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index b847876..a888dd5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -46,4 +46,8 @@
     public ComponentState getState();
 
     public ILSMComponentFilter getLSMComponentFilter();
+
+    public void setMostRecentMarkerLSN(long lsn);
+
+    public long getMostRecentMarkerLSN();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualMetaDataPageManager.java
index c83cbf2..158c68f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualMetaDataPageManager.java
@@ -67,10 +67,12 @@
         return NullMetadataFrameFactory.INSTANCE;
     }
 
+    @Override
     public int getCapacity() {
         return capacity - 2;
     }
 
+    @Override
     public void reset() {
         currentPageId.set(1);
     }
@@ -157,10 +159,12 @@
         // Method doesn't make sense for this free page manager.
     }
 
+    @Override
     public void setFilterPage(ICachedPage page) {
         // Method doesn't make sense for this free page manager.
     }
 
+    @Override
     public ICachedPage getFilterPage() {
         return null;
     }
@@ -174,4 +178,10 @@
     public long getLSNOffset() throws HyracksDataException {
         return IMetaDataPageManager.INVALID_LSN_OFFSET;
     }
+
+    @Override
+    public long getLastMarkerLSN() throws HyracksDataException {
+        // Method doesn't make sense for this free page manager.
+        return -1L;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
index b2c55dc..c678878 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
@@ -26,10 +26,16 @@
     protected ComponentState state;
     protected int readerCount;
     protected final ILSMComponentFilter filter;
+    protected long mostRecentMarkerLSN;
+
+    public AbstractLSMComponent(ILSMComponentFilter filter, long mostRecentMarkerLSN) {
+        this.filter = filter;
+        this.mostRecentMarkerLSN = mostRecentMarkerLSN;
+        readerCount = 0;
+    }
 
     public AbstractLSMComponent(ILSMComponentFilter filter) {
-        this.filter = filter;
-        readerCount = 0;
+        this(filter, -1L);
     }
 
     public AbstractLSMComponent() {
@@ -45,4 +51,14 @@
     public ILSMComponentFilter getLSMComponentFilter() {
         return filter;
     }
+
+    @Override
+    public long getMostRecentMarkerLSN() {
+        return mostRecentMarkerLSN;
+    }
+
+    @Override
+    public void setMostRecentMarkerLSN(long lsn) {
+        this.mostRecentMarkerLSN = lsn;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 440ad31..cb3a2db 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -275,8 +275,8 @@
         }
 
         //create replication job and submit it
-        LSMIndexReplicationJob job = new LSMIndexReplicationJob(this, ctx, componentFiles, operation, executionType,
-                opType);
+        LSMIndexReplicationJob job =
+                new LSMIndexReplicationJob(this, ctx, componentFiles, operation, executionType, opType);
         try {
             diskBufferCache.getIOReplicationManager().submitJob(job);
         } catch (IOException e) {
@@ -296,4 +296,8 @@
     public boolean isDurable() {
         return durable;
     }
+
+    public ILSMComponent getCurrentMemoryComponent() {
+        return memoryComponents.get(currentMutableComponentId.get());
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
index 4b8fc9b..500996f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
@@ -32,8 +32,9 @@
     private int writerCount;
     private boolean requestedToBeActive;
 
-    public AbstractMemoryLSMComponent(IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter) {
-        super(filter);
+    public AbstractMemoryLSMComponent(IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter,
+            long mostRecentMarkerLSN) {
+        super(filter, mostRecentMarkerLSN);
         this.vbc = vbc;
         writerCount = 0;
         if (isActive) {
@@ -44,6 +45,10 @@
         isModified = new AtomicBoolean();
     }
 
+    public AbstractMemoryLSMComponent(IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter) {
+        this(vbc, isActive, filter, -1L);
+    }
+
     public AbstractMemoryLSMComponent(IVirtualBufferCache vbc, boolean isActive) {
         this(vbc, isActive, null);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 8ddab88..896d513 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -172,7 +172,11 @@
                 lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.FLUSH);
                 // Changing the flush status should *always* precede changing the mutable component.
                 lsmIndex.changeFlushStatusForCurrentMutableCompoent(false);
+                // Flushing! => carry over the marker lsn to the next component
+                long mostRecentMarkerLSN =
+                        ((AbstractLSMIndex) lsmIndex).getCurrentMemoryComponent().getMostRecentMarkerLSN();
                 lsmIndex.changeMutableComponent();
+                ((AbstractLSMIndex) lsmIndex).getCurrentMemoryComponent().setMostRecentMarkerLSN(mostRecentMarkerLSN);
                 // Notify all waiting threads whenever a flush has been scheduled since they will check
                 // again if they can grab and enter the mutable component.
                 opTracker.notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 3cca96e..92155e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -133,7 +133,8 @@
     }
 
     @Override
-    public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType) throws HyracksDataException {
+    public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
+            throws HyracksDataException {
         ctx.setOperation(IndexOperation.REPLICATE);
         ctx.getComponentsToBeReplicated().clear();
         ctx.getComponentsToBeReplicated().addAll(lsmComponents);
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index f2ec64b..aa0a7bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -42,7 +42,6 @@
     private final TestJobletContext jobletContext;
     private final TaskAttemptId taskId;
     private WorkspaceFileFactory fileFactory;
-
     private Map<Object, IStateObject> stateObjectMap = new HashMap<>();
     private Object sharedObject;
 
@@ -149,12 +148,12 @@
     }
 
     @Override
-    public Object getSharedObject() {
-        return sharedObject;
+    public void setSharedObject(Object object) {
+        sharedObject = object;
     }
 
     @Override
-    public void setSharedObject(Object sharedObject) {
-        this.sharedObject = sharedObject;
+    public Object getSharedObject() {
+        return sharedObject;
     }
 }
