merge from asterix_lsm_stabilization rev r1457 through r1462

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization_managix@1463 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
new file mode 100644
index 0000000..718ea3f
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+
+/**
+ * Assumes LSM-BTrees as primary indexes. Implements try/locking and unlocking on primary keys.
+ */
+public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperationCallback implements
+        ISearchOperationCallback {
+
+    public PrimaryIndexInstantSearchOperationCallback(int datasetId, int[] entityIdFields, ILockManager lockManager,
+            TransactionContext txnCtx) {
+        super(datasetId, entityIdFields, txnCtx, lockManager);
+    }
+
+    @Override
+    public boolean proceed(ITupleReference tuple) throws HyracksDataException {
+        int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
+        try {
+            return lockManager.instantTryLock(datasetId, pkHash, LockMode.S, txnCtx);
+        } catch (ACIDException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void reconcile(ITupleReference tuple) throws HyracksDataException {
+        int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
+        try {
+            lockManager.instantLock(datasetId, pkHash, LockMode.S, txnCtx);
+        } catch (ACIDException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void cancel(ITupleReference tuple) throws HyracksDataException {
+        //no op
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index 1a3ac03..8abc5b1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -11,7 +11,7 @@
     private int firstWaitingResource; //resource(entity or dataset) which this job is waiting for
     private int upgradingResource; //resource(entity or dataset) which this job is waiting for to upgrade
 
-    //private PrimitiveIntHashMap dLockHT; //used for keeping dataset-granule-lock's count acquired by this job. 
+    private PrimitiveIntHashMap dLockHT; //used for keeping dataset-granule-lock's count acquired by this job. 
 
     public JobInfo(EntityInfoManager entityInfoManager, LockWaiterManager lockWaiterManager, TransactionContext txnCtx) {
         this.entityInfoManager = entityInfoManager;
@@ -20,7 +20,11 @@
         this.lastHoldingResource = -1;
         this.firstWaitingResource = -1;
         this.upgradingResource = -1;
-        //this.dLockHT = new PrimitiveIntHashMap(1<<6, 1<<3, 180000);
+        if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+            //This table maintains the number of locks acquired by this jobInfo.
+            //[Notice] But this doesn't decrease the count even if the lock is released.
+            this.dLockHT = new PrimitiveIntHashMap(1 << 4, 1 << 2, Integer.MAX_VALUE);
+        }
     }
 
     public void addHoldingResource(int resource) {
@@ -40,7 +44,9 @@
         entityInfoManager.setNextJobResource(resource, -1);
         lastHoldingResource = resource;
 
-        //increaseDatasetLockCount(resource);
+        if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+            increaseDatasetLockCount(resource);
+        }
     }
 
     public void removeHoldingResource(int resource) {
@@ -181,16 +187,26 @@
         //        }
     }
 
+    private void increaseDatasetLockCount(int entityInfo) {
+        int datasetId = entityInfoManager.getDatasetId(entityInfo);
+        int count = dLockHT.get(datasetId);
+        if (count == -1) {
+            dLockHT.upsert(datasetId, 1);
+        } else {
+            dLockHT.upsert(datasetId, count + 1);
+        }
+    }
+
+    public int getDatasetLockCount(int datasetId) {
+        int count = dLockHT.get(datasetId);
+        if (count == -1) {
+            return 0;
+        } else {
+            return count;
+        }
+    }
+
     /**********************************************************************************
-     * public void increaseDatasetLockCount(int entityInfo) {
-     * int datasetId = entityInfoManager.getDatasetId(entityInfo);
-     * int count = dLockHT.get(datasetId);
-     * if (count == -1) {
-     * dLockHT.upsert(datasetId, 1);
-     * } else {
-     * dLockHT.upsert(datasetId, count+1);
-     * }
-     * }
      * public void decreaseDatasetLockCount(int entityInfo) {
      * int datasetId = entityInfoManager.getDatasetId(entityInfo);
      * int count = dLockHT.get(datasetId);
@@ -244,7 +260,7 @@
         }
     }
 
-    public String printHoldingResource () {
+    public String printHoldingResource() {
         StringBuilder s = new StringBuilder();
         int entityInfo = lastHoldingResource;
 
@@ -262,7 +278,7 @@
         }
         return s.toString();
     }
-    
+
     /////////////////////////////////////////////////////////
     //  set/get method for private variable
     /////////////////////////////////////////////////////////
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index fb5bb68..5687656 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -45,6 +45,12 @@
 
     public static final boolean IS_DEBUG_MODE = false;//true
 
+    public static final boolean ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET = true;
+    public static final int UPGRADE_TRHESHOLD_ENTITY_TO_DATASET = 10000;
+    private static final int DO_UPGRADE = 0;
+    private static final int UPGRADED = 1;
+    private static final int DONOT_UPGRADE = 2;
+
     private TransactionSubsystem txnSubsystem;
 
     //all threads accessing to LockManager's tables such as jobHT and datasetResourceHT
@@ -123,6 +129,24 @@
         dLockInfo = datasetResourceHT.get(datasetId);
         jobInfo = jobHT.get(jobId);
 
+        if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+            if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+                int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
+                switch (upgradeStatus) {
+                    case DO_UPGRADE:
+                        entityHashValue = -1;
+                        break;
+                        
+                    case UPGRADED:
+                        unlatchLockTable();
+                        return;
+                        
+                    default:
+                        break;
+                }
+            }
+        }
+
         //#. if the datasetLockInfo doesn't exist in datasetResourceHT 
         if (dLockInfo == null || dLockInfo.isNoHolder()) {
             if (dLockInfo == null) {
@@ -179,6 +203,22 @@
         return;
     }
 
+    private int needUpgradeFromEntityToDataset(JobInfo jobInfo, int datasetId, byte lockMode) {
+        //we currently allow upgrade only if the lockMode is S. 
+        if (lockMode != LockMode.S) {
+            return DONOT_UPGRADE;
+        }
+
+        int count = jobInfo.getDatasetLockCount(datasetId);
+        if (count == UPGRADE_TRHESHOLD_ENTITY_TO_DATASET) {
+            return DO_UPGRADE;
+        } else if (count > UPGRADE_TRHESHOLD_ENTITY_TO_DATASET){
+            return UPGRADED;
+        } else {
+            return DONOT_UPGRADE;
+        }
+    }
+
     private void validateJob(TransactionContext txnContext) throws ACIDException {
         if (txnContext.getTxnState() == TransactionState.ABORTED) {
             unlatchLockTable();
@@ -888,6 +928,24 @@
         dLockInfo = datasetResourceHT.get(datasetId);
         jobInfo = jobHT.get(jobId);
 
+        if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+            if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+                int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
+                switch (upgradeStatus) {
+                    case DO_UPGRADE:
+                        entityHashValue = -1;
+                        break;
+                        
+                    case UPGRADED:
+                        unlatchLockTable();
+                        return true;
+                        
+                    default:
+                        break;
+                }
+            }
+        }
+
         //#. if the datasetLockInfo doesn't exist in datasetResourceHT 
         if (dLockInfo == null || dLockInfo.isNoHolder()) {
             if (dLockInfo == null) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
index be9c080..a65f385 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
@@ -530,7 +530,7 @@
         for (i = 1; i < NUM_OF_SLOTS; i++) {
             if (cArray[bucketNum][i*2] == key) {
                 if (isUpsert) {
-                    cArray[bucketNum][emptySlot*2+1] = value;
+                    cArray[bucketNum][i*2+1] = value;
                 }
                 return 0;
             }