use lock counts to determine whether to decrease active transaction count on unlock
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
index 6e42e2d..79fbf3b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
@@ -64,8 +64,10 @@
      * @param txnContext
      * @throws ACIDException
      *             TODO
+     * @return true if the lock count is 0, false otherwise.
      */
-    public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException;
+    public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
+            throws ACIDException;
 
     /**
      * @param datasetId
@@ -73,8 +75,9 @@
      * @param txnContext
      * @throws ACIDException
      *             TODO
+     * @return true if the lock count is 0, false otherwise.
      */
-    public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
+    public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
             throws ACIDException;
 
     /**
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 8fb7494..dc6f111 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -633,14 +633,15 @@
     }
 
     @Override
-    public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
-        internalUnlock(datasetId, entityHashValue, txnContext, false, false);
+    public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
+            throws ACIDException {
+        return internalUnlock(datasetId, entityHashValue, txnContext, false, false);
     }
 
     @Override
-    public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
+    public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
             throws ACIDException {
-        internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
+        return internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
     }
 
     private void instantUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
@@ -648,7 +649,7 @@
         internalUnlock(datasetId, entityHashValue, txnContext, true, false);
     }
 
-    private void internalUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext,
+    private boolean internalUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext,
             boolean isInstant, boolean commitFlag) throws ACIDException {
         JobId jobId = txnContext.getJobId();
         int eLockInfo = -1;
@@ -657,6 +658,7 @@
         int entityInfo = -1;
         byte datasetLockMode;
 
+        boolean lockCountIsZero = false;
         if (IS_DEBUG_MODE) {
             if (entityHashValue == -1) {
                 throw new UnsupportedOperationException(
@@ -704,6 +706,7 @@
 
             if (entityInfoManager.getEntityLockCount(entityInfo) == 0
                     && entityInfoManager.getDatasetLockCount(entityInfo) == 0) {
+                lockCountIsZero = true;
                 int threadCount = 0; //number of threads(in the same job) waiting on the same resource 
                 int waiterObjId = jobInfo.getFirstWaitingResource();
                 int waitingEntityInfo;
@@ -783,6 +786,7 @@
         } finally {
             unlatchLockTable();
         }
+        return lockCountIsZero;
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 4e8808f..85bdff8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -26,6 +26,7 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * An implementation of the @see ITransactionManager interface that provides
@@ -105,15 +106,23 @@
 
             //for entity-level commit
             if (PKHashVal != -1) {
-                transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
-                /*****************************
-                try {
-                    //decrease the transaction reference count on index
-                    txnContext.decreaseActiveTransactionCountOnIndexes();
-                } catch (HyracksDataException e) {
-                    throw new ACIDException("failed to complete index operation", e);
+                boolean countIsZero = transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext,
+                        true);
+                if (!countIsZero) {
+                    // Lock count != 0 for a particular entity implies that the entity has been locked 
+                    // more than once (probably due to a hash collision in our current model).
+                    // It is safe to decrease the active transaction count on indexes since,  
+                    // by virtue of the counter not being zero, there is another transaction 
+                    // that has increased the transaction count. Thus, decreasing it will not 
+                    // allow the data to be flushed (yet). The flush will occur when the log page
+                    // flush thread decides to decrease the count for the last time.
+                    try {
+                        //decrease the transaction reference count on index
+                        txnContext.decreaseActiveTransactionCountOnIndexes();
+                    } catch (HyracksDataException e) {
+                        throw new ACIDException("failed to complete index operation", e);
+                    }
                 }
-                *****************************/
                 return;
             }
 
@@ -151,11 +160,11 @@
     public TransactionSubsystem getTransactionProvider() {
         return transactionProvider;
     }
-    
+
     public void setMaxJobId(int jobId) {
         maxJobId.set(Math.max(maxJobId.get(), jobId));
     }
-    
+
     public int getMaxJobId() {
         return maxJobId.get();
     }