[ASTERIXDB-2130][TX] Avoid Evicting Datasets Pending Txn Completion
- user model changes: no
- storage format changes: no
- interface changes: yes
Added ITransactionOperationTracker for txn specific op tracker.
Added complete to ITransactionManager to release all resources
held by the txn upon its completion.
Details:
Currently, a dataset could be evicted/dropped while a transaction
waiting for its completion (commit/rollback). This change prevents
that by incrementing the reference counter of the datasets of all
indexes registered in the transaction.
Change-Id: I82b614a938f24f3199fd327502ed422ce9d3b9b7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2122
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index e74600e..2068192 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -20,12 +20,11 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
-public class BaseOperationTracker implements ILSMOperationTracker {
+public class BaseOperationTracker implements ITransactionOperationTracker {
protected final int datasetID;
protected final DatasetInfo dsInfo;
@@ -62,4 +61,14 @@
public void exclusiveJobCommitted() throws HyracksDataException {
}
+
+ @Override
+ public void beforeTransaction() {
+ dsInfo.touch();
+ }
+
+ @Override
+ public void afterTransaction() {
+ dsInfo.untouch();
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/ITransactionOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/ITransactionOperationTracker.java
new file mode 100644
index 0000000..e6aeec5
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/ITransactionOperationTracker.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+public interface ITransactionOperationTracker extends ILSMOperationTracker {
+
+ /**
+ * Called before a transaction performs any operations on
+ * {@link org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex}
+ */
+ void beforeTransaction();
+
+ /**
+ * Called after a transaction completes its operations on
+ * {@link org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex}
+ */
+ void afterTransaction();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index 20ede18..f9d924f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -56,4 +56,10 @@
public void incrementNumActiveOperations();
public void decrementNumActiveOperations();
+
+ /**
+ * Called when no further operations will be performed by the transaction
+ * so that any resources held by the transaction may be released
+ */
+ void complete();
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
index eb37f22..4846b6e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -18,22 +18,20 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
-import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.asterix.common.context.ITransactionOperationTracker;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.transactions.AbstractOperationCallback;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.transactions.LogRecord;
-import org.apache.asterix.common.transactions.MutableLong;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -45,7 +43,7 @@
* concurrently. Please see each variable declaration to know which one is accessed concurrently and
* which one is not.
*/
-public class TransactionContext implements ITransactionContext, Serializable {
+public class TransactionContext implements ITransactionContext {
private static final long serialVersionUID = -6105616785783310111L;
@@ -76,7 +74,7 @@
// indexMap is concurrently accessed by multiple threads,
// so those threads are synchronized on indexMap object itself
- private final Map<MutableLong, AbstractLSMIOOperationCallback> indexMap;
+ private final Map<Long, ITransactionOperationTracker> indexMap;
// TODO: fix ComponentLSNs' issues.
// primaryIndex, primaryIndexCallback, and primaryIndexOptracker will be
@@ -89,7 +87,6 @@
// The following three variables are used as temporary variables in order to
// avoid object creations.
// Those are used in synchronized methods.
- private final MutableLong tempResourceIdForRegister;
private final LogRecord logRecord;
private final AtomicInteger transactorNumActiveOperations;
@@ -108,7 +105,6 @@
isMetadataTxn = false;
indexMap = new HashMap<>();
primaryIndex = null;
- tempResourceIdForRegister = new MutableLong();
logRecord = new LogRecord();
transactorNumActiveOperations = new AtomicInteger(0);
}
@@ -122,10 +118,11 @@
primaryIndexCallback = callback;
primaryIndexOpTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
}
- tempResourceIdForRegister.set(resourceId);
- if (!indexMap.containsKey(tempResourceIdForRegister)) {
- indexMap.put(new MutableLong(resourceId),
- ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()));
+ if (!indexMap.containsKey(resourceId)) {
+ final ITransactionOperationTracker txnOpTracker =
+ (ITransactionOperationTracker) index.getOperationTracker();
+ indexMap.put(resourceId, txnOpTracker);
+ txnOpTracker.beforeTransaction();
}
}
}
@@ -243,12 +240,6 @@
return logRecord;
}
- public void cleanupForAbort() {
- if (primaryIndexOpTracker != null) {
- primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(transactorNumActiveOperations.get());
- }
- }
-
@Override
public void incrementNumActiveOperations() {
transactorNumActiveOperations.incrementAndGet();
@@ -258,4 +249,23 @@
public void decrementNumActiveOperations() {
transactorNumActiveOperations.decrementAndGet();
}
-}
+
+ @Override
+ public void complete() {
+ try {
+ if (txnState.get() == ITransactionManager.ABORTED) {
+ cleanupForAbort();
+ }
+ } finally {
+ synchronized (indexMap) {
+ indexMap.values().forEach(ITransactionOperationTracker::afterTransaction);
+ }
+ }
+ }
+
+ private void cleanupForAbort() {
+ if (primaryIndexOpTracker != null) {
+ primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(transactorNumActiveOperations.get());
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 3e79e1d..c9a1bad 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -72,7 +72,7 @@
ae.printStackTrace();
throw new ACIDException(msg, ae);
} finally {
- ((TransactionContext) txnCtx).cleanupForAbort();
+ txnCtx.complete();
txnSubsystem.getLockManager().releaseLocks(txnCtx);
transactionContextRepository.remove(txnCtx.getJobId());
}
@@ -119,6 +119,7 @@
}
throw ae;
} finally {
+ txnCtx.complete();
txnSubsystem.getLockManager().releaseLocks(txnCtx);
transactionContextRepository.remove(txnCtx.getJobId());
txnCtx.setTxnState(ITransactionManager.COMMITTED);