Fix for metadata indexes first LSN

Change-Id: I6ce08ee38e49e0f0f0c2acd27b64415d771bda67
Reviewed-on: https://asterix-gerrit.ics.uci.edu/293
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMIndexUtil.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMIndexUtil.java
new file mode 100644
index 0000000..342df7f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMIndexUtil.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.dataflow;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+
+public class AsterixLSMIndexUtil {
+
+    public static void checkAndSetFirstLSN(AbstractLSMIndex lsmIndex, ILogManager logManager)
+            throws HyracksDataException, AsterixException {
+
+        // If the index has an empty memory component, we need to set its first LSN (For soft checkpoint)
+        if (lsmIndex.isCurrentMutableComponentEmpty()) {
+            //prevent transactions from incorrectly setting the first LSN on a modified component by checking the index is still empty
+            synchronized (lsmIndex.getOperationTracker()) {
+                if (lsmIndex.isCurrentMutableComponentEmpty()) {
+                    AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
+                            .getIOOperationCallback();
+                    ioOpCallback.setFirstLSN(logManager.getAppendLSN());
+                }
+            }
+        }
+    }
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 403d0c9..cc6faed 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -17,8 +17,6 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
-import edu.uci.ics.asterix.common.transactions.ILogManager;
 import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -63,20 +61,12 @@
                 tupleFilter = tupleFilterFactory.createTupleFilter(indexHelper.getTaskContext());
                 frameTuple = new FrameTupleReference();
             }
-            // If the index has an empty memory component, we need to set its first LSN (For soft checkpoint)
-            if (lsmIndex.isCurrentMutableComponentEmpty()) {
-                //prevent transactions from incorrectly setting the first LSN on a modified component
-                synchronized (lsmIndex.getOperationTracker()) {
-                    if (lsmIndex.isCurrentMutableComponentEmpty()) {
-                        AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
-                                .getIOOperationCallback();
-                        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                                .getApplicationContext().getApplicationObject();
-                        ILogManager logManager = runtimeCtx.getTransactionSubsystem().getLogManager();
-                        ioOpCallback.setFirstLSN(logManager.getAppendLSN());
-                    }
-                }
-            }
+
+            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                    .getApplicationContext().getApplicationObject();
+
+            AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
+
         } catch (Exception e) {
             indexHelper.close();
             throw new HyracksDataException(e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 0ff5455..fc6c462 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.dataflow.AsterixLSMIndexUtil;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
@@ -99,6 +100,7 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 
 public class MetadataNode implements IMetadataNode {
     private static final long serialVersionUID = 1L;
@@ -301,8 +303,12 @@
             txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
                     metadataIndex.isPrimaryIndex());
 
+            AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
+
             // TODO: fix exceptions once new BTree exception model is in hyracks.
             indexAccessor.forceInsert(tuple);
+        } catch (Exception e) {
+            throw e;
         } finally {
             indexLifecycleManager.close(resourceID);
         }
@@ -378,7 +384,7 @@
                     dropFeed(jobId, dataverseName, feed.getFeedName());
                 }
             }
-            
+
             List<FeedPolicy> feedPolicies = getDataversePolicies(jobId, dataverseName);
             if (feedPolicies != null && feedPolicies.size() > 0) {
                 // Drop all feed ingestion policies in this dataverse.
@@ -631,19 +637,27 @@
             throws Exception {
         long resourceID = metadataIndex.getResourceID();
         ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
-        indexLifecycleManager.open(resourceID);
-        // prepare a Callback for logging
-        IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex,
-                lsmIndex, IndexOperation.DELETE);
-        ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
+        try {
+            indexLifecycleManager.open(resourceID);
+            // prepare a Callback for logging
+            IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
+                    metadataIndex, lsmIndex, IndexOperation.DELETE);
+            ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-        txnCtx.setWriteTxn(true);
-        txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
-                metadataIndex.isPrimaryIndex());
+            ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
+                    false);
+            txnCtx.setWriteTxn(true);
+            txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
+                    metadataIndex.isPrimaryIndex());
 
-        indexAccessor.forceDelete(tuple);
-        indexLifecycleManager.close(resourceID);
+            AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
+
+            indexAccessor.forceDelete(tuple);
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            indexLifecycleManager.close(resourceID);
+        }
     }
 
     @Override
@@ -1331,7 +1345,6 @@
         return DatasetIdFactory.getMostRecentDatasetId();
     }
 
- 
     @Override
     public void addFeedPolicy(JobId jobId, FeedPolicy feedPolicy) throws MetadataException, RemoteException {
         try {
@@ -1368,8 +1381,6 @@
         }
     }
 
- 
-
     @Override
     public void addFeed(JobId jobId, Feed feed) throws MetadataException, RemoteException {
         try {
@@ -1422,7 +1433,6 @@
 
     }
 
-  
     @Override
     public void dropFeedPolicy(JobId jobId, String dataverseName, String policyName) throws MetadataException,
             RemoteException {
@@ -1451,7 +1461,7 @@
             throw new MetadataException(e);
         }
     }
-    
+
     @Override
     public void addExternalFile(JobId jobId, ExternalFile externalFile) throws MetadataException, RemoteException {
         try {