Fix for metadata indexes first LSN
Change-Id: I6ce08ee38e49e0f0f0c2acd27b64415d771bda67
Reviewed-on: https://asterix-gerrit.ics.uci.edu/293
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMIndexUtil.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMIndexUtil.java
new file mode 100644
index 0000000..342df7f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMIndexUtil.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.dataflow;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+
+public class AsterixLSMIndexUtil {
+
+ public static void checkAndSetFirstLSN(AbstractLSMIndex lsmIndex, ILogManager logManager)
+ throws HyracksDataException, AsterixException {
+
+ // If the index has an empty memory component, we need to set its first LSN (For soft checkpoint)
+ if (lsmIndex.isCurrentMutableComponentEmpty()) {
+ //prevent transactions from incorrectly setting the first LSN on a modified component by checking the index is still empty
+ synchronized (lsmIndex.getOperationTracker()) {
+ if (lsmIndex.isCurrentMutableComponentEmpty()) {
+ AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
+ .getIOOperationCallback();
+ ioOpCallback.setFirstLSN(logManager.getAppendLSN());
+ }
+ }
+ }
+ }
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 403d0c9..cc6faed 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -17,8 +17,6 @@
import java.nio.ByteBuffer;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
-import edu.uci.ics.asterix.common.transactions.ILogManager;
import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -63,20 +61,12 @@
tupleFilter = tupleFilterFactory.createTupleFilter(indexHelper.getTaskContext());
frameTuple = new FrameTupleReference();
}
- // If the index has an empty memory component, we need to set its first LSN (For soft checkpoint)
- if (lsmIndex.isCurrentMutableComponentEmpty()) {
- //prevent transactions from incorrectly setting the first LSN on a modified component
- synchronized (lsmIndex.getOperationTracker()) {
- if (lsmIndex.isCurrentMutableComponentEmpty()) {
- AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
- .getIOOperationCallback();
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
- ILogManager logManager = runtimeCtx.getTransactionSubsystem().getLogManager();
- ioOpCallback.setFirstLSN(logManager.getAppendLSN());
- }
- }
- }
+
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
+
+ AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
+
} catch (Exception e) {
indexHelper.close();
throw new HyracksDataException(e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 0ff5455..fc6c462 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -22,6 +22,7 @@
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.dataflow.AsterixLSMIndexUtil;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
@@ -99,6 +100,7 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
public class MetadataNode implements IMetadataNode {
private static final long serialVersionUID = 1L;
@@ -301,8 +303,12 @@
txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
metadataIndex.isPrimaryIndex());
+ AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
+
// TODO: fix exceptions once new BTree exception model is in hyracks.
indexAccessor.forceInsert(tuple);
+ } catch (Exception e) {
+ throw e;
} finally {
indexLifecycleManager.close(resourceID);
}
@@ -378,7 +384,7 @@
dropFeed(jobId, dataverseName, feed.getFeedName());
}
}
-
+
List<FeedPolicy> feedPolicies = getDataversePolicies(jobId, dataverseName);
if (feedPolicies != null && feedPolicies.size() > 0) {
// Drop all feed ingestion policies in this dataverse.
@@ -631,19 +637,27 @@
throws Exception {
long resourceID = metadataIndex.getResourceID();
ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
- indexLifecycleManager.open(resourceID);
- // prepare a Callback for logging
- IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex,
- lsmIndex, IndexOperation.DELETE);
- ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
+ try {
+ indexLifecycleManager.open(resourceID);
+ // prepare a Callback for logging
+ IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
+ metadataIndex, lsmIndex, IndexOperation.DELETE);
+ ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
- txnCtx.setWriteTxn(true);
- txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
- metadataIndex.isPrimaryIndex());
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
+ false);
+ txnCtx.setWriteTxn(true);
+ txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
+ metadataIndex.isPrimaryIndex());
- indexAccessor.forceDelete(tuple);
- indexLifecycleManager.close(resourceID);
+ AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
+
+ indexAccessor.forceDelete(tuple);
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ indexLifecycleManager.close(resourceID);
+ }
}
@Override
@@ -1331,7 +1345,6 @@
return DatasetIdFactory.getMostRecentDatasetId();
}
-
@Override
public void addFeedPolicy(JobId jobId, FeedPolicy feedPolicy) throws MetadataException, RemoteException {
try {
@@ -1368,8 +1381,6 @@
}
}
-
-
@Override
public void addFeed(JobId jobId, Feed feed) throws MetadataException, RemoteException {
try {
@@ -1422,7 +1433,6 @@
}
-
@Override
public void dropFeedPolicy(JobId jobId, String dataverseName, String policyName) throws MetadataException,
RemoteException {
@@ -1451,7 +1461,7 @@
throw new MetadataException(e);
}
}
-
+
@Override
public void addExternalFile(JobId jobId, ExternalFile externalFile) throws MetadataException, RemoteException {
try {