fixes for issue 333 (aborting transaction failure), issue 334 (non-existing storage directory exception in a non-new universe), issue 337 (true instant lock), and a bug in creating multiple log files
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index 092f99c..5b55e9a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -18,12 +18,14 @@
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
 
 /**
  * Secondary-index modifications do not require any locking.
@@ -40,8 +42,8 @@
     protected final TransactionSubsystem txnSubsystem;
 
     public SecondaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
-            TransactionContext txnCtx, ILockManager lockManager,
-            TransactionSubsystem txnSubsystem, long resourceId, byte resourceType, IndexOperation indexOp) {
+            TransactionContext txnCtx, ILockManager lockManager, TransactionSubsystem txnSubsystem, long resourceId,
+            byte resourceType, IndexOperation indexOp) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager);
         this.resourceId = resourceId;
         this.resourceType = resourceType;
@@ -60,8 +62,21 @@
         IndexLogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, resourceType);
         int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
         try {
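+            // Derive the old operation for LSM-BTree resources from the 'before' tuple: null means no previous
+            // entry (NOOP), an antimatter tuple marks a deleted entry (DELETE), and any other tuple an INSERT.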
+            IndexOperation effectiveOldOp;
+            if (resourceType == ResourceType.LSM_BTREE) {
+                if (before == null) {
+                    effectiveOldOp = IndexOperation.NOOP;
+                } else {
+                    LSMBTreeTupleReference lsmBTreeTuple = (LSMBTreeTupleReference) before;
+                    effectiveOldOp = lsmBTreeTuple.isAntimatter() ? IndexOperation.DELETE : IndexOperation.INSERT;
+                }
+            } else {
+                effectiveOldOp = oldOp;
+            }
             logger.generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId, indexOp, after,
-                    oldOp, before);
+                    effectiveOldOp, before);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 838dc6d..0a12ba1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -87,7 +87,8 @@
         //#. load all local resources. 
         File rootDirFile = new File(this.rootDir);
         if (!rootDirFile.exists()) {
-            throw new HyracksDataException(rootDirFile.getAbsolutePath() + "doesn't exist.");
+            //rootDir may not exist if this node is not the metadata node and doesn't have any user data.
+            return;
         }
 
         FilenameFilter filter = new FilenameFilter() {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index b1a28ca..b3c7f8b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -617,8 +617,9 @@
             throws ACIDException {
         internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
     }
-    
-    private void instantUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException {
+
+    private void instantUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext)
+            throws ACIDException {
         internalUnlock(datasetId, entityHashValue, txnContext, true, false);
     }
 
@@ -981,20 +982,189 @@
     @Override
     public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext)
             throws ACIDException {
-        boolean isGranted = false;
-        //        try {
-        //            isGranted = internalTryLock(datasetId, entityHashValue, lockMode, txnContext);
-        //            return isGranted;
-        //        } finally {
-        //            if (isGranted) {
-        //                unlock(datasetId, entityHashValue, txnContext);
-        //            }
-        //        }
-        isGranted = internalTryLock(datasetId, entityHashValue, lockMode, txnContext, true);
-        if (isGranted) {
-            instantUnlock(datasetId, entityHashValue, txnContext);
+        return internalInstantTryLock(datasetId, entityHashValue, lockMode, txnContext);
+    }
+
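+    /**
+     * Checks, while holding the lock-table latch, whether the requested lock could be granted immediately.
+     * Nothing is actually acquired, so no matching unlock call is needed afterwards.
+     */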
+    private boolean internalInstantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+            TransactionContext txnContext) throws ACIDException {
+        DatasetLockInfo dLockInfo = null;
+        boolean isSuccess = true;
+
+        latchLockTable();
+        validateJob(txnContext);
+
+        if (IS_DEBUG_MODE) {
+            trackLockRequest("Requested", RequestType.INSTANT_TRY_LOCK, datasetId, entityHashValue, lockMode,
+                    txnContext, dLockInfo, -1);
         }
-        return isGranted;
+
+        dLockInfo = datasetResourceHT.get(datasetId);
+
+        //#. if the datasetLockInfo doesn't exist in datasetResourceHT or has no holder, the request is grantable immediately.
+        if (dLockInfo == null || dLockInfo.isNoHolder()) {
+            if (IS_DEBUG_MODE) {
+                trackLockRequest("Granted", RequestType.INSTANT_TRY_LOCK, datasetId, entityHashValue, lockMode,
+                        txnContext, dLockInfo, -1);
+            }
+
+            unlatchLockTable();
+            return true;
+        }
+
+        //#. the datasetLockInfo exists in datasetResourceHT.
+        //1. handle dataset-granule lock
+        isSuccess = instantTryLockDatasetGranule(datasetId, entityHashValue, lockMode, txnContext);
+        if (isSuccess && entityHashValue != -1) {
+            //2. handle entity-granule lock
+            isSuccess = instantTryLockEntityGranule(datasetId, entityHashValue, lockMode, txnContext);
+        }
+
+        if (IS_DEBUG_MODE) {
+            if (isSuccess) {
+                trackLockRequest("Granted", RequestType.INSTANT_TRY_LOCK, datasetId, entityHashValue, lockMode,
+                        txnContext, dLockInfo, -1);
+            } else {
+                trackLockRequest("Failed", RequestType.INSTANT_TRY_LOCK, datasetId, entityHashValue, lockMode,
+                        txnContext, dLockInfo, -1);
+            }
+        }
+
+        unlatchLockTable();
+
+        return isSuccess;
+    }
+
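+    //Checks whether the instant lock request is grantable at the dataset granule. No lock state is modified.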
+    private boolean instantTryLockDatasetGranule(DatasetId datasetId, int entityHashValue, byte lockMode,
+            TransactionContext txnContext) throws ACIDException {
+        JobId jobId = txnContext.getJobId();
+        int jId = jobId.getId(); //int-type jobId
+        int dId = datasetId.getId(); //int-type datasetId
+        int waiterObjId;
+        int entityInfo = -1;
+        DatasetLockInfo dLockInfo;
+        JobInfo jobInfo;
+        boolean isUpgrade = false;
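+        //an entityHashValue of -1 denotes a dataset-level request; otherwise the corresponding intention mode (IS/IX) is checked on the dataset.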
+        byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
+
+        dLockInfo = datasetResourceHT.get(datasetId);
+        jobInfo = jobHT.get(jobId);
+
+        //check duplicated call
+
+        //1. lock request causing duplicated upgrading requests from different threads in the same job
+        waiterObjId = dLockInfo.findUpgraderFromUpgraderList(jId, entityHashValue);
+        if (waiterObjId != -1) {
+            return false;
+        }
+
+        //2. lock request causing duplicated waiting requests from different threads in the same job
+        waiterObjId = dLockInfo.findWaiterFromWaiterList(jId, entityHashValue);
+        if (waiterObjId != -1) {
+            return false;
+        }
+
+        //3. lock request causing duplicated holding requests from different threads or a single thread in the same job
+        entityInfo = dLockInfo.findEntityInfoFromHolderList(jId, entityHashValue);
+        if (entityInfo == -1) { //new call from this job -> doesn't mean that eLockInfo doesn't exist, since another thread might have created the eLockInfo already.
+
+            //return fail if any upgrader or waiter exists, or the requested lock mode is not compatible
+            if (dLockInfo.getFirstUpgrader() != -1 || dLockInfo.getFirstWaiter() != -1
+                    || !dLockInfo.isCompatible(datasetLockMode)) {
+
+                //[Notice]
+                //There has been no prior request with the same (jId, dId, entityHashValue) triplet,
+                //but there could be one with the same (jId, dId) pair.
+                //For example, (J1, D1, E1) and (J1, D1, E2) count as duplicated calls from the dataset-granule perspective.
+                //The code below covers that case by looking for the matching dataset-granule lock request,
+                //that is, the (J1, D1) pair in the example above.
+                if (jobInfo != null && jobInfo.isDatasetLockGranted(dId, LockMode.IS)) {
+                    if (dLockInfo.isCompatible(datasetLockMode)) {
+                        //this is duplicated call
+                        return true;
+                    }
+                }
+
+                return false;
+            }
+        } else {
+            isUpgrade = isLockUpgrade(entityInfoManager.getDatasetLockMode(entityInfo), lockMode);
+            if (isUpgrade) { //upgrade call 
+                //return fail if any upgrader exists or upgrading lock mode is not compatible
+                if (dLockInfo.getFirstUpgrader() != -1 || !dLockInfo.isUpgradeCompatible(datasetLockMode, entityInfo)) {
+                    return false;
+                }
+            }
+            /************************************
+             * else { //duplicated call
+             * //do nothing
+             * }
+             *************************************/
+        }
+
+        return true;
+    }
+
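+    //Checks whether the instant lock request is grantable at the entity granule. No lock state is modified.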
+    private boolean instantTryLockEntityGranule(DatasetId datasetId, int entityHashValue, byte lockMode,
+            TransactionContext txnContext) throws ACIDException {
+        JobId jobId = txnContext.getJobId();
+        int jId = jobId.getId(); //int-type jobId
+        int waiterObjId;
+        int eLockInfo = -1;
+        int entityInfo;
+        DatasetLockInfo dLockInfo;
+        boolean isUpgrade = false;
+
+        dLockInfo = datasetResourceHT.get(datasetId);
+        eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);
+
+        if (eLockInfo != -1) {
+            //check duplicated call
+
+            //1. lock request causing duplicated upgrading requests from different threads in the same job
+            waiterObjId = entityLockInfoManager.findUpgraderFromUpgraderList(eLockInfo, jId, entityHashValue);
+            if (waiterObjId != -1) {
+                return false;
+            }
+
+            //2. lock request causing duplicated waiting requests from different threads in the same job
+            waiterObjId = entityLockInfoManager.findWaiterFromWaiterList(eLockInfo, jId, entityHashValue);
+            if (waiterObjId != -1) {
+                return false;
+            }
+
+            //3. lock request causing duplicated holding requests from different threads or a single thread in the same job
+            entityInfo = entityLockInfoManager.findEntityInfoFromHolderList(eLockInfo, jId, entityHashValue);
+            if (entityInfo != -1) {//duplicated call or upgrader
+
+                isUpgrade = isLockUpgrade(entityInfoManager.getEntityLockMode(entityInfo), lockMode);
+                if (isUpgrade) {//upgrade call
+                    //return fail if any upgrader exists or the upgrading lock mode is not compatible
+                    if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
+                            || !entityLockInfoManager.isUpgradeCompatible(eLockInfo, lockMode, entityInfo)) {
+                        return false;
+                    }
+                }
+                /***************************
+                 * else {//duplicated call
+                 * //do nothing
+                 * }
+                 ****************************/
+            } else {//new call from this job, but still eLockInfo exists since other threads hold it or wait on it
+                if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
+                        || entityLockInfoManager.getFirstWaiter(eLockInfo) != -1
+                        || !entityLockInfoManager.isCompatible(eLockInfo, lockMode)) {
+                    return false;
+                }
+            }
+        }
+        /*******************************
+         * else {//eLockInfo doesn't exist, so this lock request is the first request and can be granted without waiting.
+         * //do nothing
+         * }
+         *********************************/
+
+        return true;
     }
 
     private boolean internalTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
@@ -1007,7 +1177,7 @@
         DatasetLockInfo dLockInfo = null;
         JobInfo jobInfo;
         byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
-        boolean isSuccess = false;
+        boolean isSuccess = true;
         boolean doEscalate = false;
 
         latchLockTable();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 4e96d2e..1e0e0ae 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -524,9 +524,9 @@
      * This method resets the log page and is called by the log flusher thread
      * after a page has been flushed to disk.
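+     * The lsn is used to determine the log file that the reset page will be mapped into.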
      */
-    public void resetLogPage(long nextWritePosition, int pageIndex) throws IOException {
+    public void resetLogPage(long lsn, long nextWritePosition, int pageIndex) throws IOException {
 
-        String filePath = LogUtil.getLogFilePath(logManagerProperties, getLogFileId(nextWritePosition));
+        String filePath = LogUtil.getLogFilePath(logManagerProperties, getLogFileId(lsn));
 
         logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
                 logManagerProperties.getLogPageSize());
@@ -906,19 +906,6 @@
 
                 synchronized (logManager.getLogPage(pageToFlush)) {
 
-                    // lock the internal state of the log manager and create a
-                    // log file if necessary.
-                    int prevLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get());
-                    int nextLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get()
-                            + logManager.getLogManagerProperties().getLogPageSize());
-                    if (prevLogFileId != nextLogFileId) {
-                        String filePath = LogUtil.getLogFilePath(logManager.getLogManagerProperties(), nextLogFileId);
-                        FileUtil.createFileIfNotExists(filePath);
-                        logManager.getLogPage(pageToFlush).reset(
-                                LogUtil.getLogFilePath(logManager.getLogManagerProperties(), nextLogFileId), 0,
-                                logManager.getLogManagerProperties().getLogPageSize());
-                    }
-
                     // #. sleep during the groupCommitWaitTime
                     sleep(groupCommitWaitPeriod);
 
@@ -941,6 +928,13 @@
                     // the log page)
                     logManager.getLogPage(pageToFlush).flush();
 
+                    // Map the log page to a new region in the log file.
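+                    // The page advances by one full log buffer, so the lsn passed to resetLogPage is the start
+                    // lsn of the page's next region; resetLogPage uses it to pick the log file that region
+                    // belongs to, which may be the next log file rather than the current one.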
+                    long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
+                            + logManager.getLogManagerProperties().getLogBufferSize();
+
+                    logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1
+                            + logManager.getLogManagerProperties().getLogBufferSize(), nextWritePosition, pageToFlush);
+
                     // increment the last flushed lsn and lastFlushedPage
                     logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
 
@@ -950,12 +944,6 @@
                     // reset the count to 1
                     logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
 
-                    // Map the log page to a new region in the log file.
-                    long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
-                            + logManager.getLogManagerProperties().getLogBufferSize();
-
-                    logManager.resetLogPage(nextWritePosition, pageToFlush);
-
                     // mark the page as ACTIVE
                     logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);