merge from asterix_lsm_stabilization rev r1457 through r1462
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization_managix@1463 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
new file mode 100644
index 0000000..718ea3f
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+
+/**
+ * Assumes LSM-BTrees as primary indexes. Search callback whose proceed() calls ILockManager.instantTryLock and whose reconcile() calls ILockManager.instantLock, both in LockMode.S on (datasetId, hash of the tuple's primary key); cancel() does nothing and this class makes no explicit unlock call.
+ */
+public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperationCallback implements
+ ISearchOperationCallback {
+
+ public PrimaryIndexInstantSearchOperationCallback(int datasetId, int[] entityIdFields, ILockManager lockManager,
+ TransactionContext txnCtx) {
+ super(datasetId, entityIdFields, txnCtx, lockManager); //note: passed as (..., txnCtx, lockManager), i.e. swapped relative to this constructor's parameter order
+ }
+
+ @Override
+ public boolean proceed(ITupleReference tuple) throws HyracksDataException {
+ int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields); //helper and primaryKeyFields are not declared in this class (presumably inherited from AbstractOperationCallback)
+ try {
+ return lockManager.instantTryLock(datasetId, pkHash, LockMode.S, txnCtx); //the try-lock result is returned to the caller unchanged
+ } catch (ACIDException e) {
+ throw new HyracksDataException(e); //rethrown with the ACIDException kept as the cause
+ }
+ }
+
+ @Override
+ public void reconcile(ITupleReference tuple) throws HyracksDataException {
+ int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields); //same granule computation as in proceed()
+ try {
+ lockManager.instantLock(datasetId, pkHash, LockMode.S, txnCtx); //non-"try" variant; presumably waits until the lock can be granted - TODO confirm against ILockManager
+ } catch (ACIDException e) {
+ throw new HyracksDataException(e); //rethrown with the ACIDException kept as the cause
+ }
+ }
+
+ @Override
+ public void cancel(ITupleReference tuple) throws HyracksDataException {
+ //no op: nothing is released or undone here
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index 1a3ac03..8abc5b1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -11,7 +11,7 @@
private int firstWaitingResource; //resource(entity or dataset) which this job is waiting for
private int upgradingResource; //resource(entity or dataset) which this job is waiting for to upgrade
- //private PrimitiveIntHashMap dLockHT; //used for keeping dataset-granule-lock's count acquired by this job.
+ private PrimitiveIntHashMap dLockHT; //used for keeping dataset-granule-lock's count acquired by this job.
public JobInfo(EntityInfoManager entityInfoManager, LockWaiterManager lockWaiterManager, TransactionContext txnCtx) {
this.entityInfoManager = entityInfoManager;
@@ -20,7 +20,11 @@
this.lastHoldingResource = -1;
this.firstWaitingResource = -1;
this.upgradingResource = -1;
- //this.dLockHT = new PrimitiveIntHashMap(1<<6, 1<<3, 180000);
+ if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+ //This table maintains, per dataset, the number of locks acquired by this job.
+ //[Notice] The count is never decreased, even when a lock is released.
+ this.dLockHT = new PrimitiveIntHashMap(1 << 4, 1 << 2, Integer.MAX_VALUE);
+ }
}
public void addHoldingResource(int resource) {
@@ -40,7 +44,9 @@
entityInfoManager.setNextJobResource(resource, -1);
lastHoldingResource = resource;
- //increaseDatasetLockCount(resource);
+ if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+ increaseDatasetLockCount(resource);
+ }
}
public void removeHoldingResource(int resource) {
@@ -181,16 +187,26 @@
// }
}
+ private void increaseDatasetLockCount(int entityInfo) { //adds 1 to the dLockHT counter of the dataset that entityInfo belongs to
+ int datasetId = entityInfoManager.getDatasetId(entityInfo);
+ int count = dLockHT.get(datasetId); //dLockHT is only allocated when LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET is true
+ if (count == -1) { //-1 from get() is handled as "no counter stored yet for this dataset"
+ dLockHT.upsert(datasetId, 1);
+ } else {
+ dLockHT.upsert(datasetId, count + 1);
+ }
+ }
+
+ public int getDatasetLockCount(int datasetId) { //returns the counter stored in dLockHT for datasetId, or 0 when none is stored
+ int count = dLockHT.get(datasetId); //NOTE(review): dLockHT is null unless LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET is true; this public getter does not check that
+ if (count == -1) { //-1 from get() is handled as "no counter stored for this dataset"
+ return 0;
+ } else {
+ return count; //only ever incremented: decreaseDatasetLockCount is still commented out in this class
+ }
+ }
+
/**********************************************************************************
- * public void increaseDatasetLockCount(int entityInfo) {
- * int datasetId = entityInfoManager.getDatasetId(entityInfo);
- * int count = dLockHT.get(datasetId);
- * if (count == -1) {
- * dLockHT.upsert(datasetId, 1);
- * } else {
- * dLockHT.upsert(datasetId, count+1);
- * }
- * }
* public void decreaseDatasetLockCount(int entityInfo) {
* int datasetId = entityInfoManager.getDatasetId(entityInfo);
* int count = dLockHT.get(datasetId);
@@ -244,7 +260,7 @@
}
}
- public String printHoldingResource () {
+ public String printHoldingResource() {
StringBuilder s = new StringBuilder();
int entityInfo = lastHoldingResource;
@@ -262,7 +278,7 @@
}
return s.toString();
}
-
+
/////////////////////////////////////////////////////////
// set/get method for private variable
/////////////////////////////////////////////////////////
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index fb5bb68..5687656 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -45,6 +45,12 @@
public static final boolean IS_DEBUG_MODE = false;//true
+ public static final boolean ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET = true;
+ public static final int UPGRADE_TRHESHOLD_ENTITY_TO_DATASET = 10000;
+ private static final int DO_UPGRADE = 0;
+ private static final int UPGRADED = 1;
+ private static final int DONOT_UPGRADE = 2;
+
private TransactionSubsystem txnSubsystem;
//all threads accessing to LockManager's tables such as jobHT and datasetResourceHT
@@ -123,6 +129,24 @@
dLockInfo = datasetResourceHT.get(datasetId);
jobInfo = jobHT.get(jobId);
+ if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+ if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+ int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
+ switch (upgradeStatus) {
+ case DO_UPGRADE:
+ entityHashValue = -1;
+ break;
+
+ case UPGRADED:
+ unlatchLockTable();
+ return;
+
+ default:
+ break;
+ }
+ }
+ }
+
//#. if the datasetLockInfo doesn't exist in datasetResourceHT
if (dLockInfo == null || dLockInfo.isNoHolder()) {
if (dLockInfo == null) {
@@ -179,6 +203,22 @@
return;
}
+ private int needUpgradeFromEntityToDataset(JobInfo jobInfo, int datasetId, byte lockMode) { //returns one of DO_UPGRADE, UPGRADED, DONOT_UPGRADE
+ //we currently allow upgrade only if the lockMode is S.
+ if (lockMode != LockMode.S) {
+ return DONOT_UPGRADE;
+ }
+
+ int count = jobInfo.getDatasetLockCount(datasetId); //per-job, per-dataset counter kept by JobInfo; it is never decreased on release
+ if (count == UPGRADE_TRHESHOLD_ENTITY_TO_DATASET) { //exactly at the threshold: callers set entityHashValue to -1 and continue with the request
+ return DO_UPGRADE;
+ } else if (count > UPGRADE_TRHESHOLD_ENTITY_TO_DATASET){ //NOTE(review): the upgrade is inferred from the count alone, and JobInfo appears to bump it regardless of lock mode - confirm the count cannot pass the threshold without DO_UPGRADE having been taken
+ return UPGRADED; //callers unlatch the lock table and return at once (the boolean-returning caller returns true)
+ } else {
+ return DONOT_UPGRADE; //below the threshold: callers fall through to their normal path
+ }
+ }
+
private void validateJob(TransactionContext txnContext) throws ACIDException {
if (txnContext.getTxnState() == TransactionState.ABORTED) {
unlatchLockTable();
@@ -888,6 +928,24 @@
dLockInfo = datasetResourceHT.get(datasetId);
jobInfo = jobHT.get(jobId);
+ if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+ if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+ int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
+ switch (upgradeStatus) {
+ case DO_UPGRADE:
+ entityHashValue = -1;
+ break;
+
+ case UPGRADED:
+ unlatchLockTable();
+ return true;
+
+ default:
+ break;
+ }
+ }
+ }
+
//#. if the datasetLockInfo doesn't exist in datasetResourceHT
if (dLockInfo == null || dLockInfo.isNoHolder()) {
if (dLockInfo == null) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
index be9c080..a65f385 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
@@ -530,7 +530,7 @@
for (i = 1; i < NUM_OF_SLOTS; i++) {
if (cArray[bucketNum][i*2] == key) {
if (isUpsert) {
- cArray[bucketNum][emptySlot*2+1] = value;
+ cArray[bucketNum][i*2+1] = value;
}
return 0;
}