Fixed bugs related to (1) the job-level commit log (which is used for DDL) and (2) sharp checkpoint.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1416 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 1d04cd7..d9c6ed4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -63,6 +63,7 @@
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext.TransactionType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -71,6 +72,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -269,6 +271,9 @@
 
         IIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
+        TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        txnCtx.setTransactionType(TransactionType.READ_WRITE);
+
         // TODO: fix exceptions once new BTree exception model is in hyracks.
         indexAccessor.insert(tuple);
 
@@ -567,6 +572,10 @@
         IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex,
                 lsmIndex, IndexOperation.DELETE);
         IIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
+
+        TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        txnCtx.setTransactionType(TransactionType.READ_WRITE);
+
         indexAccessor.delete(tuple);
         indexLifecycleManager.close(resourceID);
     }
@@ -590,6 +599,7 @@
 
     @Override
     public Dataverse getDataverse(JobId jobId, String dataverseName) throws MetadataException, RemoteException {
+
         try {
             ITupleReference searchKey = createTuple(dataverseName);
             DataverseTupleTranslator tupleReaderWriter = new DataverseTupleTranslator(false);
@@ -843,6 +853,86 @@
         return results.get(0);
     }
 
+    //Debugging Method
+    public String printMetadata() {
+
+        StringBuilder sb = new StringBuilder();
+        try {
+            IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET;
+            long resourceID = index.getResourceID();
+            IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
+            indexLifecycleManager.open(resourceID);
+            IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+            ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor();
+
+            RangePredicate rangePred = null;
+            rangePred = new RangePredicate(null, null, true, true, null, null);
+            indexAccessor.search(rangeCursor, rangePred);
+            try {
+                while (rangeCursor.hasNext()) {
+                    rangeCursor.next();
+                    sb.append(TupleUtils.printTuple(rangeCursor.getTuple(),
+                            new ISerializerDeserializer[] { AqlSerializerDeserializerProvider.INSTANCE
+                                    .getSerializerDeserializer(BuiltinType.ASTRING) }));
+                }
+            } finally {
+                rangeCursor.close();
+            }
+            indexLifecycleManager.close(resourceID);
+
+            index = MetadataPrimaryIndexes.DATASET_DATASET;
+            resourceID = index.getResourceID();
+            indexInstance = indexLifecycleManager.getIndex(resourceID);
+            indexLifecycleManager.open(resourceID);
+            indexAccessor = indexInstance
+                    .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor();
+
+            rangePred = null;
+            rangePred = new RangePredicate(null, null, true, true, null, null);
+            indexAccessor.search(rangeCursor, rangePred);
+            try {
+                while (rangeCursor.hasNext()) {
+                    rangeCursor.next();
+                    sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] {
+                            AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
+                            AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING) }));
+                }
+            } finally {
+                rangeCursor.close();
+            }
+            indexLifecycleManager.close(resourceID);
+
+            index = MetadataPrimaryIndexes.INDEX_DATASET;
+            resourceID = index.getResourceID();
+            indexInstance = indexLifecycleManager.getIndex(resourceID);
+            indexLifecycleManager.open(resourceID);
+            indexAccessor = indexInstance
+                    .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor();
+
+            rangePred = null;
+            rangePred = new RangePredicate(null, null, true, true, null, null);
+            indexAccessor.search(rangeCursor, rangePred);
+            try {
+                while (rangeCursor.hasNext()) {
+                    rangeCursor.next();
+                    sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] {
+                            AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
+                            AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
+                            AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING) }));
+                }
+            } finally {
+                rangeCursor.close();
+            }
+            indexLifecycleManager.close(resourceID);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return sb.toString();
+    }
+
     private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
             IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
         IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
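
The MetadataNode changes above promote the metadata transaction to READ_WRITE immediately before each index insert/delete. Below is a minimal, self-contained sketch of the presumed intent (hypothetical names such as MetadataNodeSketch, CommitLog, and logJobCommit; not the AsterixDB API): a transaction that never modified anything stays READ, while a modifying (DDL) transaction is promoted so that commit can write the single job-level commit log record that recovery later looks for.

import java.util.ArrayList;
import java.util.List;

public class JobLevelCommitSketch {

    enum TransactionType {
        READ, READ_WRITE
    }

    static class TransactionContext {
        private final int jobId;
        private TransactionType type = TransactionType.READ; // transactions start read-only

        TransactionContext(int jobId) {
            this.jobId = jobId;
        }

        void setTransactionType(TransactionType type) {
            this.type = type;
        }

        TransactionType getTransactionType() {
            return type;
        }

        int getJobId() {
            return jobId;
        }
    }

    // Stand-in for the transaction log: a job-level commit record carries only a jobId,
    // no datasetId or primary-key hash value.
    static class CommitLog {
        final List<String> records = new ArrayList<String>();

        void logJobCommit(int jobId) {
            records.add("JOB_COMMIT jobId=" + jobId);
        }
    }

    static class MetadataNodeSketch {
        private final CommitLog commitLog = new CommitLog();

        // Analogous to insertTupleIntoIndex()/deleteTupleFromIndex(): every metadata
        // modification promotes the transaction to READ_WRITE before touching the index.
        void modifyIndex(TransactionContext txnCtx) {
            txnCtx.setTransactionType(TransactionType.READ_WRITE);
            // ... perform the actual B-tree insert/delete here ...
        }

        // Commit writes a job-level commit record only for READ_WRITE transactions, so a
        // purely read-only metadata transaction leaves nothing to redo.
        void commit(TransactionContext txnCtx) {
            if (txnCtx.getTransactionType() == TransactionType.READ_WRITE) {
                commitLog.logJobCommit(txnCtx.getJobId());
            }
        }

        CommitLog getCommitLog() {
            return commitLog;
        }
    }

    public static void main(String[] args) {
        MetadataNodeSketch node = new MetadataNodeSketch();
        TransactionContext ddlTxn = new TransactionContext(42);
        node.modifyIndex(ddlTxn);
        node.commit(ddlTxn);
        System.out.println(node.getCommitLog().records); // [JOB_COMMIT jobId=42]
    }
}
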
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index f3c33f8..242f228 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -697,6 +697,7 @@
         closeLogPages();
         initLSN();
         openLogPages();
+        logPageFlusher.renew();
     }
 
     private PhysicalLogLocator initLSN() throws ACIDException {
@@ -858,8 +859,9 @@
      */
     private final LinkedBlockingQueue<Object>[] flushRequestQueue;
     private final Object[] flushRequests;
-    private int lastFlushedPageIndex;
+    private int pageToFlush;
     private final long groupCommitWaitPeriod;
+    private boolean isRenewRequest;
 
     public LogPageFlushThread(LogManager logManager) {
         this.logManager = logManager;
@@ -871,8 +873,16 @@
             flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
             flushRequests[i] = new Object();
         }
-        this.lastFlushedPageIndex = -1;
+        this.pageToFlush = -1;
         groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
+        isRenewRequest = false;
+    }
+
+    public void renew() {
+        isRenewRequest = true;
+        pageToFlush = -1;
+        this.interrupt();
+        isRenewRequest = false;
     }
 
     public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) {
@@ -908,12 +918,19 @@
     public void run() {
         while (true) {
             try {
-                int pageToFlush = logManager.getNextPageInSequence(lastFlushedPageIndex);
+                pageToFlush = logManager.getNextPageInSequence(pageToFlush);
 
                 // A wait call on the linkedBLockingQueue. The flusher thread is
                 // notified when an object is added to the queue. Please note
                 // that each page has an associated blocking queue.
-                flushRequestQueue[pageToFlush].take();
+                try {
+                    flushRequestQueue[pageToFlush].take();
+                } catch (InterruptedException ie) {
+                    while (isRenewRequest) {
+                        sleep(1);
+                    }
+                    continue;
+                }
 
                 synchronized (logManager.getLogPage(pageToFlush)) {
 
@@ -954,7 +971,6 @@
 
                     // increment the last flushed lsn and lastFlushedPage
                     logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
-                    lastFlushedPageIndex = pageToFlush;
 
                     // decrement activeTxnCountOnIndexes
                     logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
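
The LogManager change calls logPageFlusher.renew() right after a sharp checkpoint closes and re-initializes the log pages, so the flush thread abandons the page it was waiting on and restarts its scan. The following is a minimal sketch of that interrupt-and-reset pattern (a hypothetical Flusher class with a simplified round-robin scan; not the real LogPageFlushThread): renew() raises a flag and interrupts the blocked take(), and the run loop then resets its page pointer to -1 and continues from the first page.

import java.util.concurrent.LinkedBlockingQueue;

public class FlusherRenewSketch {

    static class Flusher extends Thread {
        private final LinkedBlockingQueue<Object>[] flushRequestQueue;
        private volatile boolean renewRequested = false;
        private int pageToFlush = -1;

        @SuppressWarnings("unchecked")
        Flusher(int numLogPages) {
            flushRequestQueue = new LinkedBlockingQueue[numLogPages];
            for (int i = 0; i < numLogPages; i++) {
                flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
            }
            setDaemon(true);
        }

        // Called after a sharp checkpoint has closed and re-initialized the log pages:
        // ask the flusher to forget its position and start over from the first page.
        void renew() {
            renewRequested = true;
            interrupt(); // wake a take() that is blocked on a now-stale page queue
        }

        void requestFlush(int pageIndex) throws InterruptedException {
            flushRequestQueue[pageIndex].put(new Object());
        }

        @Override
        public void run() {
            while (true) {
                pageToFlush = (pageToFlush + 1) % flushRequestQueue.length;
                try {
                    // Block until somebody requests a flush of this page.
                    flushRequestQueue[pageToFlush].take();
                } catch (InterruptedException ie) {
                    if (renewRequested) {
                        pageToFlush = -1; // restart the round-robin scan from page 0
                        renewRequested = false;
                    }
                    continue;
                }
                System.out.println("flushing page " + pageToFlush);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Flusher flusher = new Flusher(4);
        flusher.start();
        flusher.requestFlush(0);
        Thread.sleep(100);
        flusher.renew(); // e.g. right after a sharp checkpoint re-created the log pages
        flusher.requestFlush(0);
        Thread.sleep(100);
    }
}
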
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index ed88112..c73f693 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -241,13 +241,15 @@
         long resourceId;
         byte resourceMgrId;
         long maxDiskLastLSN;
-        long currentLSN;
+        long currentLSN = -1;
         int resourceType;
         ILSMIndex index = null;
         LocalResource localResource = null;
         ILocalResourceMetadata localResourceMetadata = null;
         Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
         List<ILSMComponent> immutableDiskIndexList = null;
+        TxnId jobLevelTxnId = new TxnId(-1, -1, -1);
+        boolean foundWinnerTxn;
 
         //#. get indexLifeCycleManager 
         IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
@@ -256,7 +258,7 @@
 
         //#. redo
         while (logCursor.next(currentLogLocator)) {
-
+            foundWinnerTxn = false;
             if (LogManager.IS_DEBUG_MODE) {
                 System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
             }
@@ -268,9 +270,16 @@
                     tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
                             logRecordHelper.getDatasetId(currentLogLocator),
                             logRecordHelper.getPKHashValue(currentLogLocator));
-
+                    jobLevelTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator), -1, -1);
                     if (winnerTxnTable.containsKey(tempKeyTxnId)) {
                         currentLSN = winnerTxnTable.get(tempKeyTxnId);
+                        foundWinnerTxn = true;
+                    } else if (winnerTxnTable.containsKey(jobLevelTxnId)) {
+                        currentLSN = winnerTxnTable.get(jobLevelTxnId);
+                        foundWinnerTxn = true;
+                    }
+
+                    if (foundWinnerTxn) {
                         resourceId = logRecordHelper.getResourceId(currentLogLocator);
                         localResource = localResourceRepository.getResourceById(resourceId);
 
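
The RecoveryManager change makes the redo pass consult the winner table twice per update log record: first with the entity-level id (jobId, datasetId, pkHashValue) and, failing that, with a job-level id (jobId, -1, -1). The second lookup is what lets the updates of a DDL job, which commits with a single job-level commit record, be recognized as winners. A small self-contained sketch of that decision follows (hypothetical TxnId and winnerLsn names; the real code reuses the log record helper and winnerTxnTable):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class RedoWinnerSketch {

    static final class TxnId {
        final int jobId, datasetId, pkHashValue;

        TxnId(int jobId, int datasetId, int pkHashValue) {
            this.jobId = jobId;
            this.datasetId = datasetId;
            this.pkHashValue = pkHashValue;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TxnId)) {
                return false;
            }
            TxnId t = (TxnId) o;
            return jobId == t.jobId && datasetId == t.datasetId && pkHashValue == t.pkHashValue;
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobId, datasetId, pkHashValue);
        }
    }

    // Returns the commit LSN if the update is a winner, or -1 if it must be skipped.
    static long winnerLsn(Map<TxnId, Long> winnerTxnTable, int jobId, int datasetId, int pkHashValue) {
        TxnId entityLevelId = new TxnId(jobId, datasetId, pkHashValue);
        TxnId jobLevelId = new TxnId(jobId, -1, -1); // job-level commit (DDL)
        if (winnerTxnTable.containsKey(entityLevelId)) {
            return winnerTxnTable.get(entityLevelId);
        } else if (winnerTxnTable.containsKey(jobLevelId)) {
            return winnerTxnTable.get(jobLevelId);
        }
        return -1; // not a winner: do not redo this update
    }

    public static void main(String[] args) {
        Map<TxnId, Long> winners = new HashMap<TxnId, Long>();
        winners.put(new TxnId(7, -1, -1), 1234L); // DDL job 7 committed at LSN 1234
        System.out.println(winnerLsn(winners, 7, 3, 99)); // 1234: redo via the job-level commit
        System.out.println(winnerLsn(winners, 8, 3, 99)); // -1: loser, skip
    }
}
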
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
index 260e5ff..162660f 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
@@ -75,7 +75,7 @@
     public static void main(String[] args) throws ACIDException, Exception {
         Properties props = new Properties();
         props.setProperty(LogManagerProperties.LOG_DIR_KEY,
-                "/home/kisskys/workspace/lsm_merge/asterix_lsm_stabilization/debug/asterix_logs/nc1");
+                "/Users/kisskys/workspace/sanitycheck_lsm_merge/asterix_lsm_stabilization/asterix-app/asterix_logs/nc1");
         LogManagerProperties logProps = new LogManagerProperties(props);
         LogManager logManager = new LogManager(null, logProps);
         LogRecordReader logReader = new LogRecordReader(logManager);