[ASTERIXDB-1985][CLUS] Add rebalance callback

Change-Id: I9a90ba975467c136371236195f82d48430d8319d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1863
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index f1a123c..03958ed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -121,7 +121,7 @@
                         hcc.getNodeControllerInfos());
 
                 // Flush the cached contents of the dataset to file system.
-                FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseName, datasetName, datasetName);
+                FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseName, datasetName);
 
                 // Metadata transaction commits.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index 3bd1be5..e9d231a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -43,6 +43,7 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.rebalance.NoOpDatasetRebalanceCallback;
 import org.apache.asterix.utils.RebalanceUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -242,7 +243,7 @@
         IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
         MetadataProvider metadataProvider = new MetadataProvider(appCtx, null, new StorageComponentProvider());
         RebalanceUtil.rebalance(dataverseName, datasetName, new LinkedHashSet<>(Arrays.asList(targetNodes)),
-                metadataProvider, hcc);
+                metadataProvider, hcc, NoOpDatasetRebalanceCallback.INSTANCE);
     }
 
     // Sends HTTP response to the request client.
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2967a38..bd5c024 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -34,9 +34,9 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -55,12 +55,12 @@
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.config.ExternalProperties;
+import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.DatasetConfig.TransactionState;
-import org.apache.asterix.common.config.ExternalProperties;
-import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -159,16 +159,16 @@
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
-import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
-import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.translator.TypeTranslator;
+import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
+import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.util.ValidateUtil;
 import org.apache.asterix.utils.DataverseUtil;
 import org.apache.asterix.utils.FeedOperations;
@@ -967,8 +967,7 @@
 
             // #. flush the internal dataset for correlated policy
             if (ds.isCorrelated() && ds.getDatasetType() == DatasetType.INTERNAL) {
-                FlushDatasetUtil.flushDataset(hcc, metadataProvider, index.getDataverseName(), index.getDatasetName(),
-                        index.getDatasetName());
+                FlushDatasetUtil.flushDataset(hcc, metadataProvider, index.getDataverseName(), index.getDatasetName());
             }
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -2821,7 +2820,7 @@
         }
 
         // Flushes source dataset.
-        FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseNameFrom, datasetNameFrom, datasetNameFrom);
+        FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseNameFrom, datasetNameFrom);
     }
 
     // Executes external shell commands.
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
new file mode 100644
index 0000000..e8683c9
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.rebalance;
+
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This interface is used for customizing the before/after operation for rebalance.
+ */
+public interface IDatasetRebalanceCallback {
+
+    /**
+     * The action to perform before the target dataset is populated.
+     *
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @param source,
+     *            the source dataset.
+     * @param target,
+     *            the target dataset.
+     * @param hcc,
+     *            the hyracks client connection.
+     * @throws HyracksDataException
+     */
+    void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+            IHyracksClientConnection hcc) throws HyracksDataException;
+
+    /**
+     * The action to perform after the target datasets is populated.
+     *
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @param source,
+     *            the source dataset.
+     * @param target,
+     *            the target dataset.
+     * @param hcc,
+     *            the hyracks client connection.
+     * @throws HyracksDataException
+     */
+    void afterRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target, IHyracksClientConnection hcc)
+            throws HyracksDataException;
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
new file mode 100644
index 0000000..680adbf
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.rebalance;
+
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+
+// The callback performs no action before and after a rebalance.
+public class NoOpDatasetRebalanceCallback implements IDatasetRebalanceCallback {
+
+    public static final NoOpDatasetRebalanceCallback INSTANCE = new NoOpDatasetRebalanceCallback();
+
+    private NoOpDatasetRebalanceCallback() {
+
+    }
+
+    @Override
+    public void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+            IHyracksClientConnection hcc) {
+        // Does nothing.
+    }
+
+    @Override
+    public void afterRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+            IHyracksClientConnection hcc) {
+        // Does nothing.
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index 5445986..958444c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -44,7 +44,13 @@
     }
 
     public static void flushDataset(IHyracksClientConnection hcc, MetadataProvider metadataProvider,
-            String dataverseName, String datasetName, String indexName) throws Exception {
+            String dataverseName, String datasetName) throws Exception {
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        flushDataset(hcc, metadataProvider, dataset);
+    }
+
+    public static void flushDataset(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset dataset)
+            throws Exception {
         CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties();
         int frameSize = compilerProperties.getFrameSize();
         JobSpecification spec = new JobSpecification(frameSize);
@@ -54,14 +60,13 @@
                 new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
 
         org.apache.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId();
-        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
-        FlushDatasetOperatorDescriptor flushOperator =
-                new FlushDatasetOperatorDescriptor(spec, jobId, dataset.getDatasetId());
+        FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId,
+                dataset.getDatasetId());
 
         spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset, indexName);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+                .getSplitProviderAndConstraints(dataset, dataset.getDatasetName());
         AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
 
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 3b17a94..1ed37e0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -42,6 +42,7 @@
 import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.IndexUtil;
+import org.apache.asterix.rebalance.IDatasetRebalanceCallback;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -85,7 +86,8 @@
      * @throws Exception
      */
     public static void rebalance(String dataverseName, String datasetName, Set<String> targetNcNames,
-            MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception {
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            IDatasetRebalanceCallback datasetRebalanceCallback) throws Exception {
         Dataset sourceDataset;
         Dataset targetDataset;
         // Executes the first Metadata transaction.
@@ -115,10 +117,11 @@
                     metadataProvider);
 
             // The target dataset for rebalance.
-            targetDataset = new Dataset(sourceDataset, true, nodeGroupName);
+            targetDataset = sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
+
 
             // Rebalances the source dataset into the target dataset.
-            rebalance(sourceDataset, targetDataset, metadataProvider, hcc);
+            rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
 
             // Complete the metadata transaction.
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -192,10 +195,13 @@
 
     // Rebalances from the source to the target.
     private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
-            IHyracksClientConnection hcc) throws Exception {
+            IHyracksClientConnection hcc, IDatasetRebalanceCallback datasetRebalanceCallback) throws Exception {
         // Drops the target dataset files (if any) to make rebalance idempotent.
         dropDatasetFiles(target, metadataProvider, hcc);
 
+        // Performs the specified operation before the target dataset is populated.
+        datasetRebalanceCallback.beforeRebalance(metadataProvider, source, target, hcc);
+
         // Creates the rebalance target.
         createRebalanceTarget(target, metadataProvider, hcc);
 
@@ -204,6 +210,9 @@
 
         // Creates and loads indexes for the rebalance target.
         createAndLoadSecondaryIndexesForTarget(source, target, metadataProvider, hcc);
+
+        // Performs the specified operation after the target dataset is populated.
+        datasetRebalanceCallback.afterRebalance(metadataProvider, source, target, hcc);
     }
 
     // Switches the metadata entity from the source dataset to the target dataset.
@@ -305,8 +314,7 @@
     // Creates the commit operator for populating the target dataset.
     private static IOperatorDescriptor createUpsertCommitOp(JobSpecification spec, MetadataProvider metadataProvider,
             JobId jobId, Dataset target) throws AlgebricksException {
-        int numKeys = target.getPrimaryKeys().size();
-        int[] primaryKeyFields = IntStream.range(0, numKeys).toArray();
+        int[] primaryKeyFields = getPrimaryKeyPermutationForUpsert(target);
         return new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                 new IPushRuntimeFactory[] {
                         target.getCommitRuntimeFactory(metadataProvider, jobId, primaryKeyFields, true) },
@@ -351,4 +359,26 @@
             JobUtils.runJob(hcc, indexLoadingJobSpec, true);
         }
     }
+
+    // Gets the primary key permutation for upserts.
+    private static int[] getPrimaryKeyPermutationForUpsert(Dataset dataset) {
+        // prev record first
+        int f = 1;
+        // add the previous meta second
+        if (dataset.hasMetaPart()) {
+            f++;
+        }
+        // add the previous filter third
+        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
+        if (numFilterFields > 0) {
+            f++;
+        }
+        int numPrimaryKeys = dataset.getPrimaryKeys().size();
+        int[] pkIndexes = new int[numPrimaryKeys];
+        for (int i = 0; i < pkIndexes.length; i++) {
+            pkIndexes[i] = f;
+            f++;
+        }
+        return pkIndexes;
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 9131692..020ff6c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -40,8 +40,8 @@
 import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.metadata.IDataset;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -67,7 +67,6 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.utils.RecordUtil;
-import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
@@ -78,6 +77,7 @@
 import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
 import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
@@ -176,15 +176,6 @@
                 dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp, dataset.rebalanceCount);
     }
 
-    public Dataset(Dataset dataset, boolean forRebalance, String targetNodeGroupName) {
-        this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName,
-                dataset.metaTypeDataverseName, dataset.metaTypeName, targetNodeGroupName,
-                dataset.compactionPolicyFactory,
-                dataset.compactionPolicyProperties, dataset.datasetDetails, dataset.hints, dataset.datasetType,
-                forRebalance ? DatasetIdFactory.generateAlternatingDatasetId(dataset.datasetId) : dataset.datasetId,
-                dataset.pendingOp, forRebalance ? dataset.rebalanceCount + 1 : dataset.rebalanceCount);
-    }
-
     public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName,
             String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy,
             Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints,
@@ -801,6 +792,14 @@
         return IntStream.range(0, numPrimaryKeys).toArray();
     }
 
+    // Gets the target dataset for the purpose of rebalance.
+    public Dataset getTargetDatasetForRebalance(String targetNodeGroupName) {
+        return new Dataset(this.dataverseName, this.datasetName, this.recordTypeDataverseName, this.recordTypeName,
+                this.metaTypeDataverseName, this.metaTypeName, targetNodeGroupName, this.compactionPolicyFactory,
+                this.compactionPolicyProperties, this.datasetDetails, this.hints, this.datasetType,
+                DatasetIdFactory.generateAlternatingDatasetId(this.datasetId), this.pendingOp, this.rebalanceCount + 1);
+    }
+
     // Gets an array of partition numbers for this dataset.
     protected int[] getDatasetPartitions(MetadataProvider metadataProvider) throws AlgebricksException {
         FileSplit[] splitsForDataset = metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
index b55e8ad..6ccbc8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
@@ -85,6 +85,24 @@
         }
     }
 
+    /**
+     * Put LSM metadata state into the index's current memory component.
+     *
+     * @param index,
+     *            the LSM index.
+     * @param key,
+     *            the key for the metadata state.
+     * @param pointable,
+     *            the value for the metadata state.
+     * @throws HyracksDataException
+     */
+    public static void put(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException {
+        // write the opTracker to ensure the component layout don't change
+        synchronized (index.getOperationTracker()) {
+            index.getCurrentMemoryComponent().getMetadata().put(key, pointable);
+        }
+    }
+
     private static void fromDiskComponents(ILSMIndex index, IValueReference key, IPointable pointable)
             throws HyracksDataException {
         for (ILSMDiskComponent c : index.getImmutableComponents()) {