changes to deal with entity level commit and setting the lastLSN to the modified LSMIndexes

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@819 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ILockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ILockManager.java
index 207ee58..1341cc1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ILockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ILockManager.java
@@ -69,6 +69,15 @@
     public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException;
 
     /**
+     * 
+     * @param datasetId
+     * @param entityHashValue
+     * @param txnContext
+     * @throws ACIDException TODO
+     */
+    public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext, boolean commitFlag) throws ACIDException;
+    
+    /**
      * Call to lock and unlock a specific resource in a specific lock mode
      * @param datasetId
      * @param entityHashValue
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 0691504..db603da 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -24,6 +24,9 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager.TransactionState;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
@@ -39,7 +42,7 @@
  * @author pouria, kisskys
  */
 
-public class LockManager implements ILockManager{
+public class LockManager implements ILockManager {
 
     private static final Logger LOGGER = Logger.getLogger(LockManager.class.getName());
     private static final int LOCK_MANAGER_INITIAL_HASH_TABLE_SIZE = 50;// do we need this?
@@ -67,11 +70,10 @@
 
     private LockRequestTracker lockRequestTracker; //for debugging
     private ConsecutiveWakeupContext consecutiveWakeupContext;
-    
-    //----------------------------------------------------------
-    // ITransactionManager Members
-    //----------------------------------------------------------    
-    private Map<JobId, TransactionContext> transactionContextRepository = new HashMap<JobId, TransactionContext>();
+
+    //TODO 
+    //This code should be taken care properly when there is a way to avoid doubling memory space for txnIds. 
+    private LogicalLogLocator logicalLogLocator;
 
     public LockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
         this.txnSubsystem = txnSubsystem;
@@ -88,6 +90,8 @@
         this.tempDatasetIdObj = new DatasetId(0);
         this.consecutiveWakeupContext = new ConsecutiveWakeupContext();
 
+        this.logicalLogLocator = LogUtil.getDummyLogicalLogLocator(txnSubsystem.getLogManager());
+
         if (IS_DEBUG_MODE) {
             this.lockRequestTracker = new LockRequestTracker();
         }
@@ -272,7 +276,7 @@
                 jobInfo = new JobInfo(entityInfoManager, lockWaiterManager, txnContext);
                 jobHT.put(jobId, jobInfo);
             }
-            
+
             //wait if any upgrader exists or upgrading lock mode is not compatible
             if (dLockInfo.getFirstUpgrader() != -1 || dLockInfo.getFirstWaiter() != -1
                     || !dLockInfo.isCompatible(datasetLockMode)) {
@@ -298,10 +302,9 @@
                         }
                         //add entityInfo to JobInfo's holding-resource list
                         jobInfo.addHoldingResource(entityInfo);
-                        
+
                         return entityInfo;
-                    }
-                    else {
+                    } else {
                         //considered as upgrader
                         waiterCount = handleLockWaiter(dLockInfo, -1, entityInfo, true, true, txnContext, jobInfo, -1);
                         if (waiterCount > 0) {
@@ -320,7 +323,7 @@
                     }
                 }
                 /////////////////////////////////////////////////////////////////////////////////////////////
-                
+
                 waiterCount = handleLockWaiter(dLockInfo, -1, entityInfo, false, true, txnContext, jobInfo, -1);
             } else {
                 waiterCount = 1;
@@ -367,11 +370,11 @@
             } else { //duplicated call
                 entityInfoManager.increaseDatasetLockCount(entityInfo);
                 datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo);
-                
+
                 if (entityHashValue == -1) { //dataset-granule
                     dLockInfo.increaseLockCount(datasetLockMode);
                 } else { //entity-granule
-                    datasetLockMode = datasetLockMode == LockMode.S? LockMode.IS: LockMode.IX;
+                    datasetLockMode = datasetLockMode == LockMode.S ? LockMode.IS : LockMode.IX;
                     dLockInfo.increaseLockCount(datasetLockMode);
                 }
             }
@@ -441,8 +444,8 @@
                     //wait if any upgrader exists or upgrading lock mode is not compatible
                     if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
                             || !entityLockInfoManager.isUpgradeCompatible(eLockInfo, lockMode, entityInfo)) {
-                        waiterCount = handleLockWaiter(dLockInfo, eLockInfo, entityInfo, true, false, txnContext, jobInfo,
-                                -1);
+                        waiterCount = handleLockWaiter(dLockInfo, eLockInfo, entityInfo, true, false, txnContext,
+                                jobInfo, -1);
                     } else {
                         waiterCount = 1;
                     }
@@ -466,7 +469,8 @@
                 if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
                         || entityLockInfoManager.getFirstWaiter(eLockInfo) != -1
                         || !entityLockInfoManager.isCompatible(eLockInfo, lockMode)) {
-                    waiterCount = handleLockWaiter(dLockInfo, eLockInfo, entityInfo, false, false, txnContext, jobInfo, -1);
+                    waiterCount = handleLockWaiter(dLockInfo, eLockInfo, entityInfo, false, false, txnContext, jobInfo,
+                            -1);
                 } else {
                     waiterCount = 1;
                 }
@@ -488,6 +492,17 @@
 
     @Override
     public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException {
+        internalUnlock(datasetId, entityHashValue, txnContext, false);
+    }
+
+    @Override
+    public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext, boolean commitFlag)
+            throws ACIDException {
+        internalUnlock(datasetId, entityHashValue, txnContext, commitFlag);
+    }
+
+    private void internalUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext,
+            boolean commitFlag) throws ACIDException {
         JobId jobId = txnContext.getJobId();
         int eLockInfo = -1;
         DatasetLockInfo dLockInfo = null;
@@ -547,6 +562,18 @@
             int waitingEntityInfo;
             LockWaiter waiterObj;
 
+            //TODO
+            //This code should be taken care properly when there is a way to avoid doubling memory space for txnIds.
+            //This commit log is written here in order to avoid increasing the memory space for managing transactionIds
+            if (commitFlag) {
+                if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
+                    txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(), entityHashValue,
+                            -1, (byte) 0, 0, null, null, logicalLogLocator);
+                }
+
+                txnContext.setLastLSNToIndexes(logicalLogLocator.getLsn());
+            }
+
             //1) wake up waiters and remove holder
             //wake up waiters of dataset-granule lock
             wakeUpDatasetLockWaiters(dLockInfo);
@@ -625,7 +652,7 @@
         JobInfo jobInfo = jobHT.get(jobId);
         if (jobInfo == null) {
             unlatchLockTable();
-            return ;
+            return;
         }
 
         //remove waiterObj of JobInfo 
@@ -649,12 +676,12 @@
                             + entityInfoManager.getJobId(entityInfo) + "'s lock request!!!");
                 }
             }
-            
+
             //1. remove from waiter(or upgrader)'s list of dLockInfo or eLockInfo.
             did = entityInfoManager.getDatasetId(entityInfo);
             tempDatasetIdObj.setId(did);
             dLockInfo = datasetResourceHT.get(tempDatasetIdObj);
-            
+
             if (waiterObj.isWaitingOnEntityLock()) {
                 entityHashValue = entityInfoManager.getPKHashVal(entityInfo);
                 eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);
@@ -670,7 +697,7 @@
                     dLockInfo.removeUpgrader(waiterObjId);
                 }
             }
-            
+
             //2. wake-up waiters
             latchWaitNotify();
             synchronized (waiterObj) {
@@ -682,7 +709,7 @@
                 }
                 waiterObj.notifyAll();
             }
-            
+
             //3. deallocate waiterObj
             lockWaiterManager.deallocate(waiterObjId);
 
@@ -711,10 +738,10 @@
                 datasetLockCount = entityInfoManager.getDatasetLockCount(entityInfo);
                 if (datasetLockCount != 0) {
                     dLockInfo.decreaseLockCount(lockMode, datasetLockCount);
-    
+
                     //wakeup waiters of datasetLock and remove holder from datasetLockInfo
                     wakeUpDatasetLockWaiters(dLockInfo);
-    
+
                     //remove the holder from datasetLockInfo only if the lock is dataset-granule lock.
                     //--> this also removes the holding resource from jobInfo               
                     //(Because the IX and IS lock's holders are handled implicitly, 
@@ -726,7 +753,7 @@
                 lockMode = entityInfoManager.getDatasetLockMode(entityInfo);
                 lockMode = lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
                 datasetLockCount = entityInfoManager.getDatasetLockCount(entityInfo);
-                
+
                 if (datasetLockCount != 0) {
                     dLockInfo.decreaseLockCount(lockMode, datasetLockCount);
                 }
@@ -740,7 +767,7 @@
                         System.out.println("eLockInfo:" + eLockInfo);
                     }
                 }
-                
+
                 if (entityLockCount != 0) {
                     entityLockInfoManager.decreaseLockCount(eLockInfo, lockMode, (short) entityLockCount);
                 }
@@ -750,10 +777,10 @@
                     wakeUpDatasetLockWaiters(dLockInfo);
                 }
 
-                if (entityLockCount != 0) { 
+                if (entityLockCount != 0) {
                     //wakeup waiters of entityLock
                     wakeUpEntityLockWaiters(eLockInfo);
-    
+
                     //remove the holder from entityLockInfo 
                     //--> this also removes the holding resource from jobInfo
                     entityLockInfoManager.removeHolder(eLockInfo, entityInfo, jobInfo);
@@ -782,7 +809,7 @@
 
         //remove JobInfo
         jobHT.remove(jobId);
-        
+
         if (existWaiter) {
             txnContext.setStatus(TransactionContext.TIMED_OUT_STATUS);
             txnContext.setTxnState(TransactionState.ABORTED);
@@ -991,7 +1018,7 @@
         }
         return null;
     }
-    
+
     public String getRequestHistoryForAllJobs() {
         if (IS_DEBUG_MODE) {
             return lockRequestTracker.getRequestHistoryForAllJobs();
@@ -1012,7 +1039,7 @@
 
         //see tryLockDatasetGranule() function to know the revert operation
         switch (tryLockDatasetGranuleRevertOperation) {
-            
+
             case 1://[revertOperation1]: reverting 'adding a holder'
 
                 if (entityHashValue == -1) {
@@ -1047,7 +1074,7 @@
                 if (entityHashValue == -1) { //dataset-granule
                     dLockInfo.decreaseLockCount(datasetLockMode);
                 } else { //entity-granule
-                    datasetLockMode = datasetLockMode == LockMode.S? LockMode.IS: LockMode.IX;
+                    datasetLockMode = datasetLockMode == LockMode.S ? LockMode.IS : LockMode.IX;
                     dLockInfo.decreaseLockCount(datasetLockMode);
                 }
 
@@ -1099,11 +1126,11 @@
                 jobHT.put(jobId, jobInfo);
             }
             //////////////////////////////////////////////////////////////////////////////////////
-            
+
             //return fail if any upgrader exists or upgrading lock mode is not compatible
             if (dLockInfo.getFirstUpgrader() != -1 || dLockInfo.getFirstWaiter() != -1
                     || !dLockInfo.isCompatible(datasetLockMode)) {
-                
+
                 //[Notice]
                 //There has been no same caller as (jId, dId, entityHashValue) triplet.
                 //But there could be the same caller as (jId, dId) pair.
@@ -1123,19 +1150,19 @@
                         }
                         //add entityInfo to JobInfo's holding-resource list
                         jobInfo.addHoldingResource(entityInfo);
-                        
+
                         tryLockDatasetGranuleRevertOperation = 1;
-                        
+
                         return entityInfo;
                     }
                 }
-                
+
                 //revert [part of revertOperation1] before return
                 if (jobInfo.getLastHoldingResource() == -1 && jobInfo.getFirstWaitingResource() == -1) {
                     jobHT.remove(jobId);
                 }
                 entityInfoManager.deallocate(entityInfo);
-                
+
                 return -2;
             }
 
@@ -1190,14 +1217,14 @@
                 //[revertOperation3]
                 entityInfoManager.increaseDatasetLockCount(entityInfo);
                 datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo);
-                
+
                 if (entityHashValue == -1) { //dataset-granule
                     dLockInfo.increaseLockCount(datasetLockMode);
                 } else { //entity-granule
-                    datasetLockMode = datasetLockMode == LockMode.S? LockMode.IS: LockMode.IX;
+                    datasetLockMode = datasetLockMode == LockMode.S ? LockMode.IS : LockMode.IX;
                     dLockInfo.increaseLockCount(datasetLockMode);
                 }
-                
+
                 tryLockDatasetGranuleRevertOperation = 3;
                 //////////////////////////////////////////////////////////////////////////////////////
 
@@ -1305,7 +1332,8 @@
         int waiterCount = 0;
         boolean isInterruptedExceptionOccurred = false;
 
-        if (duplicatedWaiterObjId != -1 || isDeadlockFree(dLockInfo, eLockInfo, entityInfo, isDatasetLockInfo, isUpgrade)) {//deadlock free -> wait
+        if (duplicatedWaiterObjId != -1
+                || isDeadlockFree(dLockInfo, eLockInfo, entityInfo, isDatasetLockInfo, isUpgrade)) {//deadlock free -> wait
             if (duplicatedWaiterObjId == -1) {
                 waiterId = lockWaiterManager.allocate(); //initial value of waiterObj: wait = true, victim = false
                 waiter = lockWaiterManager.getLockWaiter(waiterId);
@@ -1362,7 +1390,7 @@
                     }
                 }
             }
-            
+
             if (isInterruptedExceptionOccurred) {
                 throw new ACIDException("InterruptedException is caught");
             }
@@ -1407,7 +1435,7 @@
                 }
 
                 //if (!isUpgrade && isDatasetLockInfo) {
-                    jobInfo.removeWaitingResource(waiterId);
+                jobInfo.removeWaitingResource(waiterId);
                 //}
                 lockWaiterManager.deallocate(waiterId);
             }
@@ -1429,7 +1457,8 @@
         return waiterCount;
     }
 
-    private boolean isDeadlockFree(DatasetLockInfo dLockInfo, int eLockInfo, int entityInfo, boolean isDatasetLockInfo, boolean isUpgrade) {
+    private boolean isDeadlockFree(DatasetLockInfo dLockInfo, int eLockInfo, int entityInfo, boolean isDatasetLockInfo,
+            boolean isUpgrade) {
         return deadlockDetector.isSafeToAdd(dLockInfo, eLockInfo, entityInfo, isDatasetLockInfo, isUpgrade);
     }
 
@@ -1439,7 +1468,6 @@
         throw new ACIDException("Transaction " + txnContext.getJobId()
                 + " should abort (requested by the Lock Manager)");
     }
-    
 
     /**
      * For now, upgrading lock granule from entity-granule to dataset-granule is not supported!!
@@ -1527,13 +1555,14 @@
         int entityInfo;
         LockWaiter waiterObj;
         byte entityLockMode;
-        
+
         consecutiveWakeupContext.reset();
         while (waiterObjId != -1) {
             //wake up upgraders
             waiterObj = lockWaiterManager.getLockWaiter(waiterObjId);
             entityInfo = waiterObj.getEntityInfoSlot();
-            if (entityLockInfoManager.isUpgradeCompatible(eLockInfo, LockMode.X, entityInfo) && consecutiveWakeupContext.isCompatible(LockMode.X)) {
+            if (entityLockInfoManager.isUpgradeCompatible(eLockInfo, LockMode.X, entityInfo)
+                    && consecutiveWakeupContext.isCompatible(LockMode.X)) {
                 consecutiveWakeupContext.setLockMode(LockMode.X);
                 latchWaitNotify();
                 synchronized (waiterObj) {
@@ -1551,7 +1580,7 @@
                 break;
             }
         }
-        
+
         if (areAllUpgradersAwaken) {
             //wake up waiters
             waiterObjId = entityLockInfoManager.getFirstWaiter(eLockInfo);
@@ -1559,7 +1588,8 @@
                 waiterObj = lockWaiterManager.getLockWaiter(waiterObjId);
                 entityInfo = waiterObj.getEntityInfoSlot();
                 entityLockMode = entityInfoManager.getEntityLockMode(entityInfo);
-                if (entityLockInfoManager.isCompatible(eLockInfo, entityLockMode) && consecutiveWakeupContext.isCompatible(entityLockMode)) {
+                if (entityLockInfoManager.isCompatible(eLockInfo, entityLockMode)
+                        && consecutiveWakeupContext.isCompatible(entityLockMode)) {
                     consecutiveWakeupContext.setLockMode(entityLockMode);
                     //compatible waiter is waken up
                     latchWaitNotify();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 52d766d..ebcd506 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -22,6 +22,7 @@
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.AbstractOperationCallback;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
 import edu.uci.ics.asterix.transaction.management.resource.ICloseable;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
@@ -77,12 +78,19 @@
         status = ACTIVE_STATUS;
     }
 
-    public void registerIndex(ILSMIndex index) {
-        indexes.add(index);
+    public void registerIndexAndCallback(ILSMIndex index, AbstractOperationCallback callback) {
+        synchronized (indexes) {
+            indexes.add(index);
+            callbacks.add(callback);
+        }
     }
 
-    public void registerCallback(AbstractOperationCallback callback) {
-        callbacks.add(callback);
+    public void setLastLSNToIndexes(long lastLSN) {
+        synchronized (indexes) {
+            for (ILSMIndex index : indexes) {
+                ((IndexOperationTracker) index.getOperationTracker()).setLastLSN(lastLSN);
+            }
+        }
     }
 
     public void setTransactionType(TransactionType transactionType) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 0af6303..f537e88 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -36,7 +36,8 @@
     }
 
     @Override
-    public void abortTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal) throws ACIDException {
+    public void abortTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal)
+            throws ACIDException {
         synchronized (txnContext) {
             if (txnContext.getTxnState().equals(TransactionState.ABORTED)) {
                 return;
@@ -82,14 +83,25 @@
     }
 
     @Override
-    public void commitTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal) throws ACIDException {
+    public void commitTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal)
+            throws ACIDException {
         synchronized (txnContext) {
             if ((txnContext.getTxnState().equals(TransactionState.COMMITTED))) {
                 return;
             }
 
+            //There is either job-level commit or entity-level commit.
+            //The job-level commit will have -1 value both for datasetId and PKHashVal.
+
+            //for entity-level commit
+            if (PKHashVal != -1) {
+                transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext);
+                return;
+            }
+
+            //for job-level commit
             try {
-                if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) { 
+                if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
                     transactionProvider.getLogManager().log(LogType.COMMIT, txnContext, -1, -1, -1, (byte) 0, 0, null,
                             null, txnContext.getLastLogLocator());
                 }
@@ -108,7 +120,8 @@
     }
 
     @Override
-    public void completedTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal, boolean success) throws ACIDException {
+    public void completedTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal, boolean success)
+            throws ACIDException {
         if (!success) {
             abortTransaction(txnContext, datasetId, PKHashVal);
         } else {