[ASTERIXDB-2130][TX] Avoid Evicting Datasets Pending Txn Completion

- user model changes: no
- storage format changes: no
- interface changes: yes
  Added ITransactionOperationTracker for txn specific op tracker.
  Added complete to ITransactionManager to release all resources
  held by the txn upon its completion.

Details:
Currently, a dataset could be evicted/dropped while a transaction
waiting for its completion (commit/rollback). This change prevents
that by incrementing the reference counter of the datasets of all
indexes registered in the transaction.

Change-Id: I82b614a938f24f3199fd327502ed422ce9d3b9b7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2122
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index e74600e..2068192 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -20,12 +20,11 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
-public class BaseOperationTracker implements ILSMOperationTracker {
+public class BaseOperationTracker implements ITransactionOperationTracker {
 
     protected final int datasetID;
     protected final DatasetInfo dsInfo;
@@ -62,4 +61,14 @@
 
     public void exclusiveJobCommitted() throws HyracksDataException {
     }
+
+    @Override
+    public void beforeTransaction() {
+        dsInfo.touch();
+    }
+
+    @Override
+    public void afterTransaction() {
+        dsInfo.untouch();
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/ITransactionOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/ITransactionOperationTracker.java
new file mode 100644
index 0000000..e6aeec5
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/ITransactionOperationTracker.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+public interface ITransactionOperationTracker extends ILSMOperationTracker {
+
+    /**
+     * Called before a transaction performs any operations on
+     * {@link org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex}
+     */
+    void beforeTransaction();
+
+    /**
+     * Called after a transaction completes its operations on
+     * {@link org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex}
+     */
+    void afterTransaction();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index 20ede18..f9d924f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -56,4 +56,10 @@
     public void incrementNumActiveOperations();
 
     public void decrementNumActiveOperations();
+
+    /**
+     * Called when no further operations will be performed by the transaction
+     * so that any resources held by the transaction may be released
+     */
+    void complete();
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
index eb37f22..4846b6e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -18,22 +18,20 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
-import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.asterix.common.context.ITransactionOperationTracker;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.LogRecord;
-import org.apache.asterix.common.transactions.MutableLong;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -45,7 +43,7 @@
  * concurrently. Please see each variable declaration to know which one is accessed concurrently and
  * which one is not.
  */
-public class TransactionContext implements ITransactionContext, Serializable {
+public class TransactionContext implements ITransactionContext {
 
     private static final long serialVersionUID = -6105616785783310111L;
 
@@ -76,7 +74,7 @@
 
     // indexMap is concurrently accessed by multiple threads,
     // so those threads are synchronized on indexMap object itself
-    private final Map<MutableLong, AbstractLSMIOOperationCallback> indexMap;
+    private final Map<Long, ITransactionOperationTracker> indexMap;
 
     // TODO: fix ComponentLSNs' issues.
     // primaryIndex, primaryIndexCallback, and primaryIndexOptracker will be
@@ -89,7 +87,6 @@
     // The following three variables are used as temporary variables in order to
     // avoid object creations.
     // Those are used in synchronized methods.
-    private final MutableLong tempResourceIdForRegister;
     private final LogRecord logRecord;
 
     private final AtomicInteger transactorNumActiveOperations;
@@ -108,7 +105,6 @@
         isMetadataTxn = false;
         indexMap = new HashMap<>();
         primaryIndex = null;
-        tempResourceIdForRegister = new MutableLong();
         logRecord = new LogRecord();
         transactorNumActiveOperations = new AtomicInteger(0);
     }
@@ -122,10 +118,11 @@
                 primaryIndexCallback = callback;
                 primaryIndexOpTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
             }
-            tempResourceIdForRegister.set(resourceId);
-            if (!indexMap.containsKey(tempResourceIdForRegister)) {
-                indexMap.put(new MutableLong(resourceId),
-                        ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()));
+            if (!indexMap.containsKey(resourceId)) {
+                final ITransactionOperationTracker txnOpTracker =
+                        (ITransactionOperationTracker) index.getOperationTracker();
+                indexMap.put(resourceId, txnOpTracker);
+                txnOpTracker.beforeTransaction();
             }
         }
     }
@@ -243,12 +240,6 @@
         return logRecord;
     }
 
-    public void cleanupForAbort() {
-        if (primaryIndexOpTracker != null) {
-            primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(transactorNumActiveOperations.get());
-        }
-    }
-
     @Override
     public void incrementNumActiveOperations() {
         transactorNumActiveOperations.incrementAndGet();
@@ -258,4 +249,23 @@
     public void decrementNumActiveOperations() {
         transactorNumActiveOperations.decrementAndGet();
     }
-}
+
+    @Override
+    public void complete() {
+        try {
+            if (txnState.get() == ITransactionManager.ABORTED) {
+                cleanupForAbort();
+            }
+        } finally {
+            synchronized (indexMap) {
+                indexMap.values().forEach(ITransactionOperationTracker::afterTransaction);
+            }
+        }
+    }
+
+    private void cleanupForAbort() {
+        if (primaryIndexOpTracker != null) {
+            primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(transactorNumActiveOperations.get());
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 3e79e1d..c9a1bad 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -72,7 +72,7 @@
             ae.printStackTrace();
             throw new ACIDException(msg, ae);
         } finally {
-            ((TransactionContext) txnCtx).cleanupForAbort();
+            txnCtx.complete();
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
             transactionContextRepository.remove(txnCtx.getJobId());
         }
@@ -119,6 +119,7 @@
             }
             throw ae;
         } finally {
+            txnCtx.complete();
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
             transactionContextRepository.remove(txnCtx.getJobId());
             txnCtx.setTxnState(ITransactionManager.COMMITTED);