changes to add sharp checkpoint

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1261 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
index 201a593..51b61d7 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
@@ -75,7 +75,7 @@
                 recoveryMgr.startRecovery(true);
             }
         }
-        recoveryMgr.checkpoint();
+        recoveryMgr.checkpoint(false);
         
         if (isMetadataNode) {
             //#. clean-up incomplete DDL operations, which is DDLRecovery
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index b038e2b..84161e2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -166,7 +166,7 @@
 
         if (isNewUniverse) {
             //Do checkpoint only if it is new universe
-            runtimeContext.getTransactionSubsystem().getRecoveryManager().checkpoint();
+            runtimeContext.getTransactionSubsystem().getRecoveryManager().checkpoint(false);
             MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             try {
                 // Begin a transaction against the metadata.
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/exception/ACIDException.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/exception/ACIDException.java
index 339e349..2bc9935 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/exception/ACIDException.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/exception/ACIDException.java
@@ -56,5 +56,9 @@
     public ACIDException(String message) {
         super(message);
     }
+    
+    public ACIDException(Throwable cause) {
+        super(cause);
+    }
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 587404d..74f39ad 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
@@ -37,12 +36,12 @@
     }
 
     @Override
-    public void beforeOperation(ILSMIOOperation operation) {
+    public void beforeOperation() {
         // Do nothing.
     }
 
     @Override
-    public void afterFinalize(ILSMIOOperation operation, ILSMComponent newComponent) {
+    public void afterFinalize(ILSMComponent newComponent) {
         opTracker.resetLSNs();
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 10e3805..fb444c3 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeImmutableComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 
 public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
@@ -31,7 +30,7 @@
     }
 
     @Override
-    public void afterOperation(ILSMIOOperation operation, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+    public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
             throws HyracksDataException {
         LSMBTreeImmutableComponent btreeComponent = (LSMBTreeImmutableComponent) newComponent;
         putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 53cdfd1..56d4f80 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -20,7 +20,6 @@
 import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexImmutableComponent;
 
 public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -30,7 +29,7 @@
     }
 
     @Override
-    public void afterOperation(ILSMIOOperation operation, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+    public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
             throws HyracksDataException {
         LSMInvertedIndexImmutableComponent invIndexComponent = (LSMInvertedIndexImmutableComponent) newComponent;
         putLSNIntoMetadata(invIndexComponent.getDeletedKeysBTree(), oldComponents);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 62da7e7..df1d945 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -20,7 +20,6 @@
 import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeImmutableComponent;
 
 public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -30,7 +29,7 @@
     }
 
     @Override
-    public void afterOperation(ILSMIOOperation operation, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+    public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
             throws HyracksDataException {
         LSMRTreeImmutableComponent rtreeComponent = (LSMRTreeImmutableComponent) newComponent;
         putLSNIntoMetadata(rtreeComponent.getRTree(), oldComponents);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
index 5328091..ed77629 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
@@ -68,5 +68,5 @@
      */
     public void rollbackTransaction(TransactionContext txnContext) throws ACIDException;
 
-    public void checkpoint() throws ACIDException;
+    public void checkpoint(boolean isSharpCheckpoint) throws ACIDException;
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index e048b3a..a894f9e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -27,6 +27,7 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -61,9 +62,12 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeImmutableComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexImmutableComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeImmutableComponent;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
@@ -131,10 +135,6 @@
         }
     }
 
-    private PhysicalLogLocator getBeginRecoveryLSN() throws ACIDException {
-        return new PhysicalLogLocator(0, txnSubsystem.getLogManager());
-    }
-
     public void startRecovery(boolean synchronous) throws IOException, ACIDException {
 
         state = SystemState.RECOVERING;
@@ -382,7 +382,7 @@
     }
 
     @Override
-    public void checkpoint() throws ACIDException {
+    public void checkpoint(boolean isSharpCheckpoint) throws ACIDException {
 
         LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
         TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
@@ -392,20 +392,53 @@
         //   right after the new checkpoint file is written.
         File[] prevCheckpointFiles = getPreviousCheckpointFiles();
 
-        //#. create and store the checkpointObject into the new checkpoint file
-        long minMCTFirstLSM = Long.MAX_VALUE;
-
-        //#. get indexLifeCycleManager
         IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getIndexLifecycleManager();
         List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
+        List<BlockingIOOperationCallbackWrapper> callbackList = new LinkedList<BlockingIOOperationCallbackWrapper>();
+
+        //#. flush all in-memory components if it is the sharp checkpoint
+        if (isSharpCheckpoint) {
+            for (IIndex index : openIndexList) {
+                ILSMIndex lsmIndex = (ILSMIndex) index;
+                ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+                        NoOpOperationCallback.INSTANCE);
+                IndexOperationTracker indexOpTracker = (IndexOperationTracker) lsmIndex.getOperationTracker();
+                BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
+                        indexOpTracker.getIOOperationCallback());
+                callbackList.add(cb);
+                try {
+                    indexAccessor.scheduleFlush(cb);
+                } catch (HyracksDataException e) {
+                    throw new ACIDException(e);
+                }
+            }
+
+            for (BlockingIOOperationCallbackWrapper cb : callbackList) {
+                try {
+                    cb.waitForIO();
+                } catch (InterruptedException e) {
+                    throw new ACIDException(e);
+                }
+            }
+        }
+        
+        //TODO
+        //think about discarding all existing logs. 
+
+        //#. create and store the checkpointObject into the new checkpoint file
+        long minMCTFirstLSN = Long.MAX_VALUE;
         long firstLSN;
-        for (IIndex index : openIndexList) {
-            firstLSN = ((IndexOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
-            minMCTFirstLSM = Math.min(minMCTFirstLSM, firstLSN);
+        if (openIndexList.size() > 0) {
+            for (IIndex index : openIndexList) {
+                firstLSN = ((IndexOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+                minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
+            }
+        } else {
+            minMCTFirstLSN = -1;
         }
 
-        CheckpointObject checkpointObject = new CheckpointObject(logMgr.getCurrentLsn().get(), minMCTFirstLSM,
+        CheckpointObject checkpointObject = new CheckpointObject(logMgr.getCurrentLsn().get(), minMCTFirstLSN,
                 txnMgr.getMaxJobId(), System.currentTimeMillis());
 
         FileOutputStream fos = null;