Add Test NodeController, Test Data Generator, and Marker Logs

This test enable creating a node controller for unit test purposes.
The Node controller is identical to the regular node controller
except that it doesn't communicate with a cluster controller at all.

In this change, Test Data Generator is introduced which should
facilitate writing unit test cases which requires data generation.
The change also includes enabling feeds to send progress data. progress
information can then be sent through the pipeline and persisted in the
transaction logs and primary index component. A Unit test case has
been created to test adding progress markers to logs and index
components and then reading them.

The last part of this change is the addition of marker logs and their
callbacks. They enable components to create arbitrary logs and get a
callback when they are written to the transaction logs. Initial set of
unit tests were added for marker logs.

Change-Id: I3b9aa8de758b7d26ca34868b16e5ce693e0c0243
Reviewed-on: https://asterix-gerrit.ics.uci.edu/962
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index 0f31935..c6c71f6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.JobId;
@@ -32,12 +33,16 @@
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
 
 public class CommitRuntime implements IPushRuntime {
@@ -52,17 +57,18 @@
     protected final boolean isTemporaryDatasetWriteJob;
     protected final boolean isWriteTransaction;
     protected final long[] longHashes;
-    protected final LogRecord logRecord;
     protected final FrameTupleReference frameTupleReference;
-
-    protected ITransactionContext transactionContext;
-    protected FrameTupleAccessor frameTupleAccessor;
+    protected final IHyracksTaskContext ctx;
     protected final int resourcePartition;
+    protected ITransactionContext transactionContext;
+    protected LogRecord logRecord;
+    protected FrameTupleAccessor frameTupleAccessor;
 
     public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
             boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition) {
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
+        this.ctx = ctx;
+        IAsterixAppRuntimeContext runtimeCtx =
+                (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
         this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
         this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
         this.jobId = jobId;
@@ -73,7 +79,6 @@
         this.isWriteTransaction = isWriteTransaction;
         this.resourcePartition = resourcePartition;
         longHashes = new long[2];
-        logRecord = new LogRecord();
     }
 
     @Override
@@ -81,6 +86,9 @@
         try {
             transactionContext = transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
+            ILogMarkerCallback callback =
+                    TaskUtils.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+            logRecord = new LogRecord(callback);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
@@ -113,6 +121,23 @@
                 }
             }
         }
+        VSizeFrame message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
+        if (message != null
+                && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
+            try {
+                formMarkerLogRecords(message.getBuffer());
+                logMgr.log(logRecord);
+            } catch (ACIDException e) {
+                throw new HyracksDataException(e);
+            }
+            message.reset();
+            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
+        }
+    }
+
+    private void formMarkerLogRecords(ByteBuffer marker) {
+        TransactionUtil.formMarkerLogRecord(logRecord, transactionContext, datasetId, resourcePartition, marker);
     }
 
     protected void formLogRecord(ByteBuffer buffer, int t) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
new file mode 100644
index 0000000..5c3aefe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.bootstrap;
+
+import java.io.File;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.operators.physical.CommitRuntime;
+import org.apache.asterix.api.common.AsterixAppRuntimeContext;
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.app.external.TestLibrarian;
+import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
+import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.asterix.transaction.management.service.logging.LogReader;
+import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelper;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+import org.apache.hyracks.test.support.TestUtils;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestNodeController {
+    protected static final Logger LOGGER = Logger.getLogger(TestNodeController.class.getName());
+
+    protected static final String PATH_ACTUAL = "unittest" + File.separator;
+    protected static final String PATH_BASE =
+            StringUtils.join(new String[] { "src", "test", "resources", "nodetests" }, File.separator);
+
+    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    protected static AsterixTransactionProperties txnProperties;
+    private static final boolean cleanupOnStart = true;
+    private static final boolean cleanupOnStop = true;
+
+    // Constants
+    public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
+    public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
+    public static final int KB32 = 32768;
+    public static final int PARTITION = 0;
+    public static final double BLOOM_FILTER_FALSE_POSITIVE_RATE = 0.01;
+    public static final TransactionSubsystemProvider TXN_SUBSYSTEM_PROVIDER = new TransactionSubsystemProvider();
+    // Mutables
+    private JobId jobId;
+    private long jobCounter = 0L;
+    private IHyracksJobletContext jobletCtx;
+
+    public TestNodeController() throws AsterixException, HyracksException, ACIDException {
+    }
+
+    public void init() throws Exception {
+        try {
+            File outdir = new File(PATH_ACTUAL);
+            outdir.mkdirs();
+            // remove library directory
+            TestLibrarian.removeLibraryDir();
+            ExecutionTestUtil.setUp(cleanupOnStart);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+        jobletCtx = Mockito.mock(IHyracksJobletContext.class);
+        Mockito.when(jobletCtx.getApplicationContext())
+                .thenReturn(AsterixHyracksIntegrationUtil.ncs[0].getApplicationContext());
+        Mockito.when(jobletCtx.getJobId()).thenAnswer(new Answer<JobId>() {
+            @Override
+            public JobId answer(InvocationOnMock invocation) throws Throwable {
+                return jobId;
+            }
+        });
+    }
+
+    public void deInit() throws Exception {
+        TestLibrarian.removeLibraryDir();
+        ExecutionTestUtil.tearDown(cleanupOnStop);
+    }
+
+    public org.apache.asterix.common.transactions.JobId getTxnJobId() {
+        return new org.apache.asterix.common.transactions.JobId((int) jobId.getId());
+    }
+
+    public AsterixLSMInsertDeleteOperatorNodePushable getInsertPipeline(IHyracksTaskContext ctx, Dataset dataset,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws AlgebricksException {
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        IndexOperation op = IndexOperation.INSERT;
+        IModificationOperationCallbackFactory modOpCallbackFactory =
+                new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
+                        primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE, true);
+        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
+                getInsertOpratorDesc(primaryIndexInfo, modOpCallbackFactory);
+        LSMBTreeDataflowHelperFactory dataflowHelperFactory =
+                getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo);
+        Mockito.when(indexOpDesc.getIndexDataflowHelperFactory()).thenReturn(dataflowHelperFactory);
+        IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
+        AsterixLSMInsertDeleteOperatorNodePushable insertOp =
+                new AsterixLSMInsertDeleteOperatorNodePushable(indexOpDesc, ctx, PARTITION,
+                        primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider, op, true);
+        CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(),
+                primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION);
+        insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
+        commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
+        return insertOp;
+    }
+
+    public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+            NoMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws HyracksDataException, AlgebricksException {
+        IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx);
+        JobSpecification spec = new JobSpecification();
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        LSMBTreeDataflowHelperFactory indexDataflowHelperFactory =
+                getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo);
+        BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                primaryIndexInfo.fileSplitProvider, primaryIndexInfo.primaryIndexTypeTraits,
+                primaryIndexInfo.primaryIndexComparatorFactories, primaryIndexInfo.primaryIndexBloomFilterKeyFields,
+                primaryIndexInfo.primaryKeyIndexes, primaryIndexInfo.primaryKeyIndexes, true, true,
+                indexDataflowHelperFactory, false, false, null, NoOpOperationCallbackFactory.INSTANCE, filterFields,
+                filterFields);
+        BTreeSearchOperatorNodePushable searchOp = new BTreeSearchOperatorNodePushable(searchOpDesc, ctx, 0,
+                primaryIndexInfo.getSearchRecordDescriptorProvider(), /*primaryIndexInfo.primaryKeyIndexes*/null,
+                /*primaryIndexInfo.primaryKeyIndexes*/null, true, true, filterFields, filterFields);
+        emptyTupleOp.setFrameWriter(0, searchOp,
+                primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0));
+        searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc);
+        return emptyTupleOp;
+    }
+
+    public LogReader getTransactionLogReader(boolean isRecoveryMode) {
+        return (LogReader) getTransactionSubsystem().getLogManager().getLogReader(isRecoveryMode);
+    }
+
+    public JobId newJobId() {
+        jobId = new JobId(jobCounter++);
+        return jobId;
+    }
+
+    public AsterixLSMTreeInsertDeleteOperatorDescriptor getInsertOpratorDesc(PrimaryIndexInfo primaryIndexInfo,
+            IModificationOperationCallbackFactory modOpCallbackFactory) {
+        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
+                Mockito.mock(AsterixLSMTreeInsertDeleteOperatorDescriptor.class);
+        Mockito.when(indexOpDesc.getLifecycleManagerProvider())
+                .thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getStorageManager()).thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider);
+        Mockito.when(indexOpDesc.getLocalResourceFactoryProvider())
+                .thenReturn(primaryIndexInfo.localResourceFactoryProvider);
+        Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits);
+        Mockito.when(indexOpDesc.getTreeIndexComparatorFactories())
+                .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories);
+        Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields())
+                .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields);
+        Mockito.when(indexOpDesc.getModificationOpCallbackFactory()).thenReturn(modOpCallbackFactory);
+        return indexOpDesc;
+    }
+
+    public TreeIndexCreateOperatorDescriptor getIndexCreateOpDesc(PrimaryIndexInfo primaryIndexInfo) {
+        TreeIndexCreateOperatorDescriptor indexOpDesc = Mockito.mock(TreeIndexCreateOperatorDescriptor.class);
+        Mockito.when(indexOpDesc.getLifecycleManagerProvider())
+                .thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getStorageManager()).thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider);
+        Mockito.when(indexOpDesc.getLocalResourceFactoryProvider())
+                .thenReturn(primaryIndexInfo.localResourceFactoryProvider);
+        Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits);
+        Mockito.when(indexOpDesc.getTreeIndexComparatorFactories())
+                .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories);
+        Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields())
+                .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields);
+        return indexOpDesc;
+    }
+
+    public ConstantFileSplitProvider getFileSplitProvider(Dataset dataset) {
+        FileSplit fileSplit = new FileSplit(AsterixHyracksIntegrationUtil.ncs[0].getId(),
+                dataset.getDataverseName() + File.separator + dataset.getDatasetName());
+        return new ConstantFileSplitProvider(new FileSplit[] { fileSplit });
+    }
+
+    public ILocalResourceFactoryProvider getPrimaryIndexLocalResourceMetadataProvider(Dataset dataset,
+            ITypeTraits[] primaryIndexTypeTraits, IBinaryComparatorFactory[] primaryIndexComparatorFactories,
+            int[] primaryIndexBloomFilterKeyFields, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
+            IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields) {
+        ILocalResourceMetadata localResourceMetadata =
+                new LSMBTreeLocalResourceMetadata(primaryIndexTypeTraits, primaryIndexComparatorFactories,
+                        primaryIndexBloomFilterKeyFields, true, dataset.getDatasetId(), mergePolicyFactory,
+                        mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
+        ILocalResourceFactoryProvider localResourceFactoryProvider =
+                new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource);
+        return localResourceFactoryProvider;
+    }
+
+    public LSMBTreeDataflowHelper getPrimaryIndexDataflowHelper(IHyracksTaskContext ctx,
+            PrimaryIndexInfo primaryIndexInfo, TreeIndexCreateOperatorDescriptor indexOpDesc)
+            throws AlgebricksException {
+        LSMBTreeDataflowHelperFactory dataflowHelperFactory = new LSMBTreeDataflowHelperFactory(
+                new AsterixVirtualBufferCacheProvider(primaryIndexInfo.dataset.getDatasetId()),
+                primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties,
+                new PrimaryIndexOperationTrackerProvider(primaryIndexInfo.dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                BLOOM_FILTER_FALSE_POSITIVE_RATE, true, primaryIndexInfo.filterTypeTraits,
+                primaryIndexInfo.filterCmpFactories, primaryIndexInfo.btreeFields, primaryIndexInfo.filterFields, true);
+        IndexDataflowHelper dataflowHelper =
+                dataflowHelperFactory.createIndexDataflowHelper(indexOpDesc, ctx, PARTITION);
+        return (LSMBTreeDataflowHelper) dataflowHelper;
+    }
+
+    public LSMBTreeDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(IHyracksTaskContext ctx,
+            PrimaryIndexInfo primaryIndexInfo) throws AlgebricksException {
+        return new LSMBTreeDataflowHelperFactory(
+                new AsterixVirtualBufferCacheProvider(primaryIndexInfo.dataset.getDatasetId()),
+                primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties,
+                new PrimaryIndexOperationTrackerProvider(primaryIndexInfo.dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                BLOOM_FILTER_FALSE_POSITIVE_RATE, true, primaryIndexInfo.filterTypeTraits,
+                primaryIndexInfo.filterCmpFactories, primaryIndexInfo.btreeFields, primaryIndexInfo.filterFields, true);
+    }
+
+    public LSMBTreeDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws AlgebricksException, HyracksDataException {
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
+        return getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, indexOpDesc);
+    }
+
+    public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
+            ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            int[] filterFields) throws AlgebricksException, HyracksDataException {
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
+        LSMBTreeDataflowHelper dataflowHelper =
+                getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, indexOpDesc);
+        dataflowHelper.create();
+    }
+
+    private int[] createPrimaryIndexBloomFilterFields(int length) {
+        int[] primaryIndexBloomFilterKeyFields = new int[length];
+        for (int j = 0; j < length; ++j) {
+            primaryIndexBloomFilterKeyFields[j] = j;
+        }
+        return primaryIndexBloomFilterKeyFields;
+    }
+
+    private IBinaryComparatorFactory[] createPrimaryIndexComparatorFactories(IAType[] primaryKeyTypes) {
+        IBinaryComparatorFactory[] primaryIndexComparatorFactories =
+                new IBinaryComparatorFactory[primaryKeyTypes.length];
+        for (int j = 0; j < primaryKeyTypes.length; ++j) {
+            primaryIndexComparatorFactories[j] =
+                    AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(primaryKeyTypes[j], true);
+        }
+        return primaryIndexComparatorFactories;
+    }
+
+    private ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType) {
+        int i = 0;
+        ISerializerDeserializer<?>[] primaryIndexSerdes = new ISerializerDeserializer<?>[primaryIndexNumOfTupleFields];
+        for (; i < primaryKeyTypes.length; i++) {
+            primaryIndexSerdes[i] =
+                    AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]);
+        }
+        primaryIndexSerdes[i++] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(recordType);
+        if (metaType != null) {
+            primaryIndexSerdes[i] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
+        }
+        return primaryIndexSerdes;
+    }
+
+    private ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType) {
+        ITypeTraits[] primaryIndexTypeTraits = new ITypeTraits[primaryIndexNumOfTupleFields];
+        int i = 0;
+        for (; i < primaryKeyTypes.length; i++) {
+            primaryIndexTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]);
+        }
+        primaryIndexTypeTraits[i++] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(recordType);
+        if (metaType != null) {
+            primaryIndexTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(metaType);
+        }
+        return primaryIndexTypeTraits;
+    }
+
+    public IHyracksTaskContext createTestContext() throws HyracksDataException {
+        IHyracksTaskContext ctx = TestUtils.create(KB32);
+        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+        ctx = Mockito.spy(ctx);
+        Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
+        Mockito.when(ctx.getIOManager())
+                .thenReturn(AsterixHyracksIntegrationUtil.ncs[0].getRootContext().getIOManager());
+        return ctx;
+    }
+
+    public TransactionSubsystem getTransactionSubsystem() {
+        return (TransactionSubsystem) ((AsterixAppRuntimeContext) AsterixHyracksIntegrationUtil.ncs[0]
+                .getApplicationContext().getApplicationObject()).getTransactionSubsystem();
+    }
+
+    public ITransactionManager getTransactionManager() {
+        return getTransactionSubsystem().getTransactionManager();
+    }
+
+    public AsterixAppRuntimeContext getAppRuntimeContext() {
+        return (AsterixAppRuntimeContext) AsterixHyracksIntegrationUtil.ncs[0].getApplicationContext()
+                .getApplicationObject();
+    }
+
+    public DatasetLifecycleManager getDatasetLifecycleManager() {
+        return (DatasetLifecycleManager) getAppRuntimeContext().getDatasetLifecycleManager();
+    }
+
+    @SuppressWarnings("unused")
+    private class PrimaryIndexInfo {
+        private Dataset dataset;
+        private IAType[] primaryKeyTypes;
+        private ARecordType recordType;
+        private ARecordType metaType;
+        private ILSMMergePolicyFactory mergePolicyFactory;
+        private Map<String, String> mergePolicyProperties;
+        private int[] filterFields;
+        private int primaryIndexNumOfTupleFields;
+        private IBinaryComparatorFactory[] primaryIndexComparatorFactories;
+        private ITypeTraits[] primaryIndexTypeTraits;
+        private ISerializerDeserializer<?>[] primaryIndexSerdes;
+        private int[] primaryIndexBloomFilterKeyFields;
+        private ITypeTraits[] filterTypeTraits;
+        private IBinaryComparatorFactory[] filterCmpFactories;
+        private int[] btreeFields;
+        private ILocalResourceFactoryProvider localResourceFactoryProvider;
+        private ConstantFileSplitProvider fileSplitProvider;
+        private RecordDescriptor rDesc;
+        private int[] primaryIndexInsertFieldsPermutations;
+        private int[] primaryKeyIndexes;
+
+        public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+                ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+                int[] filterFields) throws AlgebricksException {
+            this.dataset = dataset;
+            this.primaryKeyTypes = primaryKeyTypes;
+            this.recordType = recordType;
+            this.metaType = metaType;
+            this.mergePolicyFactory = mergePolicyFactory;
+            this.mergePolicyProperties = mergePolicyProperties;
+            this.filterFields = filterFields;
+            primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1));
+            primaryIndexTypeTraits =
+                    createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
+            primaryIndexComparatorFactories = createPrimaryIndexComparatorFactories(primaryKeyTypes);
+            primaryIndexBloomFilterKeyFields = createPrimaryIndexBloomFilterFields(primaryKeyTypes.length);
+            filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recordType);
+            filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, recordType,
+                    NonTaggedDataFormat.INSTANCE.getBinaryComparatorFactoryProvider());
+            btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+            localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(dataset, primaryIndexTypeTraits,
+                    primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, mergePolicyFactory,
+                    mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
+            fileSplitProvider = getFileSplitProvider(dataset);
+            primaryIndexSerdes =
+                    createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
+            rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
+            primaryIndexInsertFieldsPermutations = new int[primaryIndexNumOfTupleFields];
+            for (int i = 0; i < primaryIndexNumOfTupleFields; i++) {
+                primaryIndexInsertFieldsPermutations[i] = i;
+            }
+            primaryKeyIndexes = new int[primaryKeyTypes.length];
+            for (int i = 0; i < primaryKeyIndexes.length; i++) {
+                primaryKeyIndexes[i] = i;
+            }
+        }
+
+        public IRecordDescriptorProvider getInsertRecordDescriptorProvider() {
+            IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class);
+            Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc);
+            return rDescProvider;
+        }
+
+        public IRecordDescriptorProvider getSearchRecordDescriptorProvider() {
+            ITypeTraits[] primaryKeyTypeTraits = new ITypeTraits[primaryKeyTypes.length];
+            ISerializerDeserializer<?>[] primaryKeySerdes = new ISerializerDeserializer<?>[primaryKeyTypes.length];
+            for (int i = 0; i < primaryKeyTypes.length; i++) {
+                primaryKeyTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]);
+                primaryKeySerdes[i] =
+                        AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]);
+            }
+            RecordDescriptor searcgRecDesc = new RecordDescriptor(primaryKeySerdes, primaryKeyTypeTraits);
+            IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class);
+            Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt()))
+                    .thenReturn(searcgRecDesc);
+            return rDescProvider;
+        }
+    }
+
+    public RecordDescriptor getSearchOutputDesc(IAType[] keyTypes, ARecordType recordType, ARecordType metaType) {
+        int primaryIndexNumOfTupleFields = keyTypes.length + (1 + ((metaType == null) ? 0 : 1));
+        ITypeTraits[] primaryIndexTypeTraits =
+                createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType);
+        ISerializerDeserializer<?>[] primaryIndexSerdes =
+                createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType);
+        return new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java
new file mode 100644
index 0000000..2eba473
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+public class ABooleanFieldValueGenerator implements IAsterixFieldValueGenerator<Boolean> {
+    private final GenerationFunction generationFunction;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private boolean value;
+
+    public ABooleanFieldValueGenerator(GenerationFunction generationFunction, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = true;
+                break;
+            case DETERMINISTIC:
+                value = false;
+                break;
+            case INCREASING:
+                value = false;
+                break;
+            case RANDOM:
+                value = rand.nextBoolean();
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+        }
+        generate();
+        out.writeBoolean(value);
+    }
+
+    private void generate() {
+        switch (generationFunction) {
+            case DETERMINISTIC:
+                value = !value;
+                break;
+            case RANDOM:
+                value = rand.nextBoolean();
+                break;
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public Boolean next() throws IOException {
+        generate();
+        return value;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+        }
+        out.writeBoolean(value);
+    }
+
+    @Override
+    public Boolean get() throws IOException {
+        return value;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java
new file mode 100644
index 0000000..e698676
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java
@@ -0,0 +1,153 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+public class ADoubleFieldValueGenerator implements IAsterixFieldValueGenerator<Double> {
+    private static final double START = 1000000000.0;
+    private static final int BATCH_SIZE = 1000;
+    private static final double INCREMENT = 0.1;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private double value;
+    private int cycle;
+    private List<Double> uniques;
+    private Iterator<Double> iterator;
+
+    public ADoubleFieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Integer.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0;
+                break;
+            case RANDOM:
+                if (unique) {
+                    double lowerBound = START;
+                    double upperBound = lowerBound + (BATCH_SIZE * INCREMENT);
+                    uniques = new ArrayList<>();
+                    while (lowerBound < upperBound) {
+                        uniques.add(lowerBound);
+                        lowerBound += INCREMENT;
+                    }
+                    Collections.shuffle(uniques);
+                    iterator = uniques.iterator();
+                }
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
+        }
+        generate();
+        out.writeDouble(value);
+    }
+
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value -= INCREMENT;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - (cycle * INCREMENT);
+                } else {
+                    value = START + (cycle * INCREMENT);
+                }
+                break;
+            case INCREASING:
+                value += INCREMENT;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (iterator.hasNext()) {
+                        value = iterator.next();
+                    } else {
+                        // generate next patch
+                        cycle++;
+                        double lowerBound;
+                        if (cycle % 2 == 0) {
+                            // even
+                            lowerBound = START + ((cycle / 2) * (BATCH_SIZE * INCREMENT));
+                        } else {
+                            // odd
+                            lowerBound = START - ((cycle / 2 + 1) * (BATCH_SIZE * INCREMENT));
+                        }
+                        double upperBound = lowerBound + (BATCH_SIZE * INCREMENT);
+                        uniques.clear();
+                        while (lowerBound < upperBound) {
+                            uniques.add(lowerBound);
+                            lowerBound += INCREMENT;
+                        }
+                        Collections.shuffle(uniques);
+                        iterator = uniques.iterator();
+                        value = iterator.next();
+                    }
+                } else {
+                    value = rand.nextDouble();
+                }
+                break;
+            default:
+                break;
+
+        }
+    }
+
+    @Override
+    public Double next() throws IOException {
+        generate();
+        return value;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
+        }
+        out.writeDouble(value);
+    }
+
+    @Override
+    public Double get() throws IOException {
+        return value;
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java
new file mode 100644
index 0000000..7c6556b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java
@@ -0,0 +1,152 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+public class AInt32FieldValueGenerator implements IAsterixFieldValueGenerator<Integer> {
+    private static final int START = 1000000000;
+    private static final int BATCH_SIZE = 1000;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private int value;
+    private int cycle;
+    private List<Integer> uniques;
+    private Iterator<Integer> iterator;
+
+    public AInt32FieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Integer.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0;
+                break;
+            case RANDOM:
+                if (unique) {
+                    int lowerBound = START;
+                    int upperBound = lowerBound + BATCH_SIZE;
+                    uniques = new ArrayList<>();
+                    while (lowerBound < upperBound) {
+                        uniques.add(lowerBound);
+                        lowerBound++;
+                    }
+                    Collections.shuffle(uniques);
+                    iterator = uniques.iterator();
+                }
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+        }
+        generate();
+        out.writeInt(value);
+    }
+
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value--;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - cycle;
+                } else {
+                    value = START + cycle;
+                }
+                break;
+            case INCREASING:
+                value++;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (iterator.hasNext()) {
+                        value = iterator.next();
+                    } else {
+                        // generate next patch
+                        cycle++;
+                        int lowerBound;
+                        if (cycle % 2 == 0) {
+                            // even
+                            lowerBound = START + ((cycle / 2) * BATCH_SIZE);
+                        } else {
+                            // odd
+                            lowerBound = START - ((cycle / 2 + 1) * BATCH_SIZE);
+                        }
+                        int upperBound = lowerBound + BATCH_SIZE;
+                        uniques.clear();
+                        while (lowerBound < upperBound) {
+                            uniques.add(lowerBound);
+                            lowerBound++;
+                        }
+                        Collections.shuffle(uniques);
+                        iterator = uniques.iterator();
+                        value = iterator.next();
+                    }
+                } else {
+                    value = rand.nextInt();
+                }
+                break;
+            default:
+                break;
+
+        }
+    }
+
+    @Override
+    public Integer next() throws IOException {
+        generate();
+        return value;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+        }
+        out.writeInt(value);
+    }
+
+    @Override
+    public Integer get() throws IOException {
+        return value;
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java
new file mode 100644
index 0000000..2a2496e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java
@@ -0,0 +1,152 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+public class AInt64FieldValueGenerator implements IAsterixFieldValueGenerator<Long> {
+    private static final long START = 4500000000000000000L;
+    private static final long BATCH_SIZE = 1000L;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private long value;
+    private int cycle;
+    private List<Long> uniques;
+    private Iterator<Long> iterator;
+
+    public AInt64FieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Long.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0L;
+                break;
+            case RANDOM:
+                if (unique) {
+                    long lowerBound = START;
+                    long upperBound = lowerBound + BATCH_SIZE;
+                    uniques = new ArrayList<>();
+                    while (lowerBound < upperBound) {
+                        uniques.add(lowerBound);
+                        lowerBound++;
+                    }
+                    Collections.shuffle(uniques);
+                    iterator = uniques.iterator();
+                }
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+        }
+        generate();
+        out.writeLong(value);
+    }
+
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value--;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - cycle;
+                } else {
+                    value = START + cycle;
+                }
+                break;
+            case INCREASING:
+                value++;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (iterator.hasNext()) {
+                        value = iterator.next();
+                    } else {
+                        // generate next patch
+                        cycle++;
+                        long lowerBound;
+                        if (cycle % 2 == 0) {
+                            // even
+                            lowerBound = START + ((cycle / 2) * BATCH_SIZE);
+                        } else {
+                            // odd
+                            lowerBound = START - ((cycle / 2 + 1) * BATCH_SIZE);
+                        }
+                        long upperBound = lowerBound + BATCH_SIZE;
+                        uniques.clear();
+                        while (lowerBound < upperBound) {
+                            uniques.add(lowerBound);
+                            lowerBound++;
+                        }
+                        Collections.shuffle(uniques);
+                        iterator = uniques.iterator();
+                        value = iterator.next();
+                    }
+                } else {
+                    value = rand.nextLong();
+                }
+                break;
+            default:
+                break;
+
+        }
+    }
+
+    @Override
+    public Long next() throws IOException {
+        generate();
+        return value;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+        }
+        out.writeLong(value);
+    }
+
+    @Override
+    public Long get() throws IOException {
+        return value;
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java
new file mode 100644
index 0000000..df717e9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java
@@ -0,0 +1,119 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.test.common.TestTupleReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class ARecordValueGenerator implements IAsterixFieldValueGenerator<ITupleReference> {
+    private final IAsterixFieldValueGenerator<?>[] generators;
+    private final boolean tagged;
+    private final ARecordType recordType;
+    private final RecordBuilder recBuilder;
+    private final ArrayBackedValueStorage fieldValueBuffer;
+    private final TestTupleReference tuple;
+
+    public ARecordValueGenerator(GenerationFunction[] generationFunctions, ARecordType recordType, boolean[] uniques,
+            boolean tagged) {
+        this.tagged = tagged;
+        this.recordType = recordType;
+        tuple = new TestTupleReference(1);
+        fieldValueBuffer = new ArrayBackedValueStorage();
+        recBuilder = new RecordBuilder();
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        generators = new IAsterixFieldValueGenerator<?>[recordType.getFieldTypes().length];
+        for (int i = 0; i < recordType.getFieldTypes().length; i++) {
+            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
+            switch (tag) {
+                case BOOLEAN:
+                    generators[i] = new ABooleanFieldValueGenerator(generationFunctions[i], true);
+                    break;
+                case DOUBLE:
+                    generators[i] = new ADoubleFieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                case INT32:
+                    generators[i] = new AInt32FieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                case INT64:
+                    generators[i] = new AInt64FieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                case STRING:
+                    generators[i] = new AStringFieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported type " + tag);
+            }
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        for (int i = 0; i < generators.length; i++) {
+            fieldValueBuffer.reset();
+            generators[i].next(fieldValueBuffer.getDataOutput());
+            recBuilder.addField(i, fieldValueBuffer);
+        }
+        recBuilder.write(out, tagged);
+    }
+
+    @Override
+    public ITupleReference next() throws IOException {
+        tuple.reset();
+        next(tuple.getFields()[0].getDataOutput());
+        return tuple;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        for (int i = 0; i < generators.length; i++) {
+            fieldValueBuffer.reset();
+            generators[i].get(fieldValueBuffer.getDataOutput());
+            recBuilder.addField(i, fieldValueBuffer);
+        }
+        recBuilder.write(out, tagged);
+    }
+
+    @Override
+    public ITupleReference get() throws IOException {
+        tuple.reset();
+        get(tuple.getFields()[0].getDataOutput());
+        return tuple;
+    }
+
+    public void get(int i, DataOutput out) throws IOException {
+        generators[i].get(out);
+    }
+
+    public Object get(int i) throws IOException {
+        return generators[i].get();
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java
new file mode 100644
index 0000000..5ee6d40
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+public class AStringFieldValueGenerator implements IAsterixFieldValueGenerator<String> {
+    private static final String PREFIX = "A String Value #";
+    private static final int START = 1000000000;
+    private static final int BATCH_SIZE = 1000;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private int value;
+    private int cycle;
+    private List<Integer> uniques;
+    private Iterator<Integer> iterator;
+    private String aString;
+    private UTF8StringSerializerDeserializer stringSerde =
+            new UTF8StringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+
+    public AStringFieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Integer.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0;
+                break;
+            case RANDOM:
+                if (unique) {
+                    int lowerBound = START;
+                    int upperBound = lowerBound + BATCH_SIZE;
+                    uniques = new ArrayList<>();
+                    while (lowerBound < upperBound) {
+                        uniques.add(lowerBound);
+                        lowerBound++;
+                    }
+                    Collections.shuffle(uniques);
+                    iterator = uniques.iterator();
+                }
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+        }
+        generate();
+        stringSerde.serialize(aString, out);
+    }
+
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value--;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - cycle;
+                } else {
+                    value = START + cycle;
+                }
+                break;
+            case INCREASING:
+                value++;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (iterator.hasNext()) {
+                        value = iterator.next();
+                    } else {
+                        // generate next patch
+                        cycle++;
+                        int lowerBound;
+                        if (cycle % 2 == 0) {
+                            // even
+                            lowerBound = START + ((cycle / 2) * BATCH_SIZE);
+                        } else {
+                            // odd
+                            lowerBound = START - ((cycle / 2 + 1) * BATCH_SIZE);
+                        }
+                        int upperBound = lowerBound + BATCH_SIZE;
+                        uniques.clear();
+                        while (lowerBound < upperBound) {
+                            uniques.add(lowerBound);
+                            lowerBound++;
+                        }
+                        Collections.shuffle(uniques);
+                        iterator = uniques.iterator();
+                        value = iterator.next();
+                    }
+                } else {
+                    value = rand.nextInt();
+                }
+                break;
+            default:
+                break;
+        }
+        aString = PREFIX + String.format("%08d", value);
+    }
+
+    @Override
+    public String next() throws IOException {
+        generate();
+        return aString;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+        }
+        stringSerde.serialize(aString, out);
+    }
+
+    @Override
+    public String get() throws IOException {
+        return aString;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/IAsterixFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/IAsterixFieldValueGenerator.java
new file mode 100644
index 0000000..17bdcf8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/IAsterixFieldValueGenerator.java
@@ -0,0 +1,48 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public interface IAsterixFieldValueGenerator<T> {
+    /**
+     * @param out
+     * @throws IOException
+     */
+    public void next(DataOutput out) throws IOException;
+
+    /**
+     * @return
+     * @throws IOException
+     */
+    public T next() throws IOException;
+
+    /**
+     * @param out
+     * @throws IOException
+     */
+    public void get(DataOutput out) throws IOException;
+
+    /**
+     * @return
+     * @throws IOException
+     */
+    public T get() throws IOException;
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TestTupleCounterFrameWriter.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TestTupleCounterFrameWriter.java
new file mode 100644
index 0000000..ee3de51
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TestTupleCounterFrameWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.data.gen;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class TestTupleCounterFrameWriter extends TestFrameWriter {
+
+    private final FrameTupleAccessor accessor;
+    private int count = 0;
+
+    public TestTupleCounterFrameWriter(RecordDescriptor recordDescriptor, CountAnswer openAnswer,
+            CountAnswer nextAnswer, CountAnswer flushAnswer, CountAnswer failAnswer, CountAnswer closeAnswer,
+            boolean deepCopyInputFrames) {
+        super(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer, deepCopyInputFrames);
+        accessor = new FrameTupleAccessor(recordDescriptor);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        super.nextFrame(buffer);
+        accessor.reset(buffer);
+        count += accessor.getTupleCount();
+    }
+
+    public int getCount() {
+        return count;
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java
new file mode 100644
index 0000000..98c57a0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java
@@ -0,0 +1,126 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.IOException;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class TupleGenerator {
+
+    private final int[] keyIndexes;
+    private final int[] keyIndicators;
+    private final ARecordValueGenerator recordGenerator;
+    private final ARecordValueGenerator metaGenerator;
+    private final TestTupleReference tuple;
+
+    public enum GenerationFunction {
+        RANDOM,
+        DETERMINISTIC,
+        INCREASING,
+        DECREASING
+    }
+
+    /**
+     * @param recordType
+     * @param metaType
+     * @param key
+     * @param keyIndexes
+     * @param keyIndicators
+     * @param recordGeneration
+     * @param uniqueRecordFields
+     * @param metaGeneration
+     * @param uniqueMetaFields
+     */
+    public TupleGenerator(ARecordType recordType, ARecordType metaType, int[] keyIndexes, int[] keyIndicators,
+            GenerationFunction[] recordGeneration, boolean[] uniqueRecordFields, GenerationFunction[] metaGeneration,
+            boolean[] uniqueMetaFields) {
+        this.keyIndexes = keyIndexes;
+        this.keyIndicators = keyIndicators;
+        for (IAType field : recordType.getFieldTypes()) {
+            validate(field);
+        }
+        recordGenerator = new ARecordValueGenerator(recordGeneration, recordType, uniqueRecordFields, true);
+        if (metaType != null) {
+            for (IAType field : metaType.getFieldTypes()) {
+                validate(field);
+            }
+            metaGenerator = new ARecordValueGenerator(metaGeneration, metaType, uniqueMetaFields, true);
+        } else {
+            metaGenerator = null;
+        }
+        int numOfFields = keyIndexes.length + 1 + ((metaType != null) ? 1 : 0);
+        tuple = new TestTupleReference(numOfFields);
+        boolean atLeastOneKeyFieldIsNotRandomAndNotBoolean = false;
+        for (int i = 0; i < keyIndexes.length; i++) {
+            if (keyIndicators[i] < 0 || keyIndicators[i] > 1) {
+                throw new IllegalArgumentException("key field indicator must be either 0 or 1");
+            }
+            atLeastOneKeyFieldIsNotRandomAndNotBoolean = atLeastOneKeyFieldIsNotRandomAndNotBoolean
+                    || validateKey(keyIndexes[i], keyIndicators[i] == 0 ? recordType : metaType,
+                            keyIndicators[i] == 0 ? uniqueRecordFields[i] : uniqueMetaFields[i]);
+        }
+        if (!atLeastOneKeyFieldIsNotRandomAndNotBoolean) {
+            throw new IllegalArgumentException("at least one key field must be unique and not boolean");
+        }
+        if (keyIndexes.length != keyIndicators.length) {
+            throw new IllegalArgumentException("number of key indexes must equals number of key indicators");
+        }
+    }
+
+    private boolean validateKey(int i, ARecordType type, boolean unique) {
+        if (type.getFieldNames().length <= i) {
+            throw new IllegalArgumentException("key index must be less than number of fields");
+        }
+        return unique && type.getFieldTypes()[i].getTypeTag() != ATypeTag.BOOLEAN;
+    }
+
+    public ITupleReference next() throws IOException {
+        tuple.reset();
+        recordGenerator.next(tuple.getFields()[keyIndexes.length].getDataOutput());
+        if (metaGenerator != null) {
+            recordGenerator.next(tuple.getFields()[keyIndexes.length + 1].getDataOutput());
+        }
+        for (int i = 0; i < keyIndexes.length; i++) {
+            if (keyIndicators[i] == 0) {
+                recordGenerator.get(keyIndexes[i], tuple.getFields()[i].getDataOutput());
+            } else {
+                metaGenerator.get(keyIndexes[i], tuple.getFields()[i].getDataOutput());
+            }
+        }
+        return tuple;
+    }
+
+    private void validate(IAType field) {
+        switch (field.getTypeTag()) {
+            case BOOLEAN:
+            case DOUBLE:
+            case INT32:
+            case INT64:
+            case STRING:
+                break;
+            default:
+                throw new IllegalArgumentException("Generating data of type " + field + " is not supported");
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
index e267cc7..54bdb1b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
@@ -96,33 +96,4 @@
         }
         return tuple;
     }
-
-    private class TestTupleReference implements ITupleReference {
-        private final GrowableArray[] fields;
-
-        private TestTupleReference(GrowableArray[] fields) {
-            this.fields = fields;
-        }
-
-        @Override
-        public int getFieldCount() {
-            return fields.length;
-        }
-
-        @Override
-        public byte[] getFieldData(int fIdx) {
-
-            return fields[fIdx].getByteArray();
-        }
-
-        @Override
-        public int getFieldStart(int fIdx) {
-            return 0;
-        }
-
-        @Override
-        public int getFieldLength(int fIdx) {
-            return fields[fIdx].getLength();
-        }
-    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java
new file mode 100644
index 0000000..b676dfd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.common;
+
+import org.apache.hyracks.data.std.util.GrowableArray;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class TestTupleReference implements ITupleReference {
+    private GrowableArray[] fields;
+    private int[] offsets;
+
+    public TestTupleReference(GrowableArray[] fields) {
+        this.fields = fields;
+        offsets = new int[fields.length];
+    }
+
+    public TestTupleReference(int numfields) {
+        this.fields = new GrowableArray[numfields];
+        for (int i = 0; i < numfields; i++) {
+            fields[i] = new GrowableArray();
+        }
+        offsets = new int[fields.length];
+    }
+
+    public GrowableArray[] getFields() {
+        return fields;
+    }
+
+    public void setFields(GrowableArray[] fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return fields.length;
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return fields[fIdx].getByteArray();
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        return offsets[fIdx];
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        return fields[fIdx].getLength();
+    }
+
+    public void reset() {
+        for (GrowableArray field : fields) {
+            field.reset();
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index 2f712cc..a253ac0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -44,6 +45,7 @@
 import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.PartitionWithMessageDataWriter;
 import org.apache.hyracks.test.support.TestUtils;
@@ -70,7 +72,7 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
             message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();
@@ -144,8 +146,8 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
-            writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE + 1);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+            writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE + 1);
             ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
                     Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
                     BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
@@ -165,7 +167,7 @@
                 fta.reset(writer.getLastFrame());
                 Assert.assertEquals(fta.getTupleCount(), 1);
                 FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+                Assert.assertEquals(MessagingFrameTupleAppender.MARKER_MESSAGE,
                         MessagingFrameTupleAppender.getMessageType(tempBuffer));
             }
             message.getBuffer().clear();
@@ -228,9 +230,9 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
-            writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE);
+            writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE);
             ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
                     Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
                     BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
@@ -264,7 +266,7 @@
                 fta.reset(writer.getLastFrame());
                 Assert.assertEquals(fta.getTupleCount(), 1);
                 FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+                Assert.assertEquals(MessagingFrameTupleAppender.MARKER_MESSAGE,
                         MessagingFrameTupleAppender.getMessageType(tempBuffer));
             }
             partitioner.close();
@@ -286,7 +288,7 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            ctx.setSharedObject(message);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
             message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
             message.getBuffer().flip();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
new file mode 100644
index 0000000..a0ef31e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.transaction.management.service.logging.LogReader;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelper;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogMarkerTest {
+
+    private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+    private static final ARecordType META_TYPE = null;
+    private static final GenerationFunction[] META_GEN_FUNCTION = null;
+    private static final boolean[] UNIQUE_META_FIELDS = null;
+    private static final int[] KEY_INDEXES = { 0 };
+    private static final int[] KEY_INDICATORS = { 0 };
+    private static final int NUM_OF_RECORDS = 100000;
+    private static final int SNAPSHOT_SIZE = 1000;
+    private static final int DATASET_ID = 101;
+    private static final String SPILL_AREA = "target" + File.separator + "spill_area";
+    private static final String DATAVERSE_NAME = "TestDV";
+    private static final String DATASET_NAME = "TestDS";
+    private static final String DATA_TYPE_NAME = "DUMMY";
+    private static final String NODE_GROUP_NAME = "DEFAULT";
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        System.out.println("SetUp: ");
+        File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        System.out.println("TearDown");
+        File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+        f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
+        FileUtils.deleteQuietly(f);
+        System.out.println("Dir " + f.getName() + " deleted");
+    }
+
+    @Test
+    public void testInsertWithSnapshot() {
+        try {
+            TestNodeController nc = new TestNodeController();
+            nc.init();
+            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
+                    NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                            Collections.emptyList(), null, null, null, false, null, false),
+                    null, DatasetType.INTERNAL, DATASET_ID, 0);
+            try {
+                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
+                        null);
+                IHyracksTaskContext ctx = nc.createTestContext();
+                nc.newJobId();
+                ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+                AsterixLSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null);
+                insertOp.open();
+                TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                        RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+                VSizeFrame frame = new VSizeFrame(ctx);
+                VSizeFrame marker = new VSizeFrame(ctx);
+                FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+                long markerId = 0L;
+                for (int j = 0; j < NUM_OF_RECORDS; j++) {
+                    if (j % SNAPSHOT_SIZE == 0) {
+                        marker.reset();
+                        marker.getBuffer().put(MessagingFrameTupleAppender.MARKER_MESSAGE);
+                        marker.getBuffer().putLong(markerId);
+                        marker.getBuffer().flip();
+                        markerId++;
+                        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, marker, ctx);
+                        tupleAppender.flush(insertOp);
+                    }
+                    ITupleReference tuple = tupleGenerator.next();
+                    DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+                }
+                if (tupleAppender.getTupleCount() > 0) {
+                    tupleAppender.write(insertOp, true);
+                }
+                insertOp.close();
+                nc.getTransactionManager().completedTransaction(txnCtx, new DatasetId(-1), -1, true);
+                LSMBTreeDataflowHelper dataflowHelper = nc.getPrimaryIndexDataflowHelper(dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null);
+                dataflowHelper.open();
+                LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance();
+                long lsn = btree.getMostRecentMarkerLSN();
+                int numOfMarkers = 0;
+                LogReader logReader = (LogReader) nc.getTransactionSubsystem().getLogManager().getLogReader(false);
+                long expectedMarkerId = markerId - 1;
+                while (lsn >= 0) {
+                    numOfMarkers++;
+                    ILogRecord logRecord = logReader.read(lsn);
+                    lsn = logRecord.getPreviousMarkerLSN();
+                    long logMarkerId = logRecord.getMarker().getLong();
+                    Assert.assertEquals(expectedMarkerId, logMarkerId);
+                    expectedMarkerId--;
+                }
+                logReader.close();
+                dataflowHelper.close();
+                Assert.assertEquals(markerId, numOfMarkers);
+                nc.newJobId();
+                TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+                        Collections.emptyList(), Collections.emptyList(), false);
+                IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE,
+                        META_TYPE, new NoMergePolicyFactory(), null, null);
+                emptyTupleOp.open();
+                emptyTupleOp.close();
+                Assert.assertEquals(NUM_OF_RECORDS, countOp.getCount());
+            } finally {
+                nc.deInit();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+
+    }
+
+    public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
+            Collection<FrameWriterOperation> exceptionThrowingOperations,
+            Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
+        CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
+                exceptionThrowingOperations, errorThrowingOperations);
+        return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
+                closeAnswer, deepCopyInputFrames);
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
deleted file mode 100644
index 536bf3a..0000000
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.test.dataflow;
-
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class TestRecordDescriptorFactory {
-    public RecordDescriptor createRecordDescriptor(ISerializerDeserializer<?>... serdes) {
-        return null;
-    }
-}
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index fc1c221..f6c0c99 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -252,6 +252,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.17</version>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b3eb281..71c30d5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -79,7 +79,7 @@
     @Override
     public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType,
             ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
             decrementNumActiveOperations(modificationCallback);
             if (numActiveOperations.get() == 0) {
@@ -148,12 +148,12 @@
         for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
 
             //get resource
-            ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            ILSMIndexAccessor accessor =
+                    lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 
             //update resource lsn
-            AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
-                    .getIOOperationCallback();
+            AbstractLSMIOOperationCallback ioOpCallback =
+                    (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
             ioOpCallback.updateLastLSN(logRecord.getLSN());
 
             //schedule flush after update
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 9a76b40..cf66d30 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -22,6 +22,8 @@
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.FrameDataException;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
+import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -31,6 +33,7 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -41,7 +44,9 @@
 
 public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
+    public static final String KEY_INDEX = "Index";
     private final boolean isPrimary;
+    // This class has both lsmIndex and index (in super class) pointing to the same object
     private AbstractLSMIndex lsmIndex;
     private int i = 0;
 
@@ -59,10 +64,6 @@
     private int currentTupleIdx;
     private int lastFlushedTupleIdx;
 
-    public boolean isPrimary() {
-        return isPrimary;
-    }
-
     public AsterixLSMInsertDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op,
             boolean isPrimary) {
@@ -79,6 +80,10 @@
         indexHelper.open();
         lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
         try {
+            if (isPrimary && ctx.getSharedObject() != null) {
+                PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback(lsmIndex);
+                TaskUtils.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+            }
             writer.open();
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
@@ -185,4 +190,8 @@
             writer.fail();
         }
     }
+
+    public boolean isPrimary() {
+        return isPrimary;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java
new file mode 100644
index 0000000..11d649b
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This interface provide callback mechanism for adding marker logs to the transaction log file
+ */
+public interface ILogMarkerCallback {
+
+    String KEY_MARKER_CALLBACK = "MARKER_CALLBACK";
+
+    /**
+     * Called before writing the marker log allowing addition of specific information to the log record
+     *
+     * @param buffer:
+     *            the log buffer to write to
+     */
+    void before(ByteBuffer buffer);
+
+    /**
+     * Called after the log's been appended to the log tail passing the position of the log used for random access
+     *
+     * @param lsn
+     */
+    void after(long lsn);
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index cd05ba0..29af931 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -32,11 +32,38 @@
         LARGE_RECORD
     }
 
-    public static final int JOB_TERMINATE_LOG_SIZE = 14; //JOB_COMMIT or ABORT log type
-    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30;
-    public static final int UPDATE_LOG_BASE_SIZE = 51;
-    public static final int FLUSH_LOG_SIZE = 18;
-    public static final int WAIT_LOG_SIZE = 14;
+    public static final int CHKSUM_LEN = Long.BYTES;
+    public static final int FLDCNT_LEN = Integer.BYTES;
+    public static final int DS_LEN = Integer.BYTES;
+    public static final int LOG_SOURCE_LEN = Byte.BYTES;
+    public static final int LOGRCD_SZ_LEN = Integer.BYTES;
+    public static final int NEWOP_LEN = Byte.BYTES;
+    public static final int NEWVALSZ_LEN = Integer.BYTES;
+    public static final int PKHASH_LEN = Integer.BYTES;
+    public static final int PKSZ_LEN = Integer.BYTES;
+    public static final int PRVLSN_LEN = Long.BYTES;
+    public static final int RS_PARTITION_LEN = Integer.BYTES;
+    public static final int RSID_LEN = Long.BYTES;
+    public static final int SEQ_NUM_LEN = Long.BYTES;
+    public static final int TYPE_LEN = Byte.BYTES;
+    public static final int UUID_LEN = Long.BYTES;
+    public static final int VBUCKET_ID_LEN = Short.BYTES;
+
+    public static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
+    public static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
+    public static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+    public static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
+    // What are these fields? vvvvv
+    public static final int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES;
+
+    // How are the following computed?
+    public static final int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+    public static final int UPDATE_LOG_BASE_SIZE = 51; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+    public static final int FLUSH_LOG_SIZE = 18; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+    public static final int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+    public static final int MARKER_BASE_LOG_SIZE =
+            ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN;
 
     public RecordReadStatus readLogRecord(ByteBuffer buffer);
 
@@ -135,7 +162,15 @@
 
     public ITupleReference getOldValue();
 
-    public void setOldValue(ITupleReference oldValue);
+    public void setOldValue(ITupleReference tupleBefore);
 
-    public void setOldValueSize(int oldValueSize);
+    public void setOldValueSize(int beforeSize);
+
+    public boolean isMarker();
+
+    public ByteBuffer getMarker();
+
+    public void logAppended(long lsn);
+
+    public long getPreviousMarkerLSN();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 23fdd0f..306b888 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -71,24 +71,6 @@
 
 public class LogRecord implements ILogRecord {
 
-    private static final int LOG_SOURCE_LEN = Byte.BYTES;
-    private static final int TYPE_LEN = Byte.BYTES;
-    public static final int PKHASH_LEN = Integer.BYTES;
-    public static final int PKSZ_LEN = Integer.BYTES;
-    private static final int RS_PARTITION_LEN = Integer.BYTES;
-    private static final int RSID_LEN = Long.BYTES;
-    private static final int LOGRCD_SZ_LEN = Integer.BYTES;
-    private static final int FLDCNT_LEN = Integer.BYTES;
-    private static final int NEWOP_LEN = Byte.BYTES;
-    private static final int NEWVALSZ_LEN = Integer.BYTES;
-    private static final int CHKSUM_LEN = Long.BYTES;
-
-    private static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
-    private static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
-            + PKSZ_LEN;
-    private static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
-    private static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
-
     // ------------- fields in a log record (begin) ------------//
     private byte logSource;
     private byte logType;
@@ -108,8 +90,10 @@
     private ITupleReference oldValue;
     private int oldValueFieldCount;
     private long checksum;
+    private long prevMarkerLSN;
+    private ByteBuffer marker;
     // ------------- fields in a log record (end) --------------//
-
+    private final ILogMarkerCallback callback; // A callback for log mark operations
     private int PKFieldCnt;
     private ITransactionContext txnCtx;
     private long LSN;
@@ -129,7 +113,8 @@
     private String nodeId;
     private boolean replicated = false;
 
-    public LogRecord() {
+    public LogRecord(ILogMarkerCallback callback) {
+        this.callback = callback;
         isFlushed = new AtomicBoolean(false);
         readPKValue = new PrimaryKeyTupleReference();
         readNewValue = SimpleTupleWriter.INSTANCE.createTupleReference();
@@ -138,49 +123,70 @@
         logSource = LogSource.LOCAL;
     }
 
-    private void writeLogRecordCommonFields(ByteBuffer buffer) {
+    public LogRecord() {
+        this(null);
+    }
+
+    private void doWriteLogRecord(ByteBuffer buffer) {
         buffer.put(logSource);
         buffer.put(logType);
         buffer.putInt(jobId);
-        if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT || logType == LogType.UPSERT_ENTITY_COMMIT) {
-            buffer.putInt(resourcePartition);
-            buffer.putInt(datasetId);
-            buffer.putInt(PKHashValue);
-            if (PKValueSize <= 0) {
-                throw new IllegalStateException("Primary Key Size is less than or equal to 0");
-            }
-            buffer.putInt(PKValueSize);
-            writePKValue(buffer);
+        switch (logType) {
+            case LogType.ENTITY_COMMIT:
+            case LogType.UPSERT_ENTITY_COMMIT:
+                writeEntityInfo(buffer);
+                break;
+            case LogType.UPDATE:
+                writeEntityInfo(buffer);
+                buffer.putLong(resourceId);
+                buffer.putInt(logSize);
+                buffer.putInt(newValueFieldCount);
+                buffer.put(newOp);
+                buffer.putInt(newValueSize);
+                writeTuple(buffer, newValue, newValueSize);
+                if (oldValueSize > 0) {
+                    buffer.putInt(oldValueSize);
+                    buffer.putInt(oldValueFieldCount);
+                    writeTuple(buffer, oldValue, oldValueSize);
+                }
+                break;
+            case LogType.FLUSH:
+                buffer.putInt(datasetId);
+                break;
+            case LogType.MARKER:
+                buffer.putInt(datasetId);
+                buffer.putInt(resourcePartition);
+                callback.before(buffer);
+                buffer.putInt(logSize);
+                buffer.put(marker);
+                break;
+            default:
+                // Do nothing
         }
-        if (logType == LogType.UPDATE) {
-            buffer.putLong(resourceId);
-            buffer.putInt(logSize);
-            buffer.putInt(newValueFieldCount);
-            buffer.put(newOp);
-            buffer.putInt(newValueSize);
-            writeTuple(buffer, newValue, newValueSize);
-            if (oldValueSize > 0) {
-                buffer.putInt(oldValueSize);
-                buffer.putInt(oldValueFieldCount);
-                writeTuple(buffer, oldValue, oldValueSize);
-            }
+    }
+
+    private void writeEntityInfo(ByteBuffer buffer) {
+        buffer.putInt(resourcePartition);
+        buffer.putInt(datasetId);
+        buffer.putInt(PKHashValue);
+        if (PKValueSize <= 0) {
+            throw new IllegalStateException("Primary Key Size is less than or equal to 0");
         }
-        if (logType == LogType.FLUSH) {
-            buffer.putInt(datasetId);
-        }
+        buffer.putInt(PKValueSize);
+        writePKValue(buffer);
     }
 
     @Override
     public void writeLogRecord(ByteBuffer buffer) {
         int beginOffset = buffer.position();
-        writeLogRecordCommonFields(buffer);
+        doWriteLogRecord(buffer);
         checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
         buffer.putLong(checksum);
     }
 
     @Override
     public void writeRemoteLogRecord(ByteBuffer buffer) {
-        writeLogRecordCommonFields(buffer);
+        doWriteLogRecord(buffer);
         if (logType == LogType.FLUSH) {
             buffer.putLong(LSN);
             buffer.putInt(numOfFlushedIndexes);
@@ -222,7 +228,7 @@
         int beginOffset = buffer.position();
 
         //read common fields
-        RecordReadStatus status = readLogCommonFields(buffer);
+        RecordReadStatus status = doReadLogRecord(buffer);
         if (status != RecordReadStatus.OK) {
             buffer.position(beginOffset);
             return status;
@@ -241,7 +247,7 @@
         return RecordReadStatus.OK;
     }
 
-    private RecordReadStatus readLogCommonFields(ByteBuffer buffer) {
+    private RecordReadStatus doReadLogRecord(ByteBuffer buffer) {
         //first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
         if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
             return RecordReadStatus.TRUNCATED;
@@ -255,64 +261,88 @@
                     return RecordReadStatus.TRUNCATED;
                 }
                 datasetId = buffer.getInt();
-                resourceId = 0L;
+                resourceId = 0l;
+                // fall throuh
+            case LogType.WAIT:
+                computeAndSetLogSize();
                 break;
-            case LogType.ABORT:
             case LogType.JOB_COMMIT:
+            case LogType.ABORT:
                 datasetId = -1;
                 PKHashValue = -1;
+                computeAndSetLogSize();
                 break;
             case LogType.ENTITY_COMMIT:
             case LogType.UPSERT_ENTITY_COMMIT:
-                if (!readEntityInfo(buffer)) {
+                if (readEntityInfo(buffer)) {
+                    computeAndSetLogSize();
+                } else {
                     return RecordReadStatus.TRUNCATED;
                 }
                 break;
             case LogType.UPDATE:
-                if (!readEntityInfo(buffer)) {
+                if (readEntityInfo(buffer)) {
+                    if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
+                        return RecordReadStatus.TRUNCATED;
+                    }
+                    resourceId = buffer.getLong();
+                    logSize = buffer.getInt();
+                    newValueFieldCount = buffer.getInt();
+                    newOp = buffer.get();
+                    newValueSize = buffer.getInt();
+                    if (buffer.remaining() < newValueSize) {
+                        if (logSize > buffer.capacity()) {
+                            return RecordReadStatus.LARGE_RECORD;
+                        }
+                        return RecordReadStatus.TRUNCATED;
+                    }
+                    newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
+                    if (logSize > getUpdateLogSizeWithoutOldValue()) {
+                        // Prev Image exists
+                        if (buffer.remaining() < Integer.BYTES) {
+                            return RecordReadStatus.TRUNCATED;
+                        }
+                        oldValueSize = buffer.getInt();
+                        if (buffer.remaining() < Integer.BYTES) {
+                            return RecordReadStatus.TRUNCATED;
+                        }
+                        oldValueFieldCount = buffer.getInt();
+                        if (buffer.remaining() < oldValueSize) {
+                            return RecordReadStatus.TRUNCATED;
+                        }
+                        oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
+                    } else {
+                        oldValueSize = 0;
+                    }
+                } else {
                     return RecordReadStatus.TRUNCATED;
                 }
-                if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
+                break;
+            case LogType.MARKER:
+                if (buffer.remaining() < DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN) {
                     return RecordReadStatus.TRUNCATED;
                 }
-                resourceId = buffer.getLong();
+                datasetId = buffer.getInt();
+                resourcePartition = buffer.getInt();
+                prevMarkerLSN = buffer.getLong();
                 logSize = buffer.getInt();
-                newValueFieldCount = buffer.getInt();
-                newOp = buffer.get();
-                newValueSize = buffer.getInt();
-                return readEntity(buffer);
+                int lenRemaining = logSize - MARKER_BASE_LOG_SIZE;
+                if (buffer.remaining() < lenRemaining) {
+                    return RecordReadStatus.TRUNCATED;
+                }
+
+                if (marker == null || marker.capacity() < lenRemaining) {
+                    // TODO(amoudi): account for memory allocation
+                    marker = ByteBuffer.allocate(lenRemaining + Short.BYTES);
+                }
+                marker.clear();
+                buffer.get(marker.array(), 0, lenRemaining);
+                marker.position(lenRemaining);
+                marker.flip();
+                break;
             default:
                 break;
         }
-        computeAndSetLogSize();
-        return RecordReadStatus.OK;
-    }
-
-    private RecordReadStatus readEntity(ByteBuffer buffer) {
-        if (buffer.remaining() < newValueSize) {
-            if (logSize > buffer.capacity()) {
-                return RecordReadStatus.LARGE_RECORD;
-            }
-            return RecordReadStatus.TRUNCATED;
-        }
-        newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
-        if (logSize > getUpdateLogSizeWithoutOldValue()) {
-            // Prev Image exists
-            if (buffer.remaining() < Integer.BYTES) {
-                return RecordReadStatus.TRUNCATED;
-            }
-            oldValueSize = buffer.getInt();
-            if (buffer.remaining() < Integer.BYTES) {
-                return RecordReadStatus.TRUNCATED;
-            }
-            oldValueFieldCount = buffer.getInt();
-            if (buffer.remaining() < oldValueSize) {
-                return RecordReadStatus.TRUNCATED;
-            }
-            oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
-        } else {
-            oldValueSize = 0;
-        }
         return RecordReadStatus.OK;
     }
 
@@ -339,7 +369,7 @@
     @Override
     public void readRemoteLog(ByteBuffer buffer) {
         //read common fields
-        readLogCommonFields(buffer);
+        doReadLogRecord(buffer);
 
         if (logType == LogType.FLUSH) {
             LSN = buffer.getLong();
@@ -412,11 +442,18 @@
             case LogType.WAIT:
                 logSize = WAIT_LOG_SIZE;
                 break;
+            case LogType.MARKER:
+                setMarkerLogSize();
+                break;
             default:
                 throw new IllegalStateException("Unsupported Log Type");
         }
     }
 
+    private void setMarkerLogSize() {
+        logSize = MARKER_BASE_LOG_SIZE + marker.remaining();
+    }
+
     @Override
     public String getLogRecordForDisplay() {
         StringBuilder builder = new StringBuilder();
@@ -688,4 +725,28 @@
     public void setOldValueSize(int oldValueSize) {
         this.oldValueSize = oldValueSize;
     }
+
+    public void setMarker(ByteBuffer marker) {
+        this.marker = marker;
+    }
+
+    @Override
+    public boolean isMarker() {
+        return logType == LogType.MARKER;
+    }
+
+    @Override
+    public void logAppended(long lsn) {
+        callback.after(lsn);
+    }
+
+    @Override
+    public long getPreviousMarkerLSN() {
+        return prevMarkerLSN;
+    }
+
+    @Override
+    public ByteBuffer getMarker() {
+        return marker;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index 714b8f7..269e4b9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -27,6 +27,7 @@
     public static final byte FLUSH = 4;
     public static final byte UPSERT_ENTITY_COMMIT = 5;
     public static final byte WAIT = 6;
+    public static final byte MARKER = 7;
 
     private static final String STRING_UPDATE = "UPDATE";
     private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
@@ -35,8 +36,8 @@
     private static final String STRING_FLUSH = "FLUSH";
     private static final String STRING_UPSERT_ENTITY_COMMIT = "UPSERT_ENTITY_COMMIT";
     private static final String STRING_WAIT = "WAIT";
-
-    private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
+    private static final String STRING_MARKER = "MARKER";
+    private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
 
     public static String toString(byte logType) {
         switch (logType) {
@@ -54,8 +55,10 @@
                 return STRING_UPSERT_ENTITY_COMMIT;
             case LogType.WAIT:
                 return STRING_WAIT;
+            case LogType.MARKER:
+                return STRING_MARKER;
             default:
-                return STRING_INVALID_LOG_TYPE;
+                return STRING_UNKNOWN_LOG_TYPE;
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
new file mode 100644
index 0000000..7dae65f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+
+/**
+ * A basic callback used to write marker to transaction logs
+ */
+public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback {
+
+    private AbstractLSMIndex index;
+
+    /**
+     * @param index:
+     *            a pointer to the primary index used to store marker log info
+     * @throws HyracksDataException
+     */
+    public PrimaryIndexLogMarkerCallback(AbstractLSMIndex index) throws HyracksDataException {
+        this.index = index;
+    }
+
+    @Override
+    public void before(ByteBuffer buffer) {
+        buffer.putLong(index.getCurrentMemoryComponent().getMostRecentMarkerLSN());
+    }
+
+    @Override
+    public void after(long lsn) {
+        index.getCurrentMemoryComponent().setMostRecentMarkerLSN(lsn);
+    }
+}
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
similarity index 79%
copy from asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
copy to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
index 11a2510..4bca216 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
@@ -16,10 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.event.util;
+package org.apache.asterix.common.utils;
 
+/**
+ * A static class that stores asterix constants
+ */
 public class AsterixConstants {
+    public static final String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
 
-    public static String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
+    private AsterixConstants() {
+    }
 
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java
new file mode 100644
index 0000000..8c83687
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Not thread safe stack that is used to store fixed size buffers in memory
+ * Once memory is consumed, it uses disk to store buffers
+ */
+public class FrameStack implements Closeable {
+    private static final AtomicInteger stackIdGenerator = new AtomicInteger(0);
+    private static final String STACK_FILE_NAME = "stack";
+    private final int stackId;
+    private final int frameSize;
+    private final int numOfMemoryFrames;
+    private final ArrayDeque<ByteBuffer> fullBuffers;
+    private final ArrayDeque<ByteBuffer> emptyBuffers;
+    private int totalWriteCount = 0;
+    private int totalReadCount = 0;
+    private final File file;
+    private final RandomAccessFile iostream;
+    private final byte[] frame;
+
+    /**
+     * Create a hybrid of memory and disk stack of byte buffers
+     *
+     * @param dir
+     * @param frameSize
+     * @param numOfMemoryFrames
+     * @throws HyracksDataException
+     * @throws FileNotFoundException
+     */
+    public FrameStack(String dir, int frameSize, int numOfMemoryFrames)
+            throws HyracksDataException, FileNotFoundException {
+        this.stackId = stackIdGenerator.getAndIncrement();
+        this.frameSize = frameSize;
+        this.numOfMemoryFrames = numOfMemoryFrames;
+        this.fullBuffers = numOfMemoryFrames <= 0 ? null : new ArrayDeque<>();
+        this.emptyBuffers = numOfMemoryFrames <= 0 ? null : new ArrayDeque<>();
+        this.file = StoragePathUtil.createFile(
+                ((dir == null) ? "" : (dir.endsWith(File.separator) ? dir : (dir + File.separator))) + STACK_FILE_NAME,
+                stackId);
+        this.iostream = new RandomAccessFile(file, "rw");
+        this.frame = new byte[frameSize];
+    }
+
+    /**
+     * @return the number of remaining frames to be read in the stack
+     */
+    public int remaining() {
+        return totalWriteCount - totalReadCount;
+    }
+
+    /**
+     * copy content of buffer into the stack
+     *
+     * @param buffer
+     * @throws IOException
+     */
+    public synchronized void push(ByteBuffer buffer) throws IOException {
+        int diff = totalWriteCount - totalReadCount;
+        if (diff < numOfMemoryFrames) {
+            ByteBuffer aBuffer = allocate();
+            aBuffer.put(buffer.array());
+            aBuffer.flip();
+            fullBuffers.push(aBuffer);
+        } else {
+            long position = (long) (diff - numOfMemoryFrames) * frameSize;
+            if (position != iostream.getFilePointer()) {
+                iostream.seek(position);
+            }
+            iostream.write(buffer.array());
+        }
+        totalWriteCount++;
+    }
+
+    private ByteBuffer allocate() {
+        ByteBuffer aBuffer = emptyBuffers.poll();
+        if (aBuffer == null) {
+            aBuffer = ByteBuffer.allocate(frameSize);
+        }
+        aBuffer.clear();
+        return aBuffer;
+    }
+
+    /**
+     * Free a frame off of the stack and copy it into dest
+     *
+     * @param dest
+     * @throws IOException
+     */
+    public synchronized void pop(ByteBuffer dest) throws IOException {
+        dest.clear();
+        int diff = totalWriteCount - totalReadCount - 1;
+        if (diff >= 0) {
+            if (diff < numOfMemoryFrames) {
+                totalReadCount++;
+                ByteBuffer aBuffer = fullBuffers.pop();
+                emptyBuffers.push(aBuffer);
+                dest.put(aBuffer.array());
+            } else {
+                long position = (long) (diff - numOfMemoryFrames) * frameSize;
+                iostream.seek(position);
+                iostream.readFully(frame);
+                dest.put(frame);
+            }
+        }
+        dest.flip();
+    }
+
+    /**
+     * Closing this stack will result in the data being deleted
+     *
+     * @throws IOException
+     */
+    @Override
+    public void close() throws IOException {
+        iostream.close();
+        Files.delete(file.toPath());
+    }
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 78b06fb..615e8af 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -24,12 +24,16 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 
 public class StoragePathUtil {
+    private static final Logger LOGGER = Logger.getLogger(StoragePathUtil.class.getName());
     public static final String PARTITION_DIR_PREFIX = "partition_";
     public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
     public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
@@ -70,4 +74,39 @@
     public static int getPartitionNumFromName(String name) {
         return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length()));
     }
+
+    /**
+     * Create a file
+     * Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when
+     * creating files simultaneously
+     *
+     * @param name
+     * @param count
+     * @return
+     * @throws HyracksDataException
+     */
+    public static File createFile(String name, int count) throws HyracksDataException {
+        try {
+            String fileName = name + "_" + count;
+            File file = new File(fileName);
+            if (file.getParentFile() != null) {
+                file.getParentFile().mkdirs();
+            }
+            if (!file.exists()) {
+                boolean success = file.createNewFile();
+                if (!success) {
+                    throw new HyracksDataException("Unable to create spill file " + fileName);
+                } else {
+                    if (LOGGER.isEnabledFor(Level.INFO)) {
+                        LOGGER.info("Created spill file " + file.getAbsolutePath());
+                    }
+                }
+            } else {
+                throw new HyracksDataException("spill file " + fileName + " already exists");
+            }
+            return file;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
index 1d5b15e..2878d5a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
@@ -18,9 +18,12 @@
  */
 package org.apache.asterix.common.utils;
 
+import java.nio.ByteBuffer;
+
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
@@ -64,4 +67,17 @@
         logRecord.computeAndSetPKValueSize();
         logRecord.computeAndSetLogSize();
     }
+
+    public static void formMarkerLogRecord(LogRecord logRecord, ITransactionContext txnCtx, int datasetId,
+            int resourcePartition, ByteBuffer marker) {
+        logRecord.setTxnCtx(txnCtx);
+        logRecord.setLogSource(LogSource.LOCAL);
+        logRecord.setLogType(LogType.MARKER);
+        logRecord.setJobId(txnCtx.getJobId().getId());
+        logRecord.setDatasetId(datasetId);
+        logRecord.setResourcePartition(resourcePartition);
+        marker.get(); // read the first byte since it is not part of the marker object
+        logRecord.setMarker(marker);
+        logRecord.computeAndSetLogSize();
+    }
 }
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index b19a722..1780a51 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.utils.AsterixConstants;
 import org.apache.asterix.event.driver.EventDriver;
 import org.apache.asterix.event.error.VerificationUtil;
 import org.apache.asterix.event.model.AsterixInstance;
@@ -72,8 +73,8 @@
 
         for (Node node : cluster.getNode()) {
             if (copyHyracksToNC) {
-                Pattern copyHyracksForNC = createCopyHyracksPattern(asterixInstanceName, cluster, node.getClusterIp(),
-                        destDir);
+                Pattern copyHyracksForNC =
+                        createCopyHyracksPattern(asterixInstanceName, cluster, node.getClusterIp(), destDir);
                 ps.add(copyHyracksForNC);
             }
         }
@@ -389,8 +390,8 @@
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
         String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
         String workingDir = cluster.getWorkingDir().getDir();
-        String destDir = workingDir + File.separator + "library" + File.separator + dataverse + File.separator
-                + libraryName;
+        String destDir =
+                workingDir + File.separator + "library" + File.separator + dataverse + File.separator + libraryName;
         String fileToTransfer = new File(libraryPath).getAbsolutePath();
 
         Iterator<Node> installTargets = cluster.getNode().iterator();
@@ -434,8 +435,8 @@
         patternList.add(p);
 
         Iterator<Node> uninstallTargets = cluster.getNode().iterator();
-        String libDir = workingDir + File.separator + "library" + File.separator + dataverse + File.separator
-                + libraryName;
+        String libDir =
+                workingDir + File.separator + "library" + File.separator + dataverse + File.separator + libraryName;
         Node uninstallNode = uninstallTargets.next();
         nodeid = new Nodeid(new Value(null, uninstallNode.getId()));
         event = new Event("file_delete", nodeid, libDir);
@@ -606,8 +607,8 @@
         String username = cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername();
         String srcHost = cluster.getMasterNode().getClientIp();
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
-        String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir()
-                : cluster.getMasterNode().getLogDir();
+        String srcDir =
+                cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode().getLogDir();
         String destDir = outputDir + File.separator + "cc";
         String pargs = username + " " + srcHost + " " + srcDir + " " + destDir;
         Event event = new Event("directory_copy", nodeid, pargs);
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
similarity index 66%
copy from asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
index 11a2510..487b47d 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
@@ -16,10 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.event.util;
+package org.apache.asterix.external.api;
 
-public class AsterixConstants {
+import org.apache.hyracks.api.comm.VSizeFrame;
 
-    public static String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
+public interface IFeedMarker {
+
+    /**
+     * Mark the frame with a mark denoting the progress of the feed
+     * The mark will be eventually written to the transaction log
+     *
+     * @param mark
+     *            a frame to write the progress mark in
+     * @return
+     */
+    public boolean mark(VSizeFrame mark);
 
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
index 0f5ada4..9d9ff28 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 
+@FunctionalInterface
 public interface IRecordConverter<I, O> {
 
     public O convert(IRawRecord<? extends I> input) throws IOException;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 9cce1c9..08ffe18 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -73,4 +73,8 @@
      * gives the record reader a chance to recover from IO errors during feed intake
      */
     public boolean handleException(Throwable th);
+
+    public default IFeedMarker getProgressReporter() {
+        return null;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index a301ac9..7806489 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -34,9 +34,9 @@
 
     public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
+            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
             throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
         this.dataParser = dataParser;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index b47d278..7d65c52 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -32,9 +32,9 @@
 
     public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader)
-                    throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+            final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
+            throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 87daffa..be9056b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -19,10 +19,15 @@
 package org.apache.asterix.external.dataflow;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nonnull;
 
+import org.apache.asterix.external.api.IFeedMarker;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
@@ -30,9 +35,13 @@
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.log4j.Logger;
 
 public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
@@ -40,38 +49,52 @@
     protected final IRecordDataParser<T> dataParser;
     protected final IRecordReader<? extends T> recordReader;
     protected final AtomicBoolean closed = new AtomicBoolean(false);
-    protected final long interval = 1000;
+    protected static final long INTERVAL = 1000;
+    protected final Object mutex = new Object();
+    protected final boolean sendMarker;
     protected boolean failed = false;
 
     public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
-            @Nonnull IRecordReader<T> recordReader) throws HyracksDataException {
+            @Nonnull IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
+        this.sendMarker = sendMarker;
         recordReader.setFeedLogManager(feedLogManager);
         recordReader.setController(this);
     }
 
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
+        ExecutorService executorService = sendMarker ? Executors.newSingleThreadExecutor() : null;
+        Future<?> result = null;
+        if (sendMarker) {
+            DataflowMarker dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(),
+                    TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx));
+            result = executorService.submit(dataflowMarker);
+        }
         HyracksDataException hde = null;
         try {
             failed = false;
             tupleForwarder.initialize(ctx, writer);
             while (recordReader.hasNext()) {
-                IRawRecord<? extends T> record = recordReader.next();
-                if (record == null) {
-                    flush();
-                    Thread.sleep(interval);
-                    continue;
+                // synchronized on mutex before we call next() so we don't a marker before its record
+                synchronized (mutex) {
+                    IRawRecord<? extends T> record = recordReader.next();
+                    if (record == null) {
+                        flush();
+                        wait(INTERVAL);
+                        continue;
+                    }
+                    tb.reset();
+                    parseAndForward(record);
                 }
-                tb.reset();
-                parseAndForward(record);
             }
         } catch (InterruptedException e) {
             //TODO: Find out what could cause an interrupted exception beside termination of a job/feed
-            LOGGER.warn("Feed has been interrupted. Closing the feed");
+            LOGGER.warn("Feed has been interrupted. Closing the feed", e);
+            Thread.currentThread().interrupt();
         } catch (Exception e) {
             failed = true;
             tupleForwarder.flush();
@@ -90,10 +113,13 @@
             hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
         } finally {
             closeSignal();
-            if (hde != null) {
-                throw hde;
+            if (sendMarker && result != null) {
+                result.cancel(true);
             }
         }
+        if (hde != null) {
+            throw hde;
+        }
     }
 
     private void parseAndForward(IRawRecord<? extends T> record) throws IOException {
@@ -170,4 +196,53 @@
         // This is not a parser record. most likely, this error happened in the record reader.
         return recordReader.handleException(th);
     }
+
+    private class DataflowMarker implements Runnable {
+        private final IFeedMarker marker;
+        private final VSizeFrame mark;
+        private volatile boolean stopped = false;
+
+        public DataflowMarker(IFeedMarker marker, VSizeFrame mark) {
+            this.marker = marker;
+            this.mark = mark;
+        }
+
+        public synchronized void stop() {
+            stopped = true;
+            notify();
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    synchronized (this) {
+                        if (!stopped) {
+                            // TODO (amoudi): find a better reactive way to do this
+                            // sleep for two seconds
+                            wait(TimeUnit.SECONDS.toMillis(2));
+                        } else {
+                            break;
+                        }
+                    }
+                    synchronized (mutex) {
+                        if (marker.mark(mark)) {
+                            // broadcast
+                            tupleForwarder.flush();
+                            // clear
+                            mark.getBuffer().clear();
+                            mark.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+                            mark.getBuffer().flip();
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                LOGGER.warn("Marker stopped", e);
+                Thread.currentThread().interrupt();
+                return;
+            } catch (Exception e) {
+                LOGGER.warn("Marker stopped", e);
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index f1eb870..36c6c2f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -30,10 +30,12 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 
 public class FeedTupleForwarder implements ITupleForwarder {
 
@@ -59,7 +61,7 @@
             this.writer = writer;
             this.appender = new FrameTupleAppender(frame);
             // Set null feed message
-            VSizeFrame message = (VSizeFrame) ctx.getSharedObject();
+            VSizeFrame message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
             // a null message
             message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 44aac60..790289a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -32,8 +32,8 @@
 
     public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser,
-            IRecordReader<T> recordReader) throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+            IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 2e1c83f..6159908 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -72,11 +72,12 @@
             IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
             throws HyracksDataException {
         this.writer = writer;
+
         this.spiller =
-                new FrameSpiller(ctx,
+                fpa.spillToDiskOnCongestion() ? new FrameSpiller(ctx,
                         connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
                                 + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition(),
-                        fpa.getMaxSpillOnDisk());
+                        fpa.getMaxSpillOnDisk()) : null;
         this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
         this.fpa = fpa;
         this.framePool = framePool;
@@ -122,7 +123,9 @@
             LOGGER.log(Level.WARNING, th.getMessage(), th);
         }
         try {
-            spiller.close();
+            if (spiller != null) {
+                spiller.close();
+            }
         } catch (Throwable th) {
             LOGGER.log(Level.WARNING, th.getMessage(), th);
         }
@@ -459,34 +462,34 @@
                     frame = inbox.poll();
                     if (frame == null) {
                         // Memory queue is empty. Check spill
-                        frame = spiller.next();
-                        while (frame != null) {
-                            if (consume(frame) != null) {
-                                // We don't release the frame since this is a spill frame that we didn't get from memory
-                                // manager
-                                return;
-                            }
+                        if (spiller != null) {
                             frame = spiller.next();
+                            while (frame != null) {
+                                if (consume(frame) != null) {
+                                    // We don't release the frame since this is a spill frame that we didn't get from memory
+                                    // manager
+                                    return;
+                                }
+                                frame = spiller.next();
+                            }
                         }
                         writer.flush();
                         // At this point. We consumed all memory and spilled
                         // We can't assume the next will be in memory. what if there is 0 memory?
                         synchronized (mutex) {
                             frame = inbox.poll();
-                            if (frame == null) {
-                                // Nothing in memory
-                                if (spiller.switchToMemory()) {
-                                    if (poisoned) {
-                                        break;
-                                    }
-                                    if (DEBUG) {
-                                        LOGGER.info("Consumer is going to sleep");
-                                    }
-                                    // Nothing in disk
-                                    mutex.wait();
-                                    if (DEBUG) {
-                                        LOGGER.info("Consumer is waking up");
-                                    }
+                            // Nothing in memory
+                            if (frame == null && (spiller == null || spiller.switchToMemory())) {
+                                if (poisoned) {
+                                    break;
+                                }
+                                if (DEBUG) {
+                                    LOGGER.info("Consumer is going to sleep");
+                                }
+                                // Nothing in disk
+                                mutex.wait();
+                                if (DEBUG) {
+                                    LOGGER.info("Consumer is waking up");
                                 }
                             }
                         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
index a2f19bb..09e03a3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
@@ -28,10 +28,10 @@
 import java.nio.file.Files;
 import java.util.ArrayDeque;
 
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
@@ -49,28 +49,30 @@
     private final String fileNamePrefix;
     private final ArrayDeque<File> files = new ArrayDeque<>();
     private final VSizeFrame frame;
-    private final int budget;           // Max current frames in disk allowed
-    private BufferedOutputStream bos;   // Current output stream
-    private BufferedInputStream bis;    // Current input stream
-    private File currentWriteFile;      // Current write file
-    private File currentReadFile;       // Current read file
-    private int currentWriteCount = 0;  // Current file write count
-    private int currentReadCount = 0;   // Current file read count
-    private int totalWriteCount = 0;    // Total frames spilled
-    private int totalReadCount = 0;     // Total frames read
-    private int fileCount = 0;          // How many spill files?
+    private final int budget; // Max current frames in disk allowed
+    private BufferedOutputStream bos; // Current output stream
+    private BufferedInputStream bis; // Current input stream
+    private File currentWriteFile; // Current write file
+    private File currentReadFile; // Current read file
+    private int currentWriteCount = 0; // Current file write count
+    private int currentReadCount = 0; // Current file read count
+    private int totalWriteCount = 0; // Total frames spilled
+    private int totalReadCount = 0; // Total frames read
+    private int fileCount = 0; // How many spill files?
 
     public FrameSpiller(IHyracksTaskContext ctx, String fileNamePrefix, long budgetInBytes)
             throws HyracksDataException {
         this.frame = new VSizeFrame(ctx);
         this.fileNamePrefix = fileNamePrefix;
-        this.budget = (int) (budgetInBytes / ctx.getInitialFrameSize());
-
+        this.budget = (int) Math.min(budgetInBytes / ctx.getInitialFrameSize(), Integer.MAX_VALUE);
+        if (budget <= 0) {
+            throw new HyracksDataException("Invalid budget " + budgetInBytes + ". Budget must be larger than 0");
+        }
     }
 
     public void open() throws HyracksDataException {
         try {
-            this.currentWriteFile = createFile();
+            this.currentWriteFile = StoragePathUtil.createFile(fileNamePrefix, fileCount++);
             this.currentReadFile = currentWriteFile;
             this.bos = new BufferedOutputStream(new FileOutputStream(currentWriteFile));
             this.bis = new BufferedInputStream(new FileInputStream(currentReadFile));
@@ -135,7 +137,7 @@
     }
 
     public double usedBudget() {
-        return ((double) (totalWriteCount - totalReadCount) / (double) budget);
+        return (double) (totalWriteCount - totalReadCount) / (double) budget;
     }
 
     public synchronized boolean spill(ByteBuffer frame) throws HyracksDataException {
@@ -150,7 +152,7 @@
             if (currentWriteCount >= FRAMES_PER_FILE) {
                 bos.close();
                 currentWriteCount = 0;
-                currentWriteFile = createFile();
+                currentWriteFile = StoragePathUtil.createFile(fileNamePrefix, fileCount++);
                 files.add(currentWriteFile);
                 bos = new BufferedOutputStream(new FileOutputStream(currentWriteFile));
             }
@@ -161,26 +163,6 @@
         }
     }
 
-    private File createFile() throws HyracksDataException {
-        try {
-            String fileName = fileNamePrefix + "_" + fileCount++;
-            File file = new File(fileName);
-            if (!file.exists()) {
-                boolean success = file.createNewFile();
-                if (!success) {
-                    throw new HyracksDataException("Unable to create spill file " + fileName);
-                } else {
-                    if (LOGGER.isEnabledFor(Level.INFO)) {
-                        LOGGER.info("Created spill file " + file.getAbsolutePath());
-                    }
-                }
-            }
-            return file;
-        } catch (Throwable th) {
-            throw new HyracksDataException(th);
-        }
-    }
-
     public synchronized void close() {
         // Do proper cleanup
         if (bos != null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 8ee3e2b..9661890 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -28,6 +28,8 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 
 public class IngestionRuntime extends SubscribableRuntime {
 
@@ -48,8 +50,9 @@
         dWriter.subscribe(collector);
         subscribers.add(collectionRuntime);
         if (numSubscribers == 0) {
-            ctx.setSharedObject(new VSizeFrame(ctx));
-            collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE,
+                    TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx), collectionRuntime.getCtx());
             adapterRuntimeManager.start();
         }
         numSubscribers++;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 54e17ef..37a42a7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -40,7 +40,9 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 /*
@@ -112,7 +114,7 @@
         this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
         this.message = new VSizeFrame(ctx);
-        ctx.setSharedObject(message);
+        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
         this.opDesc = feedMetaOperatorDescriptor;
         this.recordDescProvider = recordDescProvider;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 6f679f7..95bebad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -41,7 +41,9 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -107,7 +109,7 @@
                 .getApplicationObject()).getFeedManager();
         this.targetId = targetId;
         this.message = new VSizeFrame(ctx);
-        ctx.setSharedObject(message);
+        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
         this.recordDescProvider = recordDescProvider;
         this.opDesc = feedMetaOperatorDescriptor;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 4ad08b3..98cb4b0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -59,7 +59,7 @@
     public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
             int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
             Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         try {
             switch (dataSourceFactory.getDataSourceType()) {
                 case RECORDS:
@@ -67,6 +67,7 @@
                     IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
                     IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
                     IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
+                    boolean sendMarker = ExternalDataUtils.isSendMarker(configuration);
                     if (indexingOp) {
                         return new IndexingDataFlowController(ctx,
                                 DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
@@ -80,18 +81,19 @@
                             if (isChangeFeed) {
                                 int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
                                 return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
-                                        numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader);
+                                        numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader,
+                                        sendMarker);
                             } else {
                                 return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
-                                        (IRecordWithMetadataParser) dataParser, recordReader);
+                                        (IRecordWithMetadataParser) dataParser, recordReader, sendMarker);
                             }
                         } else if (isChangeFeed) {
                             int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
                             return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
-                                    (IRecordWithPKDataParser) dataParser, recordReader);
+                                    (IRecordWithPKDataParser) dataParser, recordReader, sendMarker);
                         } else {
                             return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
-                                    recordReader);
+                                    recordReader, sendMarker);
                         }
                     } else {
                         return new RecordDataFlowController(ctx,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index ad945f2..ed811ad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public class DataflowUtils {
     public static void addTupleToFrame(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
@@ -67,4 +68,14 @@
                 throw new HyracksDataException("Unknown tuple forward policy");
         }
     }
+
+    public static void addTupleToFrame(FrameTupleAppender appender, ITupleReference tuple, IFrameWriter writer)
+            throws HyracksDataException {
+        if (!appender.append(tuple)) {
+            appender.write(writer, true);
+            if (!appender.append(tuple)) {
+                throw new HyracksDataException("Tuple is too large for a frame");
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 55dee04..e251f32 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -46,6 +46,8 @@
     public static final String KEY_FILESYSTEM = "fs";
     // specifies the address of the HDFS name node
     public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
+    // specifies whether a feed sends progress markers or not
+    public static final String KEY_SEND_MARKER = "send-marker";
     // specifies the class implementation of the accessed instance of HDFS
     public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
     public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 19781f9..23cd39c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -332,4 +332,8 @@
         }
         return intIndicators;
     }
+
+    public static boolean isSendMarker(Map<String, String> configuration) {
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_SEND_MARKER));
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 6b7eb31..6e1b9e8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -64,6 +64,9 @@
         SPILL,              // Memory budget has been consumed. Now we're writing to disk
         DISCARD             // Memory and Disk space budgets have been consumed. Now we're discarding
     }
+    
+    private FeedUtils() {
+    }
 
     private static String prepareDataverseFeedName(String dataverseName, String feedName) {
         return dataverseName + File.separator + feedName;
@@ -87,7 +90,7 @@
             throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
         }
         String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
-        List<FileSplit> splits = new ArrayList<FileSplit>();
+        List<FileSplit> splits = new ArrayList<>();
         for (String nd : locations) {
             splits.add(splitsForAdapter(dataverseName, feedName, nd,
                     AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0]));
@@ -120,6 +123,7 @@
         int offset = fta.getTupleStartOffset(tc);
         int len = fta.getTupleLength(tc);
         int newSize = FrameHelper.calcAlignedFrameSizeToStore(1, len, message.getMinSize());
+        message.reset();
         message.ensureFrameSize(newSize);
         message.getBuffer().clear();
         message.getBuffer().put(input.array(), offset, len);
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
index 820ae5f..7a5b722 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
@@ -22,6 +22,7 @@
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 
 public class CreateDataverseStatement extends Statement {
 
@@ -31,11 +32,7 @@
 
     public CreateDataverseStatement(Identifier dataverseName, String format, boolean ifNotExists) {
         this.dataverseName = dataverseName;
-        if (format == null) {
-            this.format = "org.apache.asterix.runtime.formats.NonTaggedDataFormat";
-        } else {
-            this.format = format;
-        }
+        this.format = (format == null) ? NonTaggedDataFormat.class.getName() : format;
         this.ifNotExists = ifNotExists;
     }
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
index 8738c97..000339f7 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
@@ -58,7 +58,7 @@
         return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
                 + (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
                 + (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
-                + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+                + (((long) (bytes[offset + 6] & 0xff)) << 8) + (bytes[offset + 7] & 0xff);
     }
 
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index cabfc77..d247490 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -119,8 +119,9 @@
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
         replicationNotifier = new ReplicationNotifier();
         replicationThreads = Executors.newCachedThreadPool(appContext.getThreadFactory());
-        Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
-                .getAppContext()).getMetadataProperties().getNodePartitions();
+        Map<String, ClusterPartition[]> nodePartitions =
+                ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider.getAppContext()).getMetadataProperties()
+                        .getNodePartitions();
         Set<String> nodeReplicationClients = replicationProperties.getNodeReplicationClients(nodeId);
         List<Integer> clientsPartitions = new ArrayList<>();
         for (String clientId : nodeReplicationClients) {
@@ -141,8 +142,8 @@
         try {
             serverSocketChannel = ServerSocketChannel.open();
             serverSocketChannel.configureBlocking(true);
-            InetSocketAddress replicationChannelAddress = new InetSocketAddress(InetAddress.getByName(nodeIP),
-                    dataPort);
+            InetSocketAddress replicationChannelAddress =
+                    new InetSocketAddress(InetAddress.getByName(nodeIP), dataPort);
             serverSocketChannel.socket().bind(replicationChannelAddress);
             lsmComponentLSNMappingService.start();
             replicationNotifier.start();
@@ -169,8 +170,9 @@
         if (remainingFile == 0) {
             if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null
                     && replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
-                int remainingIndexes = replicaUniqueLSN2RemoteMapping
-                        .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
+                int remainingIndexes =
+                        replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes
+                                .decrementAndGet();
                 if (remainingIndexes == 0) {
                     /**
                      * Note: there is a chance that this will never be removed because some
@@ -216,8 +218,8 @@
         public void run() {
             Thread.currentThread().setName("Replication Thread");
             try {
-                ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel,
-                        inBuffer);
+                ReplicationRequestType replicationFunction =
+                        ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                 while (replicationFunction != ReplicationRequestType.GOODBYE) {
                     switch (replicationFunction) {
                         case REPLICATE_LOG:
@@ -281,8 +283,8 @@
             Set<Integer> datasetsToForceFlush = new HashSet<>();
             for (IndexInfo iInfo : openIndexesInfo) {
                 if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) {
-                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
-                            .getIOOperationCallback();
+                    AbstractLSMIOOperationCallback ioCallback =
+                            (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
                     //if an index has a pending flush, then the request to flush it will succeed.
                     if (ioCallback.hasPendingFlush()) {
                         //remove index to indicate that it will be flushed
@@ -373,8 +375,9 @@
             List<String> filesList;
             Set<String> replicaIds = request.getReplicaIds();
             Set<String> requesterExistingFiles = request.getExistingFiles();
-            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) appContextProvider
-                    .getAppContext()).getMetadataProperties().getNodePartitions();
+            Map<String, ClusterPartition[]> nodePartitions =
+                    ((IAsterixPropertiesProvider) appContextProvider.getAppContext()).getMetadataProperties()
+                            .getNodePartitions();
             for (String replicaId : replicaIds) {
                 //get replica partitions
                 ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 857f1e2..afd6019 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -25,6 +25,8 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
+import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -41,6 +43,7 @@
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
@@ -114,8 +117,11 @@
         writer.open();
         indexHelper.open();
         index = indexHelper.getIndexInstance();
-
         try {
+            if (ctx.getSharedObject() != null) {
+                PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index);
+                TaskUtils.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+            }
             missingTupleBuilder = new ArrayTupleBuilder(1);
             DataOutput out = missingTupleBuilder.getDataOutput();
             try {
@@ -135,8 +141,8 @@
                     .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
-            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                    .getApplicationContext().getApplicationObject();
+            IAsterixAppRuntimeContext runtimeCtx =
+                    (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
             AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                     runtimeCtx.getTransactionSubsystem().getLogManager());
         } catch (Exception e) {
@@ -241,10 +247,7 @@
                 writeOutput(i, recordWasInserted);
                 i++;
             }
-            if (tupleCount > 0) {
-                // All tuples has to move forward to maintain the correctness of the transaction pipeline
-                appender.write(writer, true);
-            }
+            appender.write(writer, true);
         } catch (IndexException | IOException | AsterixException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 94d2a8c..9a66aa5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -80,8 +80,8 @@
 
     public LogManager(TransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
-        logManagerProperties = new LogManagerProperties(this.txnSubsystem.getTransactionProperties(),
-                this.txnSubsystem.getId());
+        logManagerProperties =
+                new LogManagerProperties(this.txnSubsystem.getTransactionProperties(), this.txnSubsystem.getId());
         logFileSize = logManagerProperties.getLogPartitionSize();
         logPageSize = logManagerProperties.getLogPageSize();
         numLogPages = logManagerProperties.getNumLogPages();
@@ -172,6 +172,9 @@
         if (logRecord.getLogType() == LogType.FLUSH) {
             logRecord.setLSN(appendLSN.get());
         }
+        if (logRecord.isMarker()) {
+            logRecord.logAppended(appendLSN.get());
+        }
         appendLSN.addAndGet(logRecord.getLogSize());
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index afb926b..0183b29 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -92,7 +92,7 @@
     private final long SHARP_CHECKPOINT_LSN = -1;
     private final boolean replicationEnabled;
     public static final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
-    private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp";
+    private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp";
     private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
     private final long cachedEntityCommitsPerJobSize;
     private final PersistentLocalResourceRepository localResourceRepository;
@@ -108,8 +108,8 @@
         this.txnSubsystem = txnSubsystem;
         logMgr = (LogManager) txnSubsystem.getLogManager();
         checkpointHistory = txnSubsystem.getTransactionProperties().getCheckpointHistory();
-        IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) txnSubsystem
-                .getAsterixAppRuntimeContextProvider().getAppContext();
+        IAsterixPropertiesProvider propertiesProvider =
+                (IAsterixPropertiesProvider) txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext();
         replicationEnabled = propertiesProvider.getReplicationProperties().isReplicationEnabled();
         localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getLocalResourceRepository();
@@ -271,6 +271,7 @@
                     break;
                 case LogType.FLUSH:
                 case LogType.WAIT:
+                case LogType.MARKER:
                     break;
                 default:
                     throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
@@ -361,10 +362,10 @@
                                 //if index is not registered into IndexLifeCycleManager,
                                 //create the index using LocalMetadata stored in LocalResourceRepository
                                 //get partition path in this node
-                                String partitionIODevicePath = localResourceRepository
-                                        .getPartitionPath(localResource.getPartition());
-                                String resourceAbsolutePath = partitionIODevicePath + File.separator
-                                        + localResource.getResourceName();
+                                String partitionIODevicePath =
+                                        localResourceRepository.getPartitionPath(localResource.getPartition());
+                                String resourceAbsolutePath =
+                                        partitionIODevicePath + File.separator + localResource.getResourceName();
                                 localResource.setResourcePath(resourceAbsolutePath);
                                 index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
                                 if (index == null) {
@@ -379,8 +380,8 @@
                                     //#. get maxDiskLastLSN
                                     ILSMIndex lsmIndex = index;
                                     try {
-                                        maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
-                                                .getIOOperationCallback())
+                                        maxDiskLastLsn =
+                                                ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
                                                         .getComponentLSN(lsmIndex.getImmutableComponents());
                                     } catch (HyracksDataException e) {
                                         datasetLifecycleManager.close(resourceAbsolutePath);
@@ -405,6 +406,7 @@
                     case LogType.ABORT:
                     case LogType.FLUSH:
                     case LogType.UPSERT_ENTITY_COMMIT:
+                    case LogType.MARKER:
                         //do nothing
                         break;
                     default:
@@ -443,8 +445,8 @@
         //right after the new checkpoint file is written.
         File[] prevCheckpointFiles = getPreviousCheckpointFiles();
 
-        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IDatasetLifecycleManager datasetLifecycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         //flush all in-memory components if it is the sharp checkpoint
         if (isSharpCheckpoint) {
             datasetLifecycleManager.flushAllDatasets();
@@ -467,8 +469,8 @@
                         Set<Integer> deadReplicasPartitions = new HashSet<>();
                         //get partitions of the dead replicas that are not active on this node
                         for (String deadReplicaId : deadReplicaIds) {
-                            ClusterPartition[] nodePartitons = metadataProperties.getNodePartitions()
-                                    .get(deadReplicaId);
+                            ClusterPartition[] nodePartitons =
+                                    metadataProperties.getNodePartitions().get(deadReplicaId);
                             for (ClusterPartition partition : nodePartitons) {
                                 if (!localResourceRepository.getActivePartitions()
                                         .contains(partition.getPartitionId())) {
@@ -492,8 +494,8 @@
                 datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
                 if (replicationEnabled) {
                     //request remote replicas to flush lagging indexes
-                    IReplicationManager replicationManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                            .getAppContext().getReplicationManager();
+                    IReplicationManager replicationManager =
+                            txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationManager();
                     try {
                         replicationManager.requestFlushLaggingReplicaIndexes(nonSharpCheckpointTargetLSN);
                     } catch (IOException e) {
@@ -564,16 +566,16 @@
 
     @Override
     public long getLocalMinFirstLSN() throws HyracksDataException {
-        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IDatasetLifecycleManager datasetLifecycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         List<IIndex> openIndexList = datasetLifecycleManager.getOpenIndexes();
         long firstLSN;
         //the min first lsn can only be the current append or smaller
         long minFirstLSN = logMgr.getAppendLSN();
         if (openIndexList.size() > 0) {
             for (IIndex index : openIndexList) {
-                AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index)
-                        .getIOOperationCallback();
+                AbstractLSMIOOperationCallback ioCallback =
+                        (AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback();
 
                 if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
                     firstLSN = ioCallback.getFirstLSN();
@@ -585,8 +587,8 @@
     }
 
     private long getRemoteMinFirstLSN() {
-        IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getAppContext().getReplicaResourcesManager();
+        IReplicaResourcesManager remoteResourcesManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
         long minRemoteLSN = remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
         return minRemoteLSN;
     }
@@ -783,6 +785,7 @@
                     case LogType.ABORT:
                     case LogType.FLUSH:
                     case LogType.WAIT:
+                    case LogType.MARKER:
                         //ignore
                         break;
                     default:
@@ -798,8 +801,8 @@
             //undo loserTxn's effect
             LOGGER.log(Level.INFO, "undoing loser transaction's effect");
 
-            IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                    .getDatasetLifecycleManager();
+            IDatasetLifecycleManager datasetLifecycleManager =
+                    txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
             //TODO sort loser entities by smallest LSN to undo in one pass.
             Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
             int undoCount = 0;
@@ -855,10 +858,10 @@
 
     private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
         try {
-            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
-                    logRecord.getResourceId());
-            ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            ILSMIndex index =
+                    (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+            ILSMIndexAccessor indexAccessor =
+                    index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
                 indexAccessor.forceDelete(logRecord.getNewValue());
             } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
@@ -873,10 +876,10 @@
 
     private static void redo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
         try {
-            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
-                    logRecord.getResourceId());
-            ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            ILSMIndex index =
+                    (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+            ILSMIndexAccessor indexAccessor =
+                    index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
                 indexAccessor.forceInsert(logRecord.getNewValue());
             } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
index d94f933..19d7afb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
@@ -64,6 +64,7 @@
 
     /**
      * Provide data to the stream of this {@link IFrameWriter}.
+     *
      * @param buffer
      *            - Buffer containing data.
      * @throws HyracksDataException
@@ -72,21 +73,24 @@
 
     /**
      * request the frame to push its content forward and flush its consumers
+     *
      * @throws HyracksDataException
      */
     public default void flush() throws HyracksDataException {
-        throw new HyracksDataException("flush() is not supported in this IFrameWriter");
+        // No Op
     }
 
     /**
      * Indicate that a failure was encountered and the current stream is to be
      * aborted.
+     *
      * @throws HyracksDataException
      */
     public void fail() throws HyracksDataException;
 
     /**
      * Close this {@link IFrameWriter} and give up all resources.
+     *
      * @throws HyracksDataException
      */
     public void close() throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 3781489..4eb3ebf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -45,7 +45,7 @@
 
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception;
 
-    public void setSharedObject(Object sharedObject);
+    public void setSharedObject(Object object);
 
     public Object getSharedObject();
 }
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
similarity index 82%
rename from asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
index 11a2510..8d55235 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.event.util;
+package org.apache.hyracks.api.util;
 
-public class AsterixConstants {
+public class HyracksConstants {
+    public static final String KEY_MESSAGE = "HYX:MSG";
 
-    public static String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
-
+    private HyracksConstants() {
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2f8def1..43cac74 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -260,7 +260,8 @@
         init();
 
         datasetNetworkManager.start();
-        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
+        IIPCHandle ccIPCHandle =
+                ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
@@ -270,12 +271,11 @@
         // Use "public" versions of network addresses and ports
         NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress,
-                datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean
-                        .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
-                        .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
-                        .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema));
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
+                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
 
         synchronized (this) {
             while (registrationPending) {
@@ -490,7 +490,8 @@
             CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
             switch (fn.getFunctionId()) {
                 case SEND_APPLICATION_MESSAGE: {
-                    CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    CCNCFunctions.SendApplicationMessageFunction amf =
+                            (CCNCFunctions.SendApplicationMessageFunction) fn;
                     queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
                             amf.getDeploymentId(), amf.getNodeId()));
                     return;
@@ -515,7 +516,8 @@
                 }
 
                 case REPORT_PARTITION_AVAILABILITY: {
-                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
+                            (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
                     queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
                             rpaf.getPartitionId(), rpaf.getNetworkAddress()));
                     return;
@@ -528,7 +530,8 @@
                 }
 
                 case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
-                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf =
+                            (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
                     setNodeControllersInfo(gncirf.getNodeControllerInfos());
                     return;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 134154c..f463bfa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -394,8 +394,8 @@
     }
 
     @Override
-    public void setSharedObject(Object sharedObject) {
-        this.sharedObject = sharedObject;
+    public void setSharedObject(Object object) {
+        this.sharedObject = object;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 7d12296..7d90a8b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -106,20 +106,20 @@
                                 ByteBuffer buffer = ctx.allocateFrame();
                                 boolean fail = false;
                                 boolean done = false;
-                                boolean flush = false;
                                 while (!fail && !done) {
                                     synchronized (MaterializingPipelinedPartition.this) {
-                                        if (flushRequest) {
-                                            flushRequest = false;
-                                            flush = true;
-                                        }
-                                        while (offset >= size && !eos && !failed && !flush) {
+                                        while (offset >= size && !eos && !failed) {
+                                            if (flushRequest) {
+                                                flushRequest = false;
+                                                writer.flush();
+                                            }
                                             try {
                                                 MaterializingPipelinedPartition.this.wait();
                                             } catch (InterruptedException e) {
                                                 throw new HyracksDataException(e);
                                             }
                                         }
+                                        flushRequest = false;
                                         fail = failed;
                                         done = eos && offset >= size;
                                     }
@@ -134,10 +134,6 @@
                                         offset += readLen;
                                         buffer.flip();
                                         writer.nextFrame(buffer);
-                                        if (flush) {
-                                            writer.flush();
-                                            flush = false;
-                                        }
                                     }
                                 }
                             }
@@ -213,4 +209,4 @@
         flushRequest = true;
         notifyAll();
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index e7131d5..57f8072 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -89,9 +89,7 @@
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
         getBuffer().clear();
-        if (getTupleCount() > 0) {
-            outWriter.nextFrame(getBuffer());
-        }
+        outWriter.nextFrame(getBuffer());
         if (clearFrame) {
             frame.reset();
             reset(getBuffer(), true);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index ef11b5b..3ef8b28 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -45,13 +45,13 @@
      * append fieldSlots and bytes to the current frame
      */
     @Override
-    public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
-        if (canHoldNewTuple(fieldSlots.length, length)) {
-            for (int i = 0; i < fieldSlots.length; ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fieldSlots[i]);
+    public boolean append(int[] fieldEndOffsets, byte[] bytes, int offset, int length) throws HyracksDataException {
+        if (canHoldNewTuple(fieldEndOffsets.length, length)) {
+            for (int i = 0; i < fieldEndOffsets.length; ++i) {
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fieldEndOffsets[i]);
             }
-            System.arraycopy(bytes, offset, array, tupleDataEndOffset + fieldSlots.length * 4, length);
-            tupleDataEndOffset += fieldSlots.length * 4 + length;
+            System.arraycopy(bytes, offset, array, tupleDataEndOffset + fieldEndOffsets.length * 4, length);
+            tupleDataEndOffset += fieldEndOffsets.length * 4 + length;
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
@@ -63,19 +63,24 @@
     }
 
     public boolean append(ITupleReference tuple) throws HyracksDataException {
-        int tupleSize = 0;
+        int length = 0;
         for (int i = 0; i < tuple.getFieldCount(); i++) {
-            tupleSize += tuple.getFieldLength(i);
+            length += tuple.getFieldLength(i);
         }
-        if (canHoldNewTuple(tuple.getFieldCount(), tupleSize)) {
-            int offset = 0;
+
+        if (canHoldNewTuple(tuple.getFieldCount(), length)) {
+            length = 0;
             for (int i = 0; i < tuple.getFieldCount(); ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, offset);
-                System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), array,
-                        tupleDataEndOffset + tuple.getFieldCount() * 4, tuple.getFieldLength(i));
-                offset += tuple.getFieldLength(i);
+                length += tuple.getFieldLength(i);
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, length);
             }
-            tupleDataEndOffset += tuple.getFieldCount() * 4 + tupleSize;
+            length = 0;
+            for (int i = 0; i < tuple.getFieldCount(); ++i) {
+                System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), array,
+                        tupleDataEndOffset + tuple.getFieldCount() * 4 + length, tuple.getFieldLength(i));
+                length += tuple.getFieldLength(i);
+            }
+            tupleDataEndOffset += tuple.getFieldCount() * 4 + length;
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index cae659d..7f518cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -26,7 +26,9 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.util.IntSerDeUtils;
 
 /**
@@ -39,7 +41,9 @@
     private static final int NULL_MESSAGE_SIZE = 1;
     public static final byte NULL_FEED_MESSAGE = 0x01;
     public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
-    public static final byte SNAPSHOT_MESSAGE = 0x03;
+    public static final byte MARKER_MESSAGE = 0x03;
+    private boolean initialized = false;
+    private VSizeFrame message;
 
     public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
         this.ctx = ctx;
@@ -59,8 +63,8 @@
             case ACK_REQ_FEED_MESSAGE:
                 aString.append("Ack Request, ");
                 break;
-            case SNAPSHOT_MESSAGE:
-                aString.append("Snapshot, ");
+            case MARKER_MESSAGE:
+                aString.append("Marker, ");
                 break;
             default:
                 aString.append("Unknown, ");
@@ -78,8 +82,8 @@
                 return NULL_FEED_MESSAGE;
             case ACK_REQ_FEED_MESSAGE:
                 return ACK_REQ_FEED_MESSAGE;
-            case SNAPSHOT_MESSAGE:
-                return SNAPSHOT_MESSAGE;
+            case MARKER_MESSAGE:
+                return MARKER_MESSAGE;
             default:
                 throw new HyracksDataException("Unknown message type");
         }
@@ -101,24 +105,35 @@
 
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        if (!initialized) {
+            message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
+            initialized = true;
+        }
         // If message fits, we append it, otherwise, we append a null message, then send a message only
         // frame with the message
-        ByteBuffer message = ((VSizeFrame) ctx.getSharedObject()).getBuffer();
-        int messageSize = message.limit() - message.position();
-        if (hasEnoughSpace(1, messageSize)) {
-            appendMessage(message);
-            forward(outWriter);
-        } else {
+        if (message == null) {
             if (tupleCount > 0) {
                 appendNullMessage();
                 forward(outWriter);
             }
-            if (!hasEnoughSpace(1, messageSize)) {
-                frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
-                reset(frame.getBuffer(), true);
+        } else {
+            ByteBuffer buffer = message.getBuffer();
+            int messageSize = buffer.limit() - buffer.position();
+            if (hasEnoughSpace(1, messageSize)) {
+                appendMessage(buffer);
+                forward(outWriter);
+            } else {
+                if (tupleCount > 0) {
+                    appendNullMessage();
+                    forward(outWriter);
+                }
+                if (!hasEnoughSpace(1, messageSize)) {
+                    frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
+                    reset(frame.getBuffer(), true);
+                }
+                appendMessage(buffer);
+                forward(outWriter);
             }
-            appendMessage(message);
-            forward(outWriter);
         }
     }
 
@@ -130,8 +145,9 @@
     }
 
     private void appendMessage(ByteBuffer message) {
-        System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, message.limit());
-        tupleDataEndOffset += message.limit();
+        int messageLength = message.limit() - message.position();
+        System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, messageLength);
+        tupleDataEndOffset += messageLength;
         IntSerDeUtils.putInt(getBuffer().array(),
                 FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
         ++tupleCount;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java
new file mode 100644
index 0000000..4f27d79
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A Utility class for facilitating common operations used with a hyracks task
+ */
+public class TaskUtils {
+    private TaskUtils() {
+    }
+
+    /**
+     * get the shared object of a task as a Map<String,Object>
+     *
+     * @param ctx
+     *            the task context
+     * @param create
+     * @return the task shared map
+     */
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> getSharedMap(IHyracksTaskContext ctx, boolean create) {
+        if (ctx.getSharedObject() != null) {
+            return (Map<String, Object>) ctx.getSharedObject();
+        } else if (create) {
+            Map<String, Object> taskMap = new HashMap<>();
+            ctx.setSharedObject(taskMap);
+            return taskMap;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * put the key value pair in a map task object
+     *
+     * @param key
+     * @param ctx
+     * @param object
+     */
+    public static void putInSharedMap(String key, Object object, IHyracksTaskContext ctx) {
+        TaskUtils.getSharedMap(ctx, true).put(key, object);
+    }
+
+    /**
+     * get a <T> object from the shared map of the task
+     *
+     * @param key
+     * @param ctx
+     * @return the value associated with the key casted as T
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T get(String key, IHyracksTaskContext ctx) {
+        Map<String, Object> sharedMap = TaskUtils.getSharedMap(ctx, false);
+        return sharedMap == null ? null : (T) sharedMap.get(key);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 30ee3c0..b4e51be 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -65,8 +65,8 @@
             RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
         this.groupFields = groupFields;
         this.comparators = comparators;
-        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields,
-                writer);
+        this.aggregator =
+                aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer);
         this.aggregateState = aggregator.createAggregateStates();
         copyFrame = new VSizeFrame(ctx);
         inFrameAccessor = new FrameTupleAccessor(inRecordDesc);
@@ -91,29 +91,32 @@
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         inFrameAccessor.reset(buffer);
         int nTuples = inFrameAccessor.getTupleCount();
-        for (int i = 0; i < nTuples; ++i) {
-            if (first) {
+        if (nTuples != 0) {
+            for (int i = 0; i < nTuples; ++i) {
+                if (first) {
 
-                tupleBuilder.reset();
-                for (int j = 0; j < groupFields.length; j++) {
-                    tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
-                }
-                aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
+                    tupleBuilder.reset();
+                    for (int j = 0; j < groupFields.length; j++) {
+                        tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
+                    }
+                    aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
 
-                first = false;
+                    first = false;
 
-            } else {
-                if (i == 0) {
-                    switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
                 } else {
-                    switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
-                }
+                    if (i == 0) {
+                        switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor,
+                                i);
+                    } else {
+                        switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+                    }
 
+                }
             }
+            copyFrame.ensureFrameSize(buffer.capacity());
+            FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
+            copyFrameAccessor.reset(copyFrame.getBuffer());
         }
-        copyFrame.ensureFrameSize(buffer.capacity());
-        FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
-        copyFrameAccessor.reset(copyFrame.getBuffer());
     }
 
     private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
diff --git a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
index 7267cb7..7784199 100644
--- a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
@@ -33,4 +33,35 @@
     <root.dir>${basedir}/../../..</root.dir>
   </properties>
 
+  <build>
+      <pluginManagement>
+          <plugins>
+              <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+              <plugin>
+                  <groupId>org.eclipse.m2e</groupId>
+                  <artifactId>lifecycle-mapping</artifactId>
+                  <version>1.0.0</version>
+                  <configuration>
+                      <lifecycleMappingMetadata>
+                          <pluginExecutions>
+                              <pluginExecution>
+                                  <pluginExecutionFilter>
+                                      <groupId>org.apache.maven.plugins</groupId>
+                                      <artifactId>maven-plugin-plugin</artifactId>
+                                      <versionRange>[3.3,)</versionRange>
+                                      <goals>
+                                          <goal>descriptor</goal>
+                                      </goals>
+                                  </pluginExecutionFilter>
+                                  <action>
+                                      <ignore></ignore>
+                                  </action>
+                              </pluginExecution>
+                          </pluginExecutions>
+                      </lifecycleMappingMetadata>
+                  </configuration>
+              </plugin>
+          </plugins>
+      </pluginManagement>
+  </build>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetaDataPageManager.java
index 2550ab4..3982a3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetaDataPageManager.java
@@ -31,12 +31,15 @@
      */
     /**
      * Open an index file's metadata
-     * @param fileId The file which to open the metadata of
+     *
+     * @param fileId
+     *            The file which to open the metadata of
      */
     public void open(int fileId);
 
     /**
      * Close an index file's metadata.
+     *
      * @throws HyracksDataException
      */
 
@@ -44,7 +47,9 @@
 
     /**
      * Get the location of a free page to use for index operations
-     * @param metaFrame A metadata frame to use to wrap the raw page
+     *
+     * @param metaFrame
+     *            A metadata frame to use to wrap the raw page
      * @return A page location, or -1 if no free page could be found or allocated
      * @throws HyracksDataException
      */
@@ -53,7 +58,9 @@
 
     /**
      * Get the location of a block of free pages to use for index operations
-     * @param metaFrame A metadata frame to use to wrap the raw page
+     *
+     * @param metaFrame
+     *            A metadata frame to use to wrap the raw page
      * @return The starting page location, or -1 if a block of free pages could be found or allocated
      * @throws HyracksDataException
      */
@@ -62,8 +69,11 @@
 
     /**
      * Add a page back to the pool of free pages within an index file
-     * @param metaFrame A metadata frame to use to wrap the raw page
-     * @param freePage The page which to return to the free space
+     *
+     * @param metaFrame
+     *            A metadata frame to use to wrap the raw page
+     * @param freePage
+     *            The page which to return to the free space
      * @throws HyracksDataException
      */
 
@@ -74,7 +84,9 @@
 
     /**
      * Gets the highest page offset according to the metadata
-     * @param metaFrame A metadata frame to use to wrap the raw page
+     *
+     * @param metaFrame
+     *            A metadata frame to use to wrap the raw page
      * @return The locaiton of the highest offset page
      * @throws HyracksDataException
      */
@@ -83,8 +95,11 @@
 
     /**
      * Initializes the index metadata
-     * @param metaFrame A metadata farme to use to wrap the raw page
-     * @param currentMaxPage The highest page offset to consider valid
+     *
+     * @param metaFrame
+     *            A metadata farme to use to wrap the raw page
+     * @param currentMaxPage
+     *            The highest page offset to consider valid
      * @throws HyracksDataException
      */
 
@@ -105,6 +120,7 @@
 
     /**
      * Determines where the metadata page is located in an index file
+     *
      * @return The locaiton of the metadata page, or -1 if the file appears to be corrupt
      * @throws HyracksDataException
      */
@@ -113,7 +129,9 @@
 
     /**
      * Initializes the metadata manager on an open index file
-     * @param metaFrame A metadata frame used to wrap the raw page
+     *
+     * @param metaFrame
+     *            A metadata frame used to wrap the raw page
      * @throws HyracksDataException
      */
 
@@ -121,6 +139,7 @@
 
     /**
      * Locate the filter page in an index file
+     *
      * @return The offset of the filter page if it exists, or less than zero if no filter page exists yet
      * @throws HyracksDataException
      */
@@ -135,7 +154,9 @@
 
     /**
      * Set the cached page to manage for filter data
-     * @param page The page to manage
+     *
+     * @param page
+     *            The page to manage
      */
 
     void setFilterPage(ICachedPage page);
@@ -149,4 +170,6 @@
      * @throws HyracksDataException
      */
     long getLSNOffset() throws HyracksDataException;
+
+    public long getLastMarkerLSN() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
index e33b949..bdfa9e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
@@ -62,4 +62,9 @@
     public long getLSN();
 
     public void setLSN(long lsn);
+
+    // Special placeholder for LSN information of a marker log. used for rollback information
+    public long getLastMarkerLSN();
+
+    public void setLastMarkerLSN(long lsn);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 4f9e6c4..44778a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -199,7 +199,9 @@
         if (index != null) {
             // if index == null, then the index open was not successful
             try {
-                appender.write(writer, true);
+                if (appender.getTupleCount() > 0) {
+                    appender.write(writer, true);
+                }
             } catch (Throwable th) {
                 closeException = new HyracksDataException(th);
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
index 16fdecd..38b43c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
@@ -34,38 +34,43 @@
     // Arbitrarily chosen magic integer.
     protected static final int MAGIC_VALID_INT = 0x5bd1e995;
 
-    protected static final int tupleCountOff = 0; //0
-    protected static final int freeSpaceOff = tupleCountOff + 4; //4
-    protected static final int maxPageOff = freeSpaceOff + 4; //8
-    protected static final int levelOff = maxPageOff + 12; //20
-    protected static final int nextPageOff = levelOff + 1; // 21
-    protected static final int validOff = nextPageOff + 4; // 25
+    protected static final int TUPLE_COUNT_OFFSET = 0; //0
+    protected static final int FREE_SPACE_OFFSET = TUPLE_COUNT_OFFSET + 4; //4
+    protected static final int MAX_PAGE_OFFSET = FREE_SPACE_OFFSET + 4; //8
+    protected static final int LEVEL_OFFSET = MAX_PAGE_OFFSET + 12; //20
+    protected static final int NEXT_PAGE_OFFSET = LEVEL_OFFSET + 1; // 21
+    protected static final int VALID_OFFSET = NEXT_PAGE_OFFSET + 4; // 25
 
     // The additionalFilteringPageOff is used only for LSM indexes.
     // We store the page id that will be used to store the information of the the filter that is associated with a disk component.
     // It is only set in the first meta page other meta pages (i.e., with level -2) have junk in the max page field.
-    private static final int additionalFilteringPageOff = validOff + 4; // 29
-    public static final int lsnOff = additionalFilteringPageOff + 4; // 33
+    private static final int ADDITIONAL_FILTERING_PAGE_OFFSET = VALID_OFFSET + 4; // 29
+    public static final int LSN_OFFSET = ADDITIONAL_FILTERING_PAGE_OFFSET + 4; // 33
+    private static final int LAST_MARKER_LSN_OFFSET = LSN_OFFSET + 8; // 41
+    private static final int HEADER_END_OFFSET = LAST_MARKER_LSN_OFFSET + 8;
 
     protected ICachedPage page = null;
     protected ByteBuffer buf = null;
 
+    @Override
     public int getMaxPage() {
-        return buf.getInt(maxPageOff);
+        return buf.getInt(MAX_PAGE_OFFSET);
     }
 
+    @Override
     public void setMaxPage(int maxPage) {
-        buf.putInt(maxPageOff, maxPage);
+        buf.putInt(MAX_PAGE_OFFSET, maxPage);
     }
 
+    @Override
     public int getFreePage() {
-        int tupleCount = buf.getInt(tupleCountOff);
+        int tupleCount = buf.getInt(TUPLE_COUNT_OFFSET);
         if (tupleCount > 0) {
             // return the last page from the linked list of free pages
             // TODO: this is a dumb policy, but good enough for now
-            int lastPageOff = buf.getInt(freeSpaceOff) - 4;
-            buf.putInt(freeSpaceOff, lastPageOff);
-            buf.putInt(tupleCountOff, tupleCount - 1);
+            int lastPageOff = buf.getInt(FREE_SPACE_OFFSET) - 4;
+            buf.putInt(FREE_SPACE_OFFSET, lastPageOff);
+            buf.putInt(TUPLE_COUNT_OFFSET, tupleCount - 1);
             return buf.getInt(lastPageOff);
         } else {
             return -1;
@@ -75,26 +80,28 @@
     // must be checked before adding free page
     // user of this class is responsible for getting a free page as a new meta
     // page, latching it, etc. if there is no space on this page
+    @Override
     public boolean hasSpace() {
-        return buf.getInt(freeSpaceOff) + 4 < buf.capacity();
+        return buf.getInt(FREE_SPACE_OFFSET) + 4 < buf.capacity();
     }
 
     // no bounds checking is done, there must be free space
+    @Override
     public void addFreePage(int freePage) {
-        int freeSpace = buf.getInt(freeSpaceOff);
+        int freeSpace = buf.getInt(FREE_SPACE_OFFSET);
         buf.putInt(freeSpace, freePage);
-        buf.putInt(freeSpaceOff, freeSpace + 4);
-        buf.putInt(tupleCountOff, buf.getInt(tupleCountOff) + 1);
+        buf.putInt(FREE_SPACE_OFFSET, freeSpace + 4);
+        buf.putInt(TUPLE_COUNT_OFFSET, buf.getInt(TUPLE_COUNT_OFFSET) + 1);
     }
 
     @Override
     public byte getLevel() {
-        return buf.get(levelOff);
+        return buf.get(LEVEL_OFFSET);
     }
 
     @Override
     public void setLevel(byte level) {
-        buf.put(levelOff, level);
+        buf.put(LEVEL_OFFSET, level);
     }
 
     @Override
@@ -110,56 +117,67 @@
 
     @Override
     public void initBuffer(byte level) {
-        buf.putInt(tupleCountOff, 0);
-        buf.putInt(freeSpaceOff, lsnOff + 8);
-        buf.putInt(maxPageOff, 0);
-        buf.put(levelOff, level);
-        buf.putInt(nextPageOff, -1);
-        buf.putInt(additionalFilteringPageOff, -1);
+        buf.putInt(TUPLE_COUNT_OFFSET, 0);
+        buf.putInt(FREE_SPACE_OFFSET, HEADER_END_OFFSET);
+        buf.putInt(MAX_PAGE_OFFSET, 0);
+        buf.put(LEVEL_OFFSET, level);
+        buf.putInt(NEXT_PAGE_OFFSET, -1);
+        buf.putInt(ADDITIONAL_FILTERING_PAGE_OFFSET, -1);
+        buf.putLong(LAST_MARKER_LSN_OFFSET, -1L);
         setValid(false);
     }
 
     @Override
     public int getNextPage() {
-        return buf.getInt(nextPageOff);
+        return buf.getInt(NEXT_PAGE_OFFSET);
     }
 
     @Override
     public void setNextPage(int nextPage) {
-        buf.putInt(nextPageOff, nextPage);
+        buf.putInt(NEXT_PAGE_OFFSET, nextPage);
     }
 
     @Override
     public boolean isValid() {
-        return buf.getInt(validOff) == MAGIC_VALID_INT;
+        return buf.getInt(VALID_OFFSET) == MAGIC_VALID_INT;
     }
 
     @Override
     public void setValid(boolean isValid) {
         if (isValid) {
-            buf.putInt(validOff, MAGIC_VALID_INT);
+            buf.putInt(VALID_OFFSET, MAGIC_VALID_INT);
         } else {
-            buf.putInt(validOff, 0);
+            buf.putInt(VALID_OFFSET, 0);
         }
     }
 
     @Override
     public long getLSN() {
-        return buf.getLong(lsnOff);
+        return buf.getLong(LSN_OFFSET);
     }
 
     @Override
     public void setLSN(long lsn) {
-        buf.putLong(lsnOff, lsn);
+        buf.putLong(LSN_OFFSET, lsn);
+    }
+
+    @Override
+    public long getLastMarkerLSN() {
+        return buf.getLong(LAST_MARKER_LSN_OFFSET);
+    }
+
+    @Override
+    public void setLastMarkerLSN(long lsn) {
+        buf.putLong(LAST_MARKER_LSN_OFFSET, lsn);
     }
 
     @Override
     public int getLSMComponentFilterPageId() {
-        return buf.getInt(additionalFilteringPageOff);
+        return buf.getInt(ADDITIONAL_FILTERING_PAGE_OFFSET);
     }
 
     @Override
     public void setLSMComponentFilterPageId(int filterPage) {
-        buf.putInt(additionalFilteringPageOff, filterPage);
+        buf.putInt(ADDITIONAL_FILTERING_PAGE_OFFSET, filterPage);
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index 468654a..4d56a80 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -361,8 +361,9 @@
      */
     @Override
     public int getFirstMetadataPage() throws HyracksDataException {
-        if (headPage != IBufferCache.INVALID_PAGEID)
+        if (headPage != IBufferCache.INVALID_PAGEID) {
             return headPage;
+        }
 
         ITreeIndexMetaDataFrame metaFrame = metaDataFrameFactory.createFrame();
 
@@ -472,8 +473,29 @@
     public long getLSNOffset() throws HyracksDataException {
         int metadataPageNum = getFirstMetadataPage();
         if (metadataPageNum != IBufferCache.INVALID_PAGEID) {
-            return (metadataPageNum * bufferCache.getPageSize()) + LIFOMetaDataFrame.lsnOff;
+            return (metadataPageNum * (long) bufferCache.getPageSize()) + LIFOMetaDataFrame.LSN_OFFSET;
         }
         return IMetaDataPageManager.INVALID_LSN_OFFSET;
     }
+
+    @Override
+    public long getLastMarkerLSN() throws HyracksDataException {
+        ICachedPage metaNode;
+        if (!appendOnly || (appendOnly && confiscatedMetaNode == null)) {
+            metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getFirstMetadataPage()), false);
+        } else {
+            metaNode = confiscatedMetaNode;
+        }
+        ITreeIndexMetaDataFrame metaFrame = metaDataFrameFactory.createFrame();
+        metaNode.acquireReadLatch();
+        try {
+            metaFrame.setPage(metaNode);
+            return metaFrame.getLastMarkerLSN();
+        } finally {
+            metaNode.releaseReadLatch();
+            if (!appendOnly || (appendOnly && confiscatedMetaNode == null)) {
+                bufferCache.unpin(metaNode);
+            }
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 9e75360..6d673a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -129,8 +129,8 @@
             ++i;
         }
         componentFactory = new LSMBTreeDiskComponentFactory(diskBTreeFactory, bloomFilterFactory, filterFactory);
-        bulkLoadComponentFactory = new LSMBTreeDiskComponentFactory(bulkLoadBTreeFactory, bloomFilterFactory,
-                filterFactory);
+        bulkLoadComponentFactory =
+                new LSMBTreeDiskComponentFactory(bulkLoadBTreeFactory, bloomFilterFactory, filterFactory);
         this.needKeyDupCheck = needKeyDupCheck;
         this.btreeFields = btreeFields;
         this.hasBloomFilter = needKeyDupCheck;
@@ -184,9 +184,9 @@
         for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
             LSMBTreeDiskComponent component;
             try {
-                component = createDiskComponent(componentFactory,
-                        lsmComonentFileReference.getInsertIndexFileReference(),
-                        lsmComonentFileReference.getBloomFilterFileReference(), false);
+                component =
+                        createDiskComponent(componentFactory, lsmComonentFileReference.getInsertIndexFileReference(),
+                                lsmComonentFileReference.getBloomFilterFileReference(), false);
             } catch (IndexException e) {
                 throw new HyracksDataException(e);
             }
@@ -424,7 +424,7 @@
         ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent,
                 componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
-                callback, fileManager.getBaseDir()));
+                callback, fileManager.getBaseDir(), flushingComponent.getMostRecentMarkerLSN()));
     }
 
     @Override
@@ -489,7 +489,7 @@
             filterManager.updateFilterInfo(component.getLSMComponentFilter(), filterTuples);
             filterManager.writeFilterInfo(component.getLSMComponentFilter(), component.getBTree());
         }
-
+        component.setMostRecentMarkerLSN(flushOp.getPrevMarkerLSN());
         bulkLoader.end();
 
         return component;
@@ -511,8 +511,8 @@
         BTree lastBTree = ((LSMBTreeDiskComponent) mergingComponents.get(mergingComponents.size() - 1)).getBTree();
         FileReference firstFile = firstBTree.getFileReference();
         FileReference lastFile = lastBTree.getFileReference();
-        LSMComponentFileReferences relMergeFileRefs = fileManager
-                .getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName());
+        LSMComponentFileReferences relMergeFileRefs =
+                fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName());
         ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
         ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor,
                 relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
@@ -542,8 +542,8 @@
         LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
                 mergeOp.getBloomFilterMergeTarget(), true);
 
-        IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false,
-                true);
+        IIndexBulkLoader bulkLoader =
+                mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false, true);
         IIndexBulkLoader builder = null;
         if (hasBloomFilter) {
             builder = mergedComponent.getBloomFilter().createBuilder(numElements, bloomFilterSpec.getNumHashes(),
@@ -574,6 +574,8 @@
             filterManager.writeFilterInfo(mergedComponent.getLSMComponentFilter(), mergedComponent.getBTree());
         }
 
+        mergedComponent
+                .setMostRecentMarkerLSN(mergedComponents.get(mergedComponents.size() - 1).getMostRecentMarkerLSN());
         bulkLoader.end();
 
         return mergedComponent;
@@ -581,7 +583,7 @@
 
     protected LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory,
             FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean createComponent)
-                    throws HyracksDataException, IndexException {
+            throws HyracksDataException, IndexException {
         // Create new BTree instance.
         LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) factory
                 .createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
@@ -595,6 +597,11 @@
         if (component.getLSMComponentFilter() != null && !createComponent) {
             filterManager.readFilterInfo(component.getLSMComponentFilter(), component.getBTree());
         }
+
+        if (!createComponent) {
+            component.readMostRecentMarkerLSN(component.getBTree());
+        }
+
         return component;
     }
 
@@ -651,8 +658,8 @@
 
             if (hasBloomFilter) {
                 int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
-                BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
-                        bloomFilterFalsePositiveRate);
+                BloomFilterSpecification bloomFilterSpec =
+                        BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
                 builder = ((LSMBTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
                         bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
             } else {
@@ -727,7 +734,7 @@
                     filterManager.writeFilterInfo(component.getLSMComponentFilter(),
                             ((LSMBTreeDiskComponent) component).getBTree());
                 }
-
+                component.setMostRecentMarkerLSN(-1L);
                 bulkLoader.end();
 
                 if (isEmptyComponent) {
@@ -792,36 +799,36 @@
 
     @Override
     public ITreeIndexFrameFactory getInteriorFrameFactory() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getInteriorFrameFactory();
     }
 
     @Override
     public int getFieldCount() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getFieldCount();
     }
 
     @Override
     public int getFileId() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getFileId();
     }
 
     @Override
     public IMetaDataPageManager getMetaManager() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getMetaManager();
     }
 
     @Override
     public ITreeIndexFrameFactory getLeafFrameFactory() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getLeafFrameFactory();
     }
 
@@ -838,8 +845,8 @@
 
     @Override
     public int getRootPageId() {
-        LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) memoryComponents
-                .get(currentMutableComponentId.get());
+        LSMBTreeMemoryComponent mutableComponent =
+                (LSMBTreeMemoryComponent) memoryComponents.get(currentMutableComponentId.get());
         return mutableComponent.getBTree().getRootPageId();
     }
 
@@ -892,11 +899,21 @@
         if (memoryComponentsAllocated) {
             return;
         }
+        long markerLSN = -1L;
+        if (!diskComponents.isEmpty()) {
+            markerLSN = diskComponents.get(diskComponents.size() - 1).getMostRecentMarkerLSN();
+        } else {
+            // Needed in case a marker was added before any record
+            if (memoryComponents != null && !memoryComponents.isEmpty()) {
+                markerLSN = memoryComponents.get(0).getMostRecentMarkerLSN();
+            }
+        }
         for (ILSMComponent c : memoryComponents) {
             LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
             ((IVirtualBufferCache) mutableComponent.getBTree().getBufferCache()).open();
             mutableComponent.getBTree().create();
             mutableComponent.getBTree().activate();
+            mutableComponent.setMostRecentMarkerLSN(markerLSN);
         }
         memoryComponentsAllocated = true;
     }
@@ -946,4 +963,14 @@
             memoryComponentsAllocated = false;
         }
     }
+
+    public synchronized long getMostRecentMarkerLSN() throws HyracksDataException {
+        if (!isPrimaryIndex()) {
+            throw new HyracksDataException("Markers are only supported for primary indexes");
+        }
+        LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        opCtx.setOperation(IndexOperation.SEARCH);
+        getOperationalComponents(opCtx);
+        return !opCtx.getComponentHolder().isEmpty() ? opCtx.getComponentHolder().get(0).getMostRecentMarkerLSN() : -1L;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index c43590b..424dfd3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -21,6 +21,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractDiskLSMComponent;
 
@@ -62,4 +63,9 @@
     public int getFileReferenceCount() {
         return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
     }
+
+    public void readMostRecentMarkerLSN(BTree treeIndex) throws HyracksDataException {
+        IMetaDataPageManager treeMetaManager = treeIndex.getMetaManager();
+        mostRecentMarkerLSN = treeMetaManager.getLastMarkerLSN();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index a30527d..f82a1b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -39,16 +39,18 @@
     private final FileReference bloomFilterFlushTarget;
     private final ILSMIOOperationCallback callback;
     private final String indexIdentifier;
+    private final long prevMarkerLSN;
 
     public LSMBTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
             FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback,
-            String indexIdentifier) {
+            String indexIdentifier, long prevMarkerLSN) {
         this.accessor = accessor;
         this.flushingComponent = flushingComponent;
         this.btreeFlushTarget = btreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
         this.callback = callback;
         this.indexIdentifier = indexIdentifier;
+        this.prevMarkerLSN = prevMarkerLSN;
     }
 
     @Override
@@ -107,4 +109,8 @@
     public int compareTo(LSMBTreeFlushOperation o) {
         return btreeFlushTarget.getFile().getName().compareTo(o.getBTreeFlushTarget().getFile().getName());
     }
+
+    public long getPrevMarkerLSN() {
+        return prevMarkerLSN;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
index dbf47a1..1a103f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
@@ -29,11 +29,16 @@
 
     private final BTree btree;
 
-    public LSMBTreeMemoryComponent(BTree btree, IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter) {
-        super(vbc, isActive, filter);
+    public LSMBTreeMemoryComponent(BTree btree, IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter,
+            long mostRecentMarkerLSN) {
+        super(vbc, isActive, filter, mostRecentMarkerLSN);
         this.btree = btree;
     }
 
+    public LSMBTreeMemoryComponent(BTree btree, IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter) {
+        this(btree, vbc, isActive, filter, -1L);
+    }
+
     public BTree getBTree() {
         return btree;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index b847876..a888dd5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -46,4 +46,8 @@
     public ComponentState getState();
 
     public ILSMComponentFilter getLSMComponentFilter();
+
+    public void setMostRecentMarkerLSN(long lsn);
+
+    public long getMostRecentMarkerLSN();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualMetaDataPageManager.java
index c83cbf2..158c68f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualMetaDataPageManager.java
@@ -67,10 +67,12 @@
         return NullMetadataFrameFactory.INSTANCE;
     }
 
+    @Override
     public int getCapacity() {
         return capacity - 2;
     }
 
+    @Override
     public void reset() {
         currentPageId.set(1);
     }
@@ -157,10 +159,12 @@
         // Method doesn't make sense for this free page manager.
     }
 
+    @Override
     public void setFilterPage(ICachedPage page) {
         // Method doesn't make sense for this free page manager.
     }
 
+    @Override
     public ICachedPage getFilterPage() {
         return null;
     }
@@ -174,4 +178,10 @@
     public long getLSNOffset() throws HyracksDataException {
         return IMetaDataPageManager.INVALID_LSN_OFFSET;
     }
+
+    @Override
+    public long getLastMarkerLSN() throws HyracksDataException {
+        // Method doesn't make sense for this free page manager.
+        return -1L;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
index b2c55dc..c678878 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
@@ -26,10 +26,16 @@
     protected ComponentState state;
     protected int readerCount;
     protected final ILSMComponentFilter filter;
+    protected long mostRecentMarkerLSN;
+
+    public AbstractLSMComponent(ILSMComponentFilter filter, long mostRecentMarkerLSN) {
+        this.filter = filter;
+        this.mostRecentMarkerLSN = mostRecentMarkerLSN;
+        readerCount = 0;
+    }
 
     public AbstractLSMComponent(ILSMComponentFilter filter) {
-        this.filter = filter;
-        readerCount = 0;
+        this(filter, -1L);
     }
 
     public AbstractLSMComponent() {
@@ -45,4 +51,14 @@
     public ILSMComponentFilter getLSMComponentFilter() {
         return filter;
     }
+
+    @Override
+    public long getMostRecentMarkerLSN() {
+        return mostRecentMarkerLSN;
+    }
+
+    @Override
+    public void setMostRecentMarkerLSN(long lsn) {
+        this.mostRecentMarkerLSN = lsn;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 440ad31..cb3a2db 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -275,8 +275,8 @@
         }
 
         //create replication job and submit it
-        LSMIndexReplicationJob job = new LSMIndexReplicationJob(this, ctx, componentFiles, operation, executionType,
-                opType);
+        LSMIndexReplicationJob job =
+                new LSMIndexReplicationJob(this, ctx, componentFiles, operation, executionType, opType);
         try {
             diskBufferCache.getIOReplicationManager().submitJob(job);
         } catch (IOException e) {
@@ -296,4 +296,8 @@
     public boolean isDurable() {
         return durable;
     }
+
+    public ILSMComponent getCurrentMemoryComponent() {
+        return memoryComponents.get(currentMutableComponentId.get());
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
index 4b8fc9b..500996f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
@@ -32,8 +32,9 @@
     private int writerCount;
     private boolean requestedToBeActive;
 
-    public AbstractMemoryLSMComponent(IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter) {
-        super(filter);
+    public AbstractMemoryLSMComponent(IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter,
+            long mostRecentMarkerLSN) {
+        super(filter, mostRecentMarkerLSN);
         this.vbc = vbc;
         writerCount = 0;
         if (isActive) {
@@ -44,6 +45,10 @@
         isModified = new AtomicBoolean();
     }
 
+    public AbstractMemoryLSMComponent(IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter) {
+        this(vbc, isActive, filter, -1L);
+    }
+
     public AbstractMemoryLSMComponent(IVirtualBufferCache vbc, boolean isActive) {
         this(vbc, isActive, null);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 8ddab88..896d513 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -172,7 +172,11 @@
                 lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.FLUSH);
                 // Changing the flush status should *always* precede changing the mutable component.
                 lsmIndex.changeFlushStatusForCurrentMutableCompoent(false);
+                // Flushing! => carry over the marker lsn to the next component
+                long mostRecentMarkerLSN =
+                        ((AbstractLSMIndex) lsmIndex).getCurrentMemoryComponent().getMostRecentMarkerLSN();
                 lsmIndex.changeMutableComponent();
+                ((AbstractLSMIndex) lsmIndex).getCurrentMemoryComponent().setMostRecentMarkerLSN(mostRecentMarkerLSN);
                 // Notify all waiting threads whenever a flush has been scheduled since they will check
                 // again if they can grab and enter the mutable component.
                 opTracker.notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 3cca96e..92155e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -133,7 +133,8 @@
     }
 
     @Override
-    public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType) throws HyracksDataException {
+    public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
+            throws HyracksDataException {
         ctx.setOperation(IndexOperation.REPLICATE);
         ctx.getComponentsToBeReplicated().clear();
         ctx.getComponentsToBeReplicated().addAll(lsmComponents);
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index f2ec64b..aa0a7bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -42,7 +42,6 @@
     private final TestJobletContext jobletContext;
     private final TaskAttemptId taskId;
     private WorkspaceFileFactory fileFactory;
-
     private Map<Object, IStateObject> stateObjectMap = new HashMap<>();
     private Object sharedObject;
 
@@ -149,12 +148,12 @@
     }
 
     @Override
-    public Object getSharedObject() {
-        return sharedObject;
+    public void setSharedObject(Object object) {
+        sharedObject = object;
     }
 
     @Override
-    public void setSharedObject(Object sharedObject) {
-        this.sharedObject = sharedObject;
+    public Object getSharedObject() {
+        return sharedObject;
     }
 }