Rollback is implemented and all existing tests pass. (The existing failure test cases work, but more test cases still need to be added.)
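
A minimal, illustrative sketch of the undo pass that rollback relies on (see the
RecoveryManager.rollbackTransaction changes below). txnIdOf() and the single
resourceMgr reference are simplified stand-ins for the tempKeyTxnId/TxnId
bookkeeping and the per-record resource-manager lookup in the actual code:

    Map<TxnId, List<Long>> loserTxnTable = new HashMap<TxnId, List<Long>>();
    while (logCursor.next(currentLogLocator)) {                   // scan firstLSN .. lastLSN
        byte logType = logRecordHelper.getLogType(currentLogLocator);
        if (logType == LogType.UPDATE) {
            List<Long> undoLSNSet = loserTxnTable.get(txnIdOf(currentLogLocator));
            if (undoLSNSet == null) {
                undoLSNSet = new ArrayList<Long>();
                loserTxnTable.put(txnIdOf(currentLogLocator), undoLSNSet);
            }
            undoLSNSet.add(currentLogLocator.getLsn());           // remember what must be undone
        } else if (logType == LogType.COMMIT) {
            loserTxnTable.remove(txnIdOf(currentLogLocator));     // committed entity: nothing to undo
        }
    }
    for (List<Long> undoLSNSet : loserTxnTable.values()) {
        for (long undoLSN : undoLSNSet) {
            logManager.readLog(undoLSN, currentLogLocator);       // re-read the UPDATE record
            resourceMgr.undo(logRecordHelper, currentLogLocator); // apply the inverse operation
        }
    }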

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@979 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java
index a46e52a..37983c9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java
@@ -42,11 +42,14 @@
     private final List<LogicalVariable> primaryKeyLogicalVars;
     private final JobId jobId;
     private final int datasetId;
+    private final boolean isWriteTransaction;
 
-    public CommitPOperator(JobId jobId, int datasetId, List<LogicalVariable> primaryKeyLogicalVars) {
+    public CommitPOperator(JobId jobId, int datasetId, List<LogicalVariable> primaryKeyLogicalVars,
+            boolean isWriteTransaction) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
+        this.isWriteTransaction = isWriteTransaction;
     }
 
     @Override
@@ -85,7 +88,7 @@
                 primaryKeyLogicalVars, varTypeEnv, context);
 
         CommitRuntimeFactory runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
-                binaryHashFunctionFactories);
+                binaryHashFunctionFactories, isWriteTransaction);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 9f04a0e..98dbb5b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext.TransactionType;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -46,6 +47,7 @@
     private final DatasetId datasetId;
     private final int[] primaryKeyFields;
     private final IBinaryHashFunction[] primaryKeyHashFunctions;
+    private final boolean isWriteTransaction;
 
     private TransactionContext transactionContext;
     private RecordDescriptor inputRecordDesc;
@@ -53,7 +55,7 @@
     private FrameTupleReference frameTupleReference;
 
     public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            IBinaryHashFunctionFactory[] binaryHashFunctionFactories) {
+            IBinaryHashFunctionFactory[] binaryHashFunctionFactories, boolean isWriteTransaction) {
         this.hyracksTaskCtx = ctx;
         AsterixAppRuntimeContext runtimeCtx = (AsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject();
@@ -66,12 +68,15 @@
             this.primaryKeyHashFunctions[i] = binaryHashFunctionFactories[i].createBinaryHashFunction();
         }
         this.frameTupleReference = new FrameTupleReference();
+        this.isWriteTransaction = isWriteTransaction;
     }
 
     @Override
     public void open() throws HyracksDataException {
         try {
             transactionContext = transactionManager.getTransactionContext(jobId);
+            transactionContext.setTransactionType(isWriteTransaction ? TransactionType.READ_WRITE
+                    : TransactionType.READ);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java
index df7da97..262fa8f 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java
@@ -35,13 +35,15 @@
     private final int datasetId;
     private final int[] primaryKeyFields;
     IBinaryHashFunctionFactory[] binaryHashFunctionFactories;
+    private final boolean isWriteTransaction;
 
     public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
-            IBinaryHashFunctionFactory[] binaryHashFunctionFactories) {
+            IBinaryHashFunctionFactory[] binaryHashFunctionFactories, boolean isWriteTransaction) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.binaryHashFunctionFactories = binaryHashFunctionFactories;
+        this.isWriteTransaction = isWriteTransaction;
     }
 
     @Override
@@ -51,6 +53,6 @@
 
     @Override
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
-        return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, binaryHashFunctionFactories);
+        return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, binaryHashFunctionFactories, isWriteTransaction);
     }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
index 02b7d1b..026725c 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
@@ -76,7 +76,7 @@
 
             //create the logical and physical operator
             CommitOperator commitOperator = new CommitOperator();
-            CommitPOperator commitPOperator = new CommitPOperator(jobId, datasetId, primaryKeyLogicalVars);
+            CommitPOperator commitPOperator = new CommitPOperator(jobId, datasetId, primaryKeyLogicalVars, mp.isWriteTransaction());
             commitOperator.setPhysicalOperator(commitPOperator);
 
             //create ExtensionOperator and put the commitOperator in it.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
index 2649e2c..ce23c17 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
@@ -203,6 +203,7 @@
 
         edu.uci.ics.asterix.transaction.management.service.transaction.JobId asterixJobId = JobIdFactory
                 .generateJobId();
+        queryMetadataProvider.setJobId(asterixJobId);
         AqlExpressionToPlanTranslator t = new AqlExpressionToPlanTranslator(queryMetadataProvider, varCounter,
                 outputDatasetName, statement);
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index b363d1c..591d8a8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -83,7 +83,6 @@
 import edu.uci.ics.asterix.om.types.TypeSignature;
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
-import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
 import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledBeginFeedStatement;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledControlFeedStatement;
@@ -690,9 +689,6 @@
         // Query Compilation (happens under the same ongoing metadata
         // transaction)
         sessionConfig.setGenerateJobSpec(true);
-        if (metadataProvider.isWriteTransaction()) {
-            metadataProvider.setJobId(JobIdFactory.generateJobId());
-        }
         JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query,
                 reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, out, pdf, stmt);
         sessionConfig.setGenerateJobSpec(false);
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index 3eb9fcf..111f76b 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -98,8 +98,7 @@
             if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
                 continue;
             }
-            System.out.println(testFile.getAbsolutePath());
-            ***********************************************************/
+            ************************************************************/
 
             File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
             File actualFile = new File(PATH_ACTUAL + File.separator
diff --git a/asterix-app/src/test/resources/runtimets/queries/failure/delete-rtree.aql b/asterix-app/src/test/resources/runtimets/queries/failure/delete-rtree.aql
index 465d94f..6fc11e5 100644
--- a/asterix-app/src/test/resources/runtimets/queries/failure/delete-rtree.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/failure/delete-rtree.aql
@@ -18,7 +18,7 @@
 using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
 (("path"="nc1://data/twitter/smalltweets.txt"),("format"="adm")) pre-sorted;
 
-delete $l from dataset MyData where $l.id>=50 die after 1500;
+delete $l from dataset MyData where $l.id>=50 die after 1000;
 
 write output to nc1:"rttest/failure_delete-rtree.adm";      
 
diff --git a/asterix-app/src/test/resources/runtimets/queries/failure/delete.aql b/asterix-app/src/test/resources/runtimets/queries/failure/delete.aql
index cd4a922..3da88f8 100644
--- a/asterix-app/src/test/resources/runtimets/queries/failure/delete.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/failure/delete.aql
@@ -29,7 +29,7 @@
 using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
 (("path"="nc1://data/tpch0.001/lineitem.tbl"),("format"="delimited-text"),("delimiter"="|")) pre-sorted;
 
-delete $l from dataset LineItem where $l.l_orderkey>=10 die after 1500;
+delete $l from dataset LineItem where $l.l_orderkey>=10 die after 1000;
 
 write output to nc1:"rttest/failure_delete.adm";      
 for $c in dataset('LineItem')
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index a428042..fa5290c 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -918,7 +918,7 @@
         <output-file compare="Text">delete-rtree.adm</output-file>
       </compilation-unit>
     </test-case>
-<!--     <test-case FilePath="failure">
+    <test-case FilePath="failure">
       <compilation-unit name="delete">
         <output-file compare="Text">delete.adm</output-file>
         <expected-error>edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException</expected-error>
@@ -926,7 +926,7 @@
       <compilation-unit name="verify_delete">
         <output-file compare="Text">delete.adm</output-file>
       </compilation-unit>
-    </test-case> -->
+    </test-case>
     <test-case FilePath="failure">
       <compilation-unit name="insert-rtree">
         <output-file compare="Text">insert-rtree.adm</output-file>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
index fbaa12f..4532390 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
@@ -42,6 +42,8 @@
     public void registerTransactionalResource(long resourceId, Object resource) {
         synchronized (resourceRepository) {
             mutableResourceId.setId(resourceId);
+//            MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
+//            resourceRepository.put(newMutableResourceId, resource);
             if (resourceRepository.get(resourceId) == null) {
                 MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
                 resourceRepository.put(newMutableResourceId, resource);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 050f1a1..6df3928 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -515,7 +515,7 @@
                         "Unsupported unlock request: dataset-granule unlock is not supported");
             }
         }
-        
+
         latchLockTable();
         validateJob(txnContext);
 
@@ -527,37 +527,24 @@
         //find the resource to be unlocked
         dLockInfo = datasetResourceHT.get(datasetId);
         jobInfo = jobHT.get(jobId);
-        if (IS_DEBUG_MODE) {
-            if (dLockInfo == null || jobInfo == null) {
-                throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
-            }
-        }
-        
-        //////////////////////////////////////////////////////////////
-        //TODO
-        //Check whether the dLockInfo or jobInfo could be null 
-        //even when the callback is called properly
         if (dLockInfo == null || jobInfo == null) {
             unlatchLockTable();
-            return;
+            throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
         }
-        /////////////////////////////////////////////////////////////
-        
+
         eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);
-        
-        if (IS_DEBUG_MODE) {
-            if (eLockInfo == -1) {
-                throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
-            }
+
+        if (eLockInfo == -1) {
+            unlatchLockTable();
+            throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
         }
 
         //find the corresponding entityInfo
         entityInfo = entityLockInfoManager.findEntityInfoFromHolderList(eLockInfo, jobId.getId(), entityHashValue);
-        if (IS_DEBUG_MODE) {
-            if (entityInfo == -1) {
-                throw new IllegalStateException("Invalid unlock request[" + jobId.getId() + "," + datasetId.getId()
-                        + "," + entityHashValue + "]: Corresponding lock info doesn't exist.");
-            }
+        if (entityInfo == -1) {
+            unlatchLockTable();
+            throw new IllegalStateException("Invalid unlock request[" + jobId.getId() + "," + datasetId.getId() + ","
+                    + entityHashValue + "]: Corresponding lock info doesn't exist.");
         }
 
         //decrease the corresponding count of dLockInfo/eLockInfo/entityInfo
@@ -579,8 +566,16 @@
             //This commit log is written here in order to avoid increasing the memory space for managing transactionIds
             if (commitFlag) {
                 if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
-                    txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(), entityHashValue,
-                            -1, (byte) 0, 0, null, null, logicalLogLocator);
+                    try {
+                        txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(),
+                                entityHashValue, -1, (byte) 0, 0, null, null, logicalLogLocator);
+                    } catch (ACIDException e) {
+                        try {
+                            requestAbort(txnContext);
+                        } finally {
+                            unlatchLockTable();
+                        }
+                    }
                 }
 
                 txnContext.setLastLSNToIndexes(logicalLogLocator.getLsn());
@@ -660,7 +655,7 @@
             trackLockRequest("Requested", RequestType.RELEASE_LOCKS, new DatasetId(0), 0, (byte) 0, txnContext,
                     dLockInfo, eLockInfo);
         }
-        
+
         JobInfo jobInfo = jobHT.get(jobId);
         if (jobInfo == null) {
             unlatchLockTable();
@@ -971,7 +966,7 @@
         StringBuilder s = new StringBuilder();
         LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType, datasetIdObj,
                 entityHashValue, lockMode, txnContext);
-        s.append(Thread.currentThread().getId()+":");
+        s.append(Thread.currentThread().getId() + ":");
         s.append(msg);
         if (msg.equals("Granted")) {
             if (dLockInfo != null) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
index cb6a67a..a70fbe2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
@@ -43,6 +43,8 @@
         long resourceId = logRecordHelper.getResourceId(logLocator);
         int offset = logRecordHelper.getLogContentBeginPos(logLocator);
 
+        //TODO
+        //replace TransactionResourceRepository with IndexLifeCycleManager
         // look up the repository to obtain the resource object
         IIndex index = (IIndex) provider.getTransactionalResourceRepository().getTransactionalResource(resourceId);
 
@@ -95,9 +97,6 @@
                     indexAccessor.physicalDelete(newTuple);
                 }
             } else {
-                //For LSMRtree and LSMInvertedIndex
-                //delete --> insert
-                //insert --> delete
                 if (newOperation == (byte) IndexOperation.DELETE.ordinal()) {
                     indexAccessor.insert(newTuple);
                 } else {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index f1e3877..61c47f4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -25,12 +25,12 @@
     private final ILogFilter logFilter;
     private IBuffer readOnlyBuffer;
     private LogicalLogLocator logicalLogLocator = null;
-    private int bufferIndex = 0;
+    private long bufferIndex = 0;
     private boolean firstNext = true;
     private boolean readMemory = false;
     private long readLSN = 0;
     private boolean needReloadBuffer = true;
-    
+
     /**
      * @param logFilter
      */
@@ -56,7 +56,8 @@
         String filePath = LogUtil.getLogFilePath(logManager.getLogManagerProperties(), fileId);
         File file = new File(filePath);
         if (file.exists()) {
-            return FileUtil.getFileBasedBuffer(filePath, lsn, size);
+            return FileUtil.getFileBasedBuffer(filePath, lsn
+                    % logManager.getLogManagerProperties().getLogPartitionSize(), size);
         } else {
             return null;
         }
@@ -75,6 +76,8 @@
     @Override
     public boolean next(LogicalLogLocator currentLogLocator) throws IOException, ACIDException {
 
+        //TODO
+        //Test the correctness when multiple log files are created
         int integerRead = -1;
         boolean logRecordBeginPosFound = false;
         long bytesSkipped = 0;
@@ -116,11 +119,19 @@
                 // bytes without finding a log record, it
                 // indicates an absence of logs any further.
             }
+
+            if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+                return next(currentLogLocator); //should read from memory if there is any further log
+            }
         }
 
         if (!logRecordBeginPosFound) {
             // need to reload the buffer
-            long lsnpos = (++bufferIndex * logManager.getLogManagerProperties().getLogBufferSize());
+            // TODO
+            // reduce IO by reading more pages(equal to logBufferSize) at a time.
+            long lsnpos = ((logicalLogLocator.getLsn() / logManager.getLogManagerProperties().getLogPageSize()) + 1)
+                    * logManager.getLogManagerProperties().getLogPageSize();
+
             readOnlyBuffer = getReadOnlyBuffer(lsnpos, logManager.getLogManagerProperties().getLogBufferSize());
             if (readOnlyBuffer != null) {
                 logicalLogLocator.setBuffer(readOnlyBuffer);
@@ -163,7 +174,7 @@
     private boolean readFromMemory(LogicalLogLocator currentLogLocator) throws ACIDException, IOException {
         byte[] logRecord = null;
         long lsn = logicalLogLocator.getLsn();
-        
+
         //set the needReloadBuffer to true
         needReloadBuffer = true;
 
@@ -209,7 +220,7 @@
                     // need to read the next log page
                     readOnlyBuffer = null;
                     logicalLogLocator.setBuffer(null);
-                    logicalLogLocator.setLsn(lsn/logPageSize+1);
+                    logicalLogLocator.setLsn(lsn / logPageSize + 1);
                     logicalLogLocator.setMemoryOffset(0);
                     return next(currentLogLocator);
                 }
@@ -227,7 +238,7 @@
                 readOnlyBuffer = memBuffer;
                 logicalLogLocator.setBuffer(readOnlyBuffer);
                 logicalLogLocator.setMemoryOffset(0);
-                
+
                 if (logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
                     if (currentLogLocator == null) {
                         currentLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
@@ -249,7 +260,7 @@
                     return next(currentLogLocator);
                 }
                 return logFilter.accept(readOnlyBuffer, currentLogLocator.getMemoryOffset(), logLength);
-                
+
             } else {
                 return next(currentLogLocator);//read from disk
             }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 0487aee..2221e86 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -36,7 +36,6 @@
 
 public class LogManager implements ILogManager {
 
-    
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
     private TransactionSubsystem provider;
@@ -156,7 +155,7 @@
     }
 
     public void addFlushRequest(int pageIndex) {
-        pendingFlushRequests[pageIndex].add(Thread.currentThread());
+        pendingFlushRequests[pageIndex].add(pendingFlushRequests);
     }
 
     public AtomicLong getLastFlushedLsn() {
@@ -427,6 +426,9 @@
             }
 
             if (forwardPage) {
+                //TODO 
+                //this is not safe since the incoming thread may reach the same page slot with this page 
+                //(differ by the log buffer size)
                 logPageStatus[prevPage].set(PageState.INACTIVE); // mark
                 // previous
                 // page
@@ -491,8 +493,8 @@
 
     @Override
     public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
-            byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject,
-            ILogger logger, LogicalLogLocator logicalLogLocator) throws ACIDException {
+            byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger,
+            LogicalLogLocator logicalLogLocator) throws ACIDException {
         /*
          * logLocator is a re-usable object that is appropriately set in each
          * invocation. If the reference is null, the log manager must throw an
@@ -531,6 +533,9 @@
             previousLSN = context.getLastLogLocator().getLsn();
             currentLSN = getLsn(totalLogSize, logType);
             context.setLastLSN(currentLSN);
+            if (IS_DEBUG_MODE) {
+                System.out.println("--------------> LSN(" + currentLSN + ") is allocated");
+            }
             logicalLogLocator.setLsn(currentLSN);
         }
 
@@ -567,7 +572,7 @@
              */
             logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN,
                     resourceId, resourceMgrId, logContentSize);
-            
+
             // increment the offset so that the transaction can fill up the
             // content in the correct region of the allocated space.
             logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
@@ -585,7 +590,8 @@
                 logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
                 logger.postLog(context, reusableLogContentObject);
                 if (IS_DEBUG_MODE) {
-                    logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType));
+                    logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset()
+                            - logRecordHelper.getLogHeaderSize(logType));
                     System.out.println(logRecordHelper.getLogRecordForDisplay(logicalLogLocator));
                     logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
                 }
@@ -598,8 +604,12 @@
             int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType);
             int length = totalLogSize - logRecordHelper.getLogChecksumSize();
             long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length);
-            logPages[pageIndex].writeLong(pageOffset + logRecordHelper.getLogHeaderSize(logType)
-                    + logContentSize, checksum);
+            logPages[pageIndex].writeLong(pageOffset + logRecordHelper.getLogHeaderSize(logType) + logContentSize,
+                    checksum);
+
+            if (IS_DEBUG_MODE) {
+                System.out.println("--------------> LSN(" + currentLSN + ") is written");
+            }
 
             /*
              * release the ownership as the log record has been placed in
@@ -614,8 +624,7 @@
              * If the transaction thread happens to be the last owner of the log
              * page the page must by marked as a candidate to be flushed.
              */
-            if (pageDirtyCount == 0) {
-                logPageStatus[pageIndex].set(PageState.INACTIVE);
+            if (pageDirtyCount == 0 && logPageStatus[pageIndex].get() == PageState.INACTIVE) {
                 addFlushRequest(pageIndex);
                 addedFlushRequest = true;
             }
@@ -627,24 +636,13 @@
              * been flushed to disk because the containing log page filled up.
              */
             if (logType == LogType.COMMIT) {
-                if (getLastFlushedLsn().get() < currentLSN) {
-                    if (!addedFlushRequest) {
-                        addFlushRequest(pageIndex);
-                    }
-
-                    /*
-                     * the commit log record is still in log buffer. need to
-                     * wait until the containing log page is flushed. When the
-                     * log flusher thread does flush the page, it notifies all
-                     * waiting threads of the flush event.
-                     */
-                    synchronized (logPages[pageIndex]) {
-                        while (getLastFlushedLsn().get() < currentLSN) {
-                            logPages[pageIndex].wait();
-                        }
+                synchronized (logPages[pageIndex]) {
+                    while (getLastFlushedLsn().get() < currentLSN) {
+                        logPages[pageIndex].wait();
                     }
                 }
             }
+
         } catch (Exception e) {
             e.printStackTrace();
             throw new ACIDException(context, "Thread: " + Thread.currentThread().getName()
@@ -669,6 +667,9 @@
 
         logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
                 logManagerProperties.getLogPageSize());
+
+        //TODO Check if this is necessary
+        //Arrays.fill(logPages[pageIndex].getArray(), (byte) 0);
     }
 
     @Override
@@ -688,8 +689,7 @@
      * Read a log that is residing on the disk.
      */
     private void readDiskLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException {
-        String filePath = LogUtil.getLogFilePath(logManagerProperties,
-                LogUtil.getFileId(this, lsnValue));
+        String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, lsnValue));
         long fileOffset = LogUtil.getFileOffset(this, lsnValue);
         ByteBuffer buffer = ByteBuffer.allocate(logManagerProperties.getLogPageSize());
         RandomAccessFile raf = null;
@@ -701,7 +701,7 @@
             buffer.position(0);
             byte logType = buffer.get(4);
             int logHeaderSize = logRecordHelper.getLogHeaderSize(logType);
-            int logBodySize = buffer.getInt(logHeaderSize-4);
+            int logBodySize = buffer.getInt(logHeaderSize - 4);
             int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
             buffer.limit(logRecordSize);
             MemBasedBuffer memBuffer = new MemBasedBuffer(buffer.slice());
@@ -716,8 +716,8 @@
                 throw new ACIDException(" invalid log record at lsn " + lsnValue);
             }
         } catch (Exception fnfe) {
-            throw new ACIDException(" unable to retrieve log record with lsn " + lsnValue
-                    + " from the file system", fnfe);
+            throw new ACIDException(" unable to retrieve log record with lsn " + lsnValue + " from the file system",
+                    fnfe);
         } finally {
             try {
                 if (raf != null) {
@@ -742,10 +742,10 @@
         if (lsnValue > getLastFlushedLsn().get()) {
             int pageIndex = getLogPageIndex(lsnValue);
             int pageOffset = getLogPageOffset(lsnValue);
-            
+
             //TODO
             //minimize memory allocation overhead. current code allocates the log page size per reading a log record.
-            
+
             byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
             // take a lock on the log page so that the page is not flushed to
             // disk interim
@@ -759,7 +759,7 @@
 
                     // get the log record length
                     logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
-                    byte logType = pageContent[pageOffset+4];
+                    byte logType = pageContent[pageOffset + 4];
                     int logHeaderSize = logRecordHelper.getLogHeaderSize(logType);
                     int logBodySize = DataUtil.byteArrayToInt(pageContent, pageOffset + logHeaderSize - 4);
                     int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
@@ -783,8 +783,7 @@
                             throw new ACIDException(" invalid log record at lsn " + lsnValue);
                         }
                     } catch (Exception e) {
-                        throw new ACIDException("exception encoutered in validating log record at lsn "
-                                + lsnValue, e);
+                        throw new ACIDException("exception encoutered in validating log record at lsn " + lsnValue, e);
                     }
                     return;
                 }
@@ -996,6 +995,7 @@
                     // that got flushed.
                     logManager.getLogPages()[pageToFlush].notifyAll();
                     logManager.setLastFlushedPage(pageToFlush);
+
                 }
             } catch (IOException ioe) {
                 ioe.printStackTrace();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
index fe48743..177533d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
@@ -40,13 +40,14 @@
 
     private int logPageSize = 128 * 1024; // 128 KB
     private int numLogPages = 8; // number of log pages in the log buffer.
-    private long logPartitionSize = logPageSize * 250; // maximum size of each
-                                                       // log file
+
     private long groupCommitWaitPeriod = 0; // time in milliseconds for which a
     // commit record will wait before
     // the housing page is marked for
     // flushing.
     private int logBufferSize = logPageSize * numLogPages;
+    // maximum size of each log file
+    private long logPartitionSize = logBufferSize * 1024 * 2; //2GB
 
     public int logMagicNumber = 123456789;
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 10d69dc..0905907 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -46,7 +46,6 @@
 import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.logging.PhysicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
-import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -59,6 +58,7 @@
  */
 public class RecoveryManager implements IRecoveryManager {
 
+    public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
     private TransactionSubsystem txnSubsystem;
 
@@ -176,6 +176,9 @@
         Map<TxnId, List<Long>> loserTxnTable = new HashMap<TxnId, List<Long>>();
         TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
 
+        int updateLogCount = 0;
+        int commitLogCount = 0;
+
         // Obtain the first log record written by the Job
         PhysicalLogLocator firstLSNLogLocator = txnContext.getFirstLogLocator();
         PhysicalLogLocator lastLSNLogLocator = txnContext.getLastLogLocator();
@@ -212,10 +215,10 @@
         List<Long> undoLSNSet = null;
 
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" collecting loser transaction's LSNs from " + firstLSNLogLocator.getLsn() + " to " +
-                    + lastLSNLogLocator.getLsn());
+            LOGGER.info(" collecting loser transaction's LSNs from " + firstLSNLogLocator.getLsn() + " to "
+                    + +lastLSNLogLocator.getLsn());
         }
-        
+
         while (currentLogLocator.getLsn() != lastLSNLogLocator.getLsn()) {
             try {
                 valid = logCursor.next(currentLogLocator);
@@ -229,13 +232,13 @@
                     break;//End of Log File
                 }
             }
-            
+
             if (LogManager.IS_DEBUG_MODE) {
                 System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
             }
 
-            tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator), logRecordHelper.getDatasetId(currentLogLocator),
-                    logRecordHelper.getPKHashValue(currentLogLocator));
+            tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
+                    logRecordHelper.getDatasetId(currentLogLocator), logRecordHelper.getPKHashValue(currentLogLocator));
             logType = logRecordHelper.getLogType(currentLogLocator);
 
             switch (logType) {
@@ -243,11 +246,17 @@
                     undoLSNSet = loserTxnTable.get(tempKeyTxnId);
                     if (undoLSNSet == null) {
                         TxnId txnId = new TxnId(logRecordHelper.getJobId(currentLogLocator),
-                                logRecordHelper.getDatasetId(currentLogLocator), logRecordHelper.getPKHashValue(currentLogLocator));
+                                logRecordHelper.getDatasetId(currentLogLocator),
+                                logRecordHelper.getPKHashValue(currentLogLocator));
                         undoLSNSet = new ArrayList<Long>();
                         loserTxnTable.put(txnId, undoLSNSet);
                     }
                     undoLSNSet.add(currentLogLocator.getLsn());
+                    if (IS_DEBUG_MODE) {
+                        updateLogCount++;
+                        System.out.println("" + Thread.currentThread().getId() + "======> update["
+                                + currentLogLocator.getLsn() + "]:" + tempKeyTxnId);
+                    }
                     break;
 
                 case LogType.COMMIT:
@@ -255,6 +264,11 @@
                     if (undoLSNSet != null) {
                         loserTxnTable.remove(tempKeyTxnId);
                     }
+                    if (IS_DEBUG_MODE) {
+                        commitLogCount++;
+                        System.out.println("" + Thread.currentThread().getId() + "======> commit["
+                                + currentLogLocator.getLsn() + "]" + tempKeyTxnId);
+                    }
                     break;
 
                 default:
@@ -266,11 +280,13 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" undoing loser transaction's effect");
         }
-        
+
         TxnId txnId = null;
         Iterator<Entry<TxnId, List<Long>>> iter = loserTxnTable.entrySet().iterator();
         byte resourceMgrId;
+        int undoCount = 0;
         while (iter.hasNext()) {
+
             Map.Entry<TxnId, List<Long>> loserTxn = (Map.Entry<TxnId, List<Long>>) iter.next();
             txnId = loserTxn.getKey();
 
@@ -280,7 +296,7 @@
 
             for (long undoLSN : undoLSNSet) {
                 // here, all the log records are UPDATE type. So, we don't need to check the type again.
-                
+
                 //read the corresponding log record to be undone.
                 logManager.readLog(undoLSN, currentLogLocator);
 
@@ -305,12 +321,20 @@
                             resourceMgrId, resourceMgr);
                 }
                 resourceMgr.undo(logRecordHelper, currentLogLocator);
+
+                if (IS_DEBUG_MODE) {
+                    undoCount++;
+                }
             }
         }
-        
+
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" undone loser transaction's effect");
         }
+        if (IS_DEBUG_MODE) {
+            System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + commitLogCount + "/"
+                    + undoCount);
+        }
     }
 }
 
@@ -338,6 +362,11 @@
     }
 
     @Override
+    public String toString() {
+        return "[" + jobId + "," + datasetId + "," + pkHashVal + "]";
+    }
+
+    @Override
     public int hashCode() {
         return pkHashVal;
     }
@@ -347,7 +376,7 @@
         if (o == this) {
             return true;
         }
-        if (!(o instanceof JobId)) {
+        if (!(o instanceof TxnId)) {
             return false;
         }
         TxnId txnId = (TxnId) o;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 3f71540..bafa753 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -74,6 +74,7 @@
     @Override
     public TransactionContext getTransactionContext(JobId jobId) throws ACIDException {
         synchronized (transactionContextRepository) {
+
             TransactionContext context = transactionContextRepository.get(jobId);
             if (context == null) {
                 context = transactionContextRepository.get(jobId);