changes to add sharp checkpoint
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1261 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
index 201a593..51b61d7 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
@@ -75,7 +75,7 @@
recoveryMgr.startRecovery(true);
}
}
- recoveryMgr.checkpoint();
+ recoveryMgr.checkpoint(false);
if (isMetadataNode) {
//#. clean-up incomplete DDL operations, which is DDLRecovery
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index b038e2b..84161e2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -166,7 +166,7 @@
if (isNewUniverse) {
//Do checkpoint only if it is new universe
- runtimeContext.getTransactionSubsystem().getRecoveryManager().checkpoint();
+ runtimeContext.getTransactionSubsystem().getRecoveryManager().checkpoint(false);
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
try {
// Begin a transaction against the metadata.
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/exception/ACIDException.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/exception/ACIDException.java
index 339e349..2bc9935 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/exception/ACIDException.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/exception/ACIDException.java
@@ -56,5 +56,9 @@
public ACIDException(String message) {
super(message);
}
+
+ public ACIDException(Throwable cause) {
+ super(cause);
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 587404d..74f39ad 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -22,7 +22,6 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
@@ -37,12 +36,12 @@
}
@Override
- public void beforeOperation(ILSMIOOperation operation) {
+ public void beforeOperation() {
// Do nothing.
}
@Override
- public void afterFinalize(ILSMIOOperation operation, ILSMComponent newComponent) {
+ public void afterFinalize(ILSMComponent newComponent) {
opTracker.resetLSNs();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 10e3805..fb444c3 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -22,7 +22,6 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeImmutableComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -31,7 +30,7 @@
}
@Override
- public void afterOperation(ILSMIOOperation operation, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+ public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
LSMBTreeImmutableComponent btreeComponent = (LSMBTreeImmutableComponent) newComponent;
putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 53cdfd1..56d4f80 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -20,7 +20,6 @@
import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexImmutableComponent;
public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -30,7 +29,7 @@
}
@Override
- public void afterOperation(ILSMIOOperation operation, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+ public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
LSMInvertedIndexImmutableComponent invIndexComponent = (LSMInvertedIndexImmutableComponent) newComponent;
putLSNIntoMetadata(invIndexComponent.getDeletedKeysBTree(), oldComponents);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 62da7e7..df1d945 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -20,7 +20,6 @@
import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeImmutableComponent;
public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -30,7 +29,7 @@
}
@Override
- public void afterOperation(ILSMIOOperation operation, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+ public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
LSMRTreeImmutableComponent rtreeComponent = (LSMRTreeImmutableComponent) newComponent;
putLSNIntoMetadata(rtreeComponent.getRTree(), oldComponents);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
index 5328091..ed77629 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
@@ -68,5 +68,5 @@
*/
public void rollbackTransaction(TransactionContext txnContext) throws ACIDException;
- public void checkpoint() throws ACIDException;
+ public void checkpoint(boolean isSharpCheckpoint) throws ACIDException;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index e048b3a..a894f9e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -27,6 +27,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -61,9 +62,12 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeImmutableComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexImmutableComponent;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeImmutableComponent;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
@@ -131,10 +135,6 @@
}
}
- private PhysicalLogLocator getBeginRecoveryLSN() throws ACIDException {
- return new PhysicalLogLocator(0, txnSubsystem.getLogManager());
- }
-
public void startRecovery(boolean synchronous) throws IOException, ACIDException {
state = SystemState.RECOVERING;
@@ -382,7 +382,7 @@
}
@Override
- public void checkpoint() throws ACIDException {
+ public void checkpoint(boolean isSharpCheckpoint) throws ACIDException {
LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
@@ -392,20 +392,53 @@
// right after the new checkpoint file is written.
File[] prevCheckpointFiles = getPreviousCheckpointFiles();
- //#. create and store the checkpointObject into the new checkpoint file
- long minMCTFirstLSM = Long.MAX_VALUE;
-
- //#. get indexLifeCycleManager
IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getIndexLifecycleManager();
List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
+ List<BlockingIOOperationCallbackWrapper> callbackList = new LinkedList<BlockingIOOperationCallbackWrapper>();
+
+ //#. flush all in-memory components if it is the sharp checkpoint
+ if (isSharpCheckpoint) {
+ for (IIndex index : openIndexList) {
+ ILSMIndex lsmIndex = (ILSMIndex) index;
+ ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ IndexOperationTracker indexOpTracker = (IndexOperationTracker) lsmIndex.getOperationTracker();
+ BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
+ indexOpTracker.getIOOperationCallback());
+ callbackList.add(cb);
+ try {
+ indexAccessor.scheduleFlush(cb);
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ }
+
+ for (BlockingIOOperationCallbackWrapper cb : callbackList) {
+ try {
+ cb.waitForIO();
+ } catch (InterruptedException e) {
+ throw new ACIDException(e);
+ }
+ }
+ }
+
+ //TODO
+ //think about discarding all existing logs.
+
+ //#. create and store the checkpointObject into the new checkpoint file
+ long minMCTFirstLSN = Long.MAX_VALUE;
long firstLSN;
- for (IIndex index : openIndexList) {
- firstLSN = ((IndexOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
- minMCTFirstLSM = Math.min(minMCTFirstLSM, firstLSN);
+ if (openIndexList.size() > 0) {
+ for (IIndex index : openIndexList) {
+ firstLSN = ((IndexOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+ minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
+ }
+ } else {
+ minMCTFirstLSN = -1;
}
- CheckpointObject checkpointObject = new CheckpointObject(logMgr.getCurrentLsn().get(), minMCTFirstLSM,
+ CheckpointObject checkpointObject = new CheckpointObject(logMgr.getCurrentLsn().get(), minMCTFirstLSN,
txnMgr.getMaxJobId(), System.currentTimeMillis());
FileOutputStream fos = null;