[ASTERIXDB-1985][CLUS] Add rebalance callback
Change-Id: I9a90ba975467c136371236195f82d48430d8319d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1863
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index f1a123c..03958ed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -121,7 +121,7 @@
hcc.getNodeControllerInfos());
// Flush the cached contents of the dataset to file system.
- FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseName, datasetName, datasetName);
+ FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseName, datasetName);
// Metadata transaction commits.
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index 3bd1be5..e9d231a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -43,6 +43,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.rebalance.NoOpDatasetRebalanceCallback;
import org.apache.asterix.utils.RebalanceUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -242,7 +243,7 @@
IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
MetadataProvider metadataProvider = new MetadataProvider(appCtx, null, new StorageComponentProvider());
RebalanceUtil.rebalance(dataverseName, datasetName, new LinkedHashSet<>(Arrays.asList(targetNodes)),
- metadataProvider, hcc);
+ metadataProvider, hcc, NoOpDatasetRebalanceCallback.INSTANCE);
}
// Sends HTTP response to the request client.
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2967a38..bd5c024 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -34,9 +34,9 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -55,12 +55,12 @@
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.config.ExternalProperties;
+import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.config.DatasetConfig.TransactionState;
-import org.apache.asterix.common.config.ExternalProperties;
-import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -159,16 +159,16 @@
import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
import org.apache.asterix.translator.AbstractLangTranslator;
-import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
-import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
import org.apache.asterix.translator.TypeTranslator;
+import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
+import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.util.ValidateUtil;
import org.apache.asterix.utils.DataverseUtil;
import org.apache.asterix.utils.FeedOperations;
@@ -967,8 +967,7 @@
// #. flush the internal dataset for correlated policy
if (ds.isCorrelated() && ds.getDatasetType() == DatasetType.INTERNAL) {
- FlushDatasetUtil.flushDataset(hcc, metadataProvider, index.getDataverseName(), index.getDatasetName(),
- index.getDatasetName());
+ FlushDatasetUtil.flushDataset(hcc, metadataProvider, index.getDataverseName(), index.getDatasetName());
}
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -2821,7 +2820,7 @@
}
// Flushes source dataset.
- FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseNameFrom, datasetNameFrom, datasetNameFrom);
+ FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseNameFrom, datasetNameFrom);
}
// Executes external shell commands.
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
new file mode 100644
index 0000000..e8683c9
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.rebalance;
+
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This interface is used for customizing the before/after operation for rebalance.
+ */
+public interface IDatasetRebalanceCallback {
+
+ /**
+ * The action to perform before the target dataset is populated.
+ *
+ * @param metadataProvider,
+ * the metadata provider.
+ * @param source,
+ * the source dataset.
+ * @param target,
+ * the target dataset.
+ * @param hcc,
+ * the hyracks client connection.
+ * @throws HyracksDataException
+ */
+ void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+ IHyracksClientConnection hcc) throws HyracksDataException;
+
+ /**
+ * The action to perform after the target datasets is populated.
+ *
+ * @param metadataProvider,
+ * the metadata provider.
+ * @param source,
+ * the source dataset.
+ * @param target,
+ * the target dataset.
+ * @param hcc,
+ * the hyracks client connection.
+ * @throws HyracksDataException
+ */
+ void afterRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target, IHyracksClientConnection hcc)
+ throws HyracksDataException;
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
new file mode 100644
index 0000000..680adbf
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.rebalance;
+
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+
+// The callback performs no action before and after a rebalance.
+public class NoOpDatasetRebalanceCallback implements IDatasetRebalanceCallback {
+
+ public static final NoOpDatasetRebalanceCallback INSTANCE = new NoOpDatasetRebalanceCallback();
+
+ private NoOpDatasetRebalanceCallback() {
+
+ }
+
+ @Override
+ public void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+ IHyracksClientConnection hcc) {
+ // Does nothing.
+ }
+
+ @Override
+ public void afterRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+ IHyracksClientConnection hcc) {
+ // Does nothing.
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index 5445986..958444c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -44,7 +44,13 @@
}
public static void flushDataset(IHyracksClientConnection hcc, MetadataProvider metadataProvider,
- String dataverseName, String datasetName, String indexName) throws Exception {
+ String dataverseName, String datasetName) throws Exception {
+ Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ flushDataset(hcc, metadataProvider, dataset);
+ }
+
+ public static void flushDataset(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset dataset)
+ throws Exception {
CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties();
int frameSize = compilerProperties.getFrameSize();
JobSpecification spec = new JobSpecification(frameSize);
@@ -54,14 +60,13 @@
new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
org.apache.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId();
- Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
- FlushDatasetOperatorDescriptor flushOperator =
- new FlushDatasetOperatorDescriptor(spec, jobId, dataset.getDatasetId());
+ FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId,
+ dataset.getDatasetId());
spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset, indexName);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+ .getSplitProviderAndConstraints(dataset, dataset.getDatasetName());
AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 3b17a94..1ed37e0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -42,6 +42,7 @@
import org.apache.asterix.metadata.lock.MetadataLockManager;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.metadata.utils.IndexUtil;
+import org.apache.asterix.rebalance.IDatasetRebalanceCallback;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -85,7 +86,8 @@
* @throws Exception
*/
public static void rebalance(String dataverseName, String datasetName, Set<String> targetNcNames,
- MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception {
+ MetadataProvider metadataProvider, IHyracksClientConnection hcc,
+ IDatasetRebalanceCallback datasetRebalanceCallback) throws Exception {
Dataset sourceDataset;
Dataset targetDataset;
// Executes the first Metadata transaction.
@@ -115,10 +117,11 @@
metadataProvider);
// The target dataset for rebalance.
- targetDataset = new Dataset(sourceDataset, true, nodeGroupName);
+ targetDataset = sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
+
// Rebalances the source dataset into the target dataset.
- rebalance(sourceDataset, targetDataset, metadataProvider, hcc);
+ rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
// Complete the metadata transaction.
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -192,10 +195,13 @@
// Rebalances from the source to the target.
private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
- IHyracksClientConnection hcc) throws Exception {
+ IHyracksClientConnection hcc, IDatasetRebalanceCallback datasetRebalanceCallback) throws Exception {
// Drops the target dataset files (if any) to make rebalance idempotent.
dropDatasetFiles(target, metadataProvider, hcc);
+ // Performs the specified operation before the target dataset is populated.
+ datasetRebalanceCallback.beforeRebalance(metadataProvider, source, target, hcc);
+
// Creates the rebalance target.
createRebalanceTarget(target, metadataProvider, hcc);
@@ -204,6 +210,9 @@
// Creates and loads indexes for the rebalance target.
createAndLoadSecondaryIndexesForTarget(source, target, metadataProvider, hcc);
+
+ // Performs the specified operation after the target dataset is populated.
+ datasetRebalanceCallback.afterRebalance(metadataProvider, source, target, hcc);
}
// Switches the metadata entity from the source dataset to the target dataset.
@@ -305,8 +314,7 @@
// Creates the commit operator for populating the target dataset.
private static IOperatorDescriptor createUpsertCommitOp(JobSpecification spec, MetadataProvider metadataProvider,
JobId jobId, Dataset target) throws AlgebricksException {
- int numKeys = target.getPrimaryKeys().size();
- int[] primaryKeyFields = IntStream.range(0, numKeys).toArray();
+ int[] primaryKeyFields = getPrimaryKeyPermutationForUpsert(target);
return new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
new IPushRuntimeFactory[] {
target.getCommitRuntimeFactory(metadataProvider, jobId, primaryKeyFields, true) },
@@ -351,4 +359,26 @@
JobUtils.runJob(hcc, indexLoadingJobSpec, true);
}
}
+
+ // Gets the primary key permutation for upserts.
+ private static int[] getPrimaryKeyPermutationForUpsert(Dataset dataset) {
+ // prev record first
+ int f = 1;
+ // add the previous meta second
+ if (dataset.hasMetaPart()) {
+ f++;
+ }
+ // add the previous filter third
+ int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
+ if (numFilterFields > 0) {
+ f++;
+ }
+ int numPrimaryKeys = dataset.getPrimaryKeys().size();
+ int[] pkIndexes = new int[numPrimaryKeys];
+ for (int i = 0; i < pkIndexes.length; i++) {
+ pkIndexes[i] = f;
+ f++;
+ }
+ return pkIndexes;
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 9131692..020ff6c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -40,8 +40,8 @@
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import org.apache.asterix.common.metadata.IDataset;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.utils.JobUtils;
import org.apache.asterix.common.utils.JobUtils.ProgressState;
import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -67,7 +67,6 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.RecordUtil;
-import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
@@ -78,6 +77,7 @@
import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
@@ -176,15 +176,6 @@
dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp, dataset.rebalanceCount);
}
- public Dataset(Dataset dataset, boolean forRebalance, String targetNodeGroupName) {
- this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName,
- dataset.metaTypeDataverseName, dataset.metaTypeName, targetNodeGroupName,
- dataset.compactionPolicyFactory,
- dataset.compactionPolicyProperties, dataset.datasetDetails, dataset.hints, dataset.datasetType,
- forRebalance ? DatasetIdFactory.generateAlternatingDatasetId(dataset.datasetId) : dataset.datasetId,
- dataset.pendingOp, forRebalance ? dataset.rebalanceCount + 1 : dataset.rebalanceCount);
- }
-
public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName,
String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy,
Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints,
@@ -801,6 +792,14 @@
return IntStream.range(0, numPrimaryKeys).toArray();
}
+ // Gets the target dataset for the purpose of rebalance.
+ public Dataset getTargetDatasetForRebalance(String targetNodeGroupName) {
+ return new Dataset(this.dataverseName, this.datasetName, this.recordTypeDataverseName, this.recordTypeName,
+ this.metaTypeDataverseName, this.metaTypeName, targetNodeGroupName, this.compactionPolicyFactory,
+ this.compactionPolicyProperties, this.datasetDetails, this.hints, this.datasetType,
+ DatasetIdFactory.generateAlternatingDatasetId(this.datasetId), this.pendingOp, this.rebalanceCount + 1);
+ }
+
// Gets an array of partition numbers for this dataset.
protected int[] getDatasetPartitions(MetadataProvider metadataProvider) throws AlgebricksException {
FileSplit[] splitsForDataset = metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
index b55e8ad..6ccbc8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
@@ -85,6 +85,24 @@
}
}
+ /**
+ * Put LSM metadata state into the index's current memory component.
+ *
+ * @param index,
+ * the LSM index.
+ * @param key,
+ * the key for the metadata state.
+ * @param pointable,
+ * the value for the metadata state.
+ * @throws HyracksDataException
+ */
+ public static void put(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException {
+ // write the opTracker to ensure the component layout don't change
+ synchronized (index.getOperationTracker()) {
+ index.getCurrentMemoryComponent().getMetadata().put(key, pointable);
+ }
+ }
+
private static void fromDiskComponents(ILSMIndex index, IValueReference key, IPointable pointable)
throws HyracksDataException {
for (ILSMDiskComponent c : index.getImmutableComponents()) {