Added flush logs and enabled non-sharp checkpoints
The following commits from your working branch will be included:
commit c11a22cfd916041c806409c6d8e66dd6ea162b7b
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date: Tue Nov 18 23:12:19 2014 -0800
Defined Startup LSN
commit 95a8752072e53ef62a11c39799f296209a3c622c
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date: Tue Nov 18 20:22:08 2014 -0800
Revised Checkpoint Logic
commit ea4baa22089ba5d3eaff3d5c787828efcbff8ad9
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date: Thu Nov 13 10:25:45 2014 -0800
added comments
commit 8991554f840f50d800a619fc2ac327f1c024e838
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date: Wed Nov 12 23:34:25 2014 -0800
Added a method to the interface
commit a31e36b0b8234fe876b42e0cdc4543e4e73314a1
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date: Wed Nov 12 22:57:51 2014 -0800
Refactored getMinFirstLSN code
commit 25f62d585b2f279e78711efd2d327c777919f62c
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date: Tue Nov 11 14:05:34 2014 -0800
Added flush logs
Change-Id: I13fb61c04b6b510b6e8af68d2a0e7e14859f519d
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/182
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 32bfa58..af83b83 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -23,7 +23,7 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index 8c6ee8f..6409a13 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -38,6 +38,7 @@
import edu.uci.ics.asterix.metadata.feeds.FeedManager;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -135,12 +136,15 @@
localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
.createRepository();
resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
- indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
- MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID);
+
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
this);
txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
txnProperties);
+
+ indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
+ MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID,(LogManager)txnSubsystem.getLogManager());
+
isShuttingdown = false;
feedManager = new FeedManager(ncApplicationContext.getNodeId());
@@ -148,9 +152,9 @@
// The order of registration is important. The buffer cache must registered before recovery and transaction managers.
ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager();
lccm.register((ILifeCycleComponent) bufferCache);
- lccm.register((ILifeCycleComponent) indexLifecycleManager);
lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
+ lccm.register((ILifeCycleComponent) indexLifecycleManager);
lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 1f0c91d..295fac7 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -51,7 +51,8 @@
private boolean isMetadataNode = false;
private boolean stopInitiated = false;
private SystemState systemState = SystemState.NEW_UNIVERSE;
-
+ private final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
+
@Override
public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getLifeCycleComponentManager()));
@@ -101,7 +102,7 @@
}
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
- recoveryMgr.checkpoint(true);
+ recoveryMgr.checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN);
if (isMetadataNode) {
MetadataBootstrap.stopUniverse();
@@ -183,7 +184,7 @@
lccm.startAll();
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
- recoveryMgr.checkpoint(true);
+ recoveryMgr.checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN);
if (isMetadataNode) {
IMetadataNode stub = null;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 63f4c3c..379a7eb 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -23,8 +23,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -34,6 +39,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -49,9 +55,12 @@
private final int firstAvilableUserDatasetID;
private final long capacity;
private long used;
+ private final ILogManager logManager;
+ private LogRecord logRecord;
public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
- ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID) {
+ ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager) {
+ this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
@@ -60,6 +69,7 @@
datasetInfos = new HashMap<Integer, DatasetInfo>();
capacity = storageProperties.getMemoryComponentGlobalBudget();
used = 0;
+ logRecord = new LogRecord();
}
@Override
@@ -80,7 +90,7 @@
int did = getDIDfromRID(resourceID);
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null) {
- dsInfo = new DatasetInfo(did,!index.hasMemoryComponents());
+ dsInfo = new DatasetInfo(did, !index.hasMemoryComponents());
} else if (dsInfo.indexes.containsKey(resourceID)) {
throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
}
@@ -286,7 +296,7 @@
synchronized (datasetOpTrackers) {
ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
if (opTracker == null) {
- opTracker = new PrimaryIndexOperationTracker(this, datasetID);
+ opTracker = new PrimaryIndexOperationTracker(this, datasetID, logManager);
datasetOpTrackers.put(datasetID, opTracker);
}
return opTracker;
@@ -338,7 +348,7 @@
private final int datasetID;
private long lastAccess;
private int numActiveIOOps;
- private final boolean isExternal;
+ private final boolean isExternal;
public DatasetInfo(int datasetID, boolean isExternal) {
this.indexes = new HashMap<Long, IndexInfo>();
@@ -410,6 +420,81 @@
used = 0;
}
+ public synchronized void flushAllDatasets() throws HyracksDataException {
+ for (DatasetInfo dsInfo : datasetInfos.values()) {
+ flushDatasetOpenIndexes(dsInfo, false);
+ }
+ }
+
+ public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
+
+ List<DatasetInfo> laggingDatasets = new ArrayList<DatasetInfo>();
+ long firstLSN;
+ //find dataset with min lsn < targetLSN
+ for (DatasetInfo dsInfo : datasetInfos.values()) {
+ for (IndexInfo iInfo : dsInfo.indexes.values()) {
+ AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) iInfo.index)
+ .getIOOperationCallback();
+ if (!((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
+ firstLSN = ioCallback.getFirstLSN();
+
+ if (firstLSN < targetLSN) {
+ laggingDatasets.add(dsInfo);
+ break;
+ }
+ }
+ }
+ }
+
+ //schedule an async flush
+ for (DatasetInfo dsInfo : laggingDatasets) {
+ flushDatasetOpenIndexes(dsInfo, true);
+ }
+
+ }
+
+ private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
+ if (!dsInfo.isExternal) {
+ synchronized (logRecord) {
+ logRecord.formFlushLogRecord(dsInfo.datasetID, null);
+ try {
+ logManager.log(logRecord);
+ } catch (ACIDException e) {
+ throw new HyracksDataException("could not write flush log while closing dataset", e);
+ }
+ try {
+ logRecord.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ for (IndexInfo iInfo : dsInfo.indexes.values()) {
+ //update resource lsn
+ AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) iInfo.index
+ .getIOOperationCallback();
+ ioOpCallback.updateLastLSN(logRecord.getLSN());
+ }
+ }
+
+ if (asyncFlush) {
+
+ for (IndexInfo iInfo : dsInfo.indexes.values()) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(
+ NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
+ }
+ } else {
+
+ for (IndexInfo iInfo : dsInfo.indexes.values()) {
+ // TODO: This is not efficient since we flush the indexes sequentially.
+ // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
+ // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
+
+ flushAndWaitForIO(dsInfo, iInfo);
+ }
+ }
+ }
+
private void closeDataset(DatasetInfo dsInfo) throws HyracksDataException {
// First wait for any ongoing IO operations
while (dsInfo.numActiveIOOps > 0) {
@@ -420,11 +505,10 @@
}
}
- for (IndexInfo iInfo : dsInfo.indexes.values()) {
- // TODO: This is not efficient since we flush the indexes sequentially.
- // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
- // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
- flushAndWaitForIO(dsInfo, iInfo);
+ try {
+ flushDatasetOpenIndexes(dsInfo, false);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
}
for (IndexInfo iInfo : dsInfo.indexes.values()) {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index ef58937..5f15bf3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -18,7 +18,11 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
@@ -32,9 +36,12 @@
// Number of active operations on an ILSMIndex instance.
private final AtomicInteger numActiveOperations;
+ private final ILogManager logManager;
- public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID) {
+ public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
+ ILogManager logManager) {
super(datasetLifecycleManager, datasetID);
+ this.logManager = logManager;
this.numActiveOperations = new AtomicInteger();
}
@@ -62,7 +69,9 @@
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
decrementNumActiveOperations(modificationCallback);
- flushIfFull();
+ if (numActiveOperations.get() == 0) {
+ flushIfFull();
+ }
} else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
}
@@ -71,6 +80,7 @@
private void flushIfFull() throws HyracksDataException {
// If we need a flush, and this is the last completing operation, then schedule the flush.
// TODO: Is there a better way to check if we need to flush instead of communicating with the datasetLifecycleManager each time?
+ // Maybe we could keep the set of indexes here instead of consulting the datasetLifecycleManager each time?
Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
boolean needsFlush = false;
for (ILSMIndex lsmIndex : indexes) {
@@ -79,13 +89,34 @@
break;
}
}
+
+ if (needsFlush) {
+
+ LogRecord logRecord = new LogRecord();
+ logRecord.formFlushLogRecord(datasetID, this);
+ try {
+ logManager.log(logRecord);
+ } catch (ACIDException e) {
+ throw new HyracksDataException("could not write flush log", e);
+ }
+ }
+ }
+
+ public void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
+ // why synchronized ??
synchronized (this) {
- if (needsFlush && numActiveOperations.get() == 0) {
- for (ILSMIndex lsmIndex : indexes) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
- NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
- }
+ for (ILSMIndex lsmIndex : datasetLifecycleManager.getDatasetIndexes(datasetID)) {
+ //get resource
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
+ NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+
+ //update resource lsn
+ AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
+ .getIOOperationCallback();
+ ioOpCallback.updateLastLSN(logRecord.getLSN());
+
+ //schedule flush after update
+ accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
}
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 26a79c3..4673266 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -16,14 +16,23 @@
import java.nio.ByteBuffer;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
@@ -37,6 +46,43 @@
}
@Override
+ public void open() throws HyracksDataException {
+ RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+ writeBuffer = ctx.allocateFrame();
+ writer.open();
+ indexHelper.open();
+ AbstractLSMIndex lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
+ try {
+ modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
+ indexHelper.getResourceID(), lsmIndex, ctx);
+ indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
+ ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
+ if (tupleFilterFactory != null) {
+ tupleFilter = tupleFilterFactory.createTupleFilter(indexHelper.getTaskContext());
+ frameTuple = new FrameTupleReference();
+ }
+ // If the index has an empty memory component, we need to set its first LSN (For soft checkpoint)
+ if (lsmIndex.isCurrentMutableComponentEmpty()) {
+ //prevent transactions from incorrectly setting the first LSN on a modified component
+ synchronized (lsmIndex.getOperationTracker()) {
+ if (lsmIndex.isCurrentMutableComponentEmpty()) {
+ AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
+ .getIOOperationCallback();
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
+ ILogManager logManager = runtimeCtx.getTransactionSubsystem().getLogManager();
+ ioOpCallback.setFirstLSN(logManager.getAppendLSN());
+ }
+ }
+ }
+ } catch (Exception e) {
+ indexHelper.close();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
boolean first = true;
accessor.reset(buffer);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index d22280e..ec56f3d 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -17,6 +17,7 @@
import java.util.List;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
@@ -27,21 +28,26 @@
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+// A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
- protected long firstLSN;
- protected long lastLSN;
- protected long[] immutableLastLSNs;
+ // First LSN per mutable component
+ protected long[] firstLSNs;
+ // A boolean array to keep track of flush operations
+ protected boolean[] flushRequested;
+ // I think this was meant to be mutableLastLSNs
+ // protected long[] immutableLastLSNs;
+ protected long[] mutableLastLSNs;
+ // Index of the currently flushing or next to be flushed component
protected int readIndex;
+ // Index of the currently being written to component
protected int writeIndex;
- public AbstractLSMIOOperationCallback() {
- resetLSNs();
- }
-
@Override
public void setNumOfMutableComponents(int count) {
- immutableLastLSNs = new long[count];
+ mutableLastLSNs = new long[count];
+ firstLSNs = new long[count];
+ flushRequested = new boolean[count];
readIndex = 0;
writeIndex = 0;
}
@@ -49,17 +55,36 @@
@Override
public void beforeOperation(LSMOperationType opType) {
if (opType == LSMOperationType.FLUSH) {
+ /*
+ * This method was called on the scheduleFlush operation.
+ * We set the lastLSN to the last LSN for the index (the LSN for the flush log)
+ * We mark the component flushing flag
+ * We then move the write pointer to the next component and set its first LSN to the flush log LSN
+ */
synchronized (this) {
- immutableLastLSNs[writeIndex] = lastLSN;
- writeIndex = (writeIndex + 1) % immutableLastLSNs.length;
- resetLSNs();
+ flushRequested[writeIndex] = true;
+ writeIndex = (writeIndex + 1) % mutableLastLSNs.length;
+ // Set the firstLSN of the next component unless it is being flushed
+ if (writeIndex != readIndex) {
+ firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
+ }
}
}
}
@Override
public void afterFinalize(LSMOperationType opType, ILSMComponent newComponent) {
- // Do nothing.
+ // The operation was complete and the next I/O operation for the LSM index didn't start yet
+ if (opType == LSMOperationType.FLUSH && newComponent != null) {
+ synchronized (this) {
+ flushRequested[readIndex] = false;
+ // if the component which just finished flushing is the component that will be modified next, we set its first LSN to its previous LSN
+ if (readIndex == writeIndex) {
+ firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
+ }
+ readIndex = (readIndex + 1) % mutableLastLSNs.length;
+ }
+ }
}
public abstract long getComponentLSN(List<ILSMComponent> oldComponents) throws HyracksDataException;
@@ -98,24 +123,28 @@
}
}
- protected void resetLSNs() {
- firstLSN = -1;
- lastLSN = -1;
- }
-
public void updateLastLSN(long lastLSN) {
- if (firstLSN == -1) {
- firstLSN = lastLSN;
+ mutableLastLSNs[writeIndex] = lastLSN;
+ }
+
+ public void setFirstLSN(long firstLSN) throws AsterixException {
+ // We make sure that this method is only called on an empty component so the first LSN is not set incorrectly
+ firstLSNs[writeIndex] = firstLSN;
+ }
+
+ public synchronized long getFirstLSN() {
+ // We make sure that this method is only called on a non-empty component so the returned LSN is meaningful
+ // The firstLSN is always the LSN of the component currently being flushed, or of the next one to be flushed when no flush operation is ongoing
+ return firstLSNs[readIndex];
+ }
+
+ public synchronized boolean hasPendingFlush(){
+
+ for(int i=0; i<flushRequested.length; i++){
+ if(flushRequested[i]){
+ return true;
+ }
}
- this.lastLSN = Math.max(this.lastLSN, lastLSN);
+ return false;
}
-
- public long getFirstLSN() {
- return firstLSN;
- }
-
- public long getLastLSN() {
- return lastLSN;
- }
-
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 8e9b44e..5d21653 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -41,10 +41,10 @@
@Override
public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
- // Implies a flush IO operation.
+ // Implies a flush IO operation. --> moves the flush pointer
+ // Flush operations of an LSM index are executed sequentially.
synchronized (this) {
- long lsn = immutableLastLSNs[readIndex];
- readIndex = (readIndex + 1) % immutableLastLSNs.length;
+ long lsn = mutableLastLSNs[readIndex];
return lsn;
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index 57f8658..dbad7ef 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -36,10 +36,9 @@
@Override
public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
- // Implies a flush IO operation.
+ // Implies a flush IO operation <Will never happen currently as Btree with buddy btree is only used with external datasets>
synchronized (this) {
- long lsn = immutableLastLSNs[readIndex];
- readIndex = (readIndex + 1) % immutableLastLSNs.length;
+ long lsn = mutableLastLSNs[readIndex];
return lsn;
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 5532f97..62b928e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -42,8 +42,7 @@
if (diskComponents == null) {
// Implies a flush IO operation.
synchronized (this) {
- long lsn = immutableLastLSNs[readIndex];
- readIndex = (readIndex + 1) % immutableLastLSNs.length;
+ long lsn = mutableLastLSNs[readIndex];
return lsn;
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 1497e17..308460e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -43,8 +43,7 @@
if (diskComponents == null) {
// Implies a flush IO operation.
synchronized (this) {
- long lsn = immutableLastLSNs[readIndex];
- readIndex = (readIndex + 1) % immutableLastLSNs.length;
+ long lsn = mutableLastLSNs[readIndex];
return lsn;
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
index 27a91a4..69155ac 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
@@ -18,8 +18,36 @@
public interface ILogManager {
+ /**
+ * Submits a logRecord to log Manager which appends it to the log tail
+ * @param logRecord
+ * @throws ACIDException
+ */
public void log(ILogRecord logRecord) throws ACIDException;
+ /**
+ *
+ * @param isRecoveryMode
+ * @return LogReader instance which enables reading the log files
+ */
public ILogReader getLogReader(boolean isRecoveryMode);
+
+ /**
+ *
+ * @return the last LSN the log manager used
+ */
+ public long getAppendLSN();
+
+ /**
+ * Deletes all log partitions which have a maximum LSN less than checkpointLSN
+ * @param checkpointLSN
+ */
+ public void deleteOldLogFiles(long checkpointLSN);
+
+ /**
+ *
+ * @return the smallest readable LSN on the current log partitions
+ */
+ public long getReadableSmallestLSN();
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
index b9baa3b..89aabd7 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
@@ -23,7 +23,8 @@
public static final int JOB_TERMINATE_LOG_SIZE = 13; //JOB_COMMIT or ABORT log type
public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 25;
public static final int UPDATE_LOG_BASE_SIZE = 54;
-
+ public static final int FLUSH_LOG_SIZE = 17;
+
public boolean readLogRecord(ByteBuffer buffer);
public void writeLogRecord(ByteBuffer buffer);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java
index 6af5ebf..9c6d0b4 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java
@@ -75,5 +75,15 @@
*/
public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException;
- public void checkpoint(boolean isSharpCheckpoint) throws ACIDException, HyracksDataException;
+ /**
+ * Makes a system checkpoint.
+ * @param isSharpCheckpoint a flag indicating whether to perform a sharp or non-sharp checkpoint.
+ * @param nonSharpCheckpointTargetLSN if a non-sharp checkpoint is to be performed, the minimum LSN it should target.
+ * @return the LSN at which the checkpoint was performed.
+ * @throws ACIDException
+ * @throws HyracksDataException
+ */
+ public long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException;
+
+
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index dc33e69..56bb3e9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -35,7 +35,7 @@
public long getLastLSN();
- public void setLastLSN(long resourceId, long LSN);
+ public void setLastLSN(long LSN);
public boolean isWriteTxn();
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java
index 9b1cbb6..6eb01c9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java
@@ -28,4 +28,5 @@
public IAsterixAppRuntimeContextProvider getAsterixAppRuntimeContextProvider();
public String getId();
+
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogRecord.java
similarity index 86%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogRecord.java
index 1337e33..2a53712 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogRecord.java
@@ -12,18 +12,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.service.logging;
+package edu.uci.ics.asterix.common.transactions;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CRC32;
-import edu.uci.ics.asterix.common.transactions.ILogRecord;
-import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.context.PrimaryIndexOperationTracker;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleReference;
import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleWriter;
@@ -59,6 +56,7 @@
* 2) ENTITY_COMMIT: 25 + PKSize (5 + 12 + PKSize + 8)
* --> ENTITY_COMMIT_LOG_BASE_SIZE = 25
* 3) UPDATE: 54 + PKValueSize + NewValueSize (5 + 12 + PKValueSize + 20 + 9 + NewValueSize + 8)
* --> UPDATE_LOG_BASE_SIZE = 54
+ * 4) FLUSH: 5 + 8 + DatasetId(4) --> FLUSH_LOG_SIZE = 17
*/
public class LogRecord implements ILogRecord {
@@ -90,6 +88,7 @@
private final SimpleTupleReference readNewValue;
private final CRC32 checksumGen;
private int[] PKFields;
+ private PrimaryIndexOperationTracker opTracker;
public LogRecord() {
isFlushed = new AtomicBoolean(false);
@@ -122,6 +121,11 @@
buffer.putInt(newValueSize);
writeTuple(buffer, newValue, newValueSize);
}
+
+ if (logType == LogType.FLUSH) {
+ buffer.putInt(datasetId);
+ }
+
checksum = generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE);
buffer.putLong(checksum);
}
@@ -151,29 +155,38 @@
try {
logType = buffer.get();
jobId = buffer.getInt();
- if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
- datasetId = -1;
- PKHashValue = -1;
- } else {
- datasetId = buffer.getInt();
- PKHashValue = buffer.getInt();
- PKValueSize = buffer.getInt();
- if (PKValueSize <= 0) {
- throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+ if(logType != LogType.FLUSH)
+ {
+ if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
+ datasetId = -1;
+ PKHashValue = -1;
+ } else {
+ datasetId = buffer.getInt();
+ PKHashValue = buffer.getInt();
+ PKValueSize = buffer.getInt();
+ if (PKValueSize <= 0) {
+ throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+ }
+ PKValue = readPKValue(buffer);
}
- PKValue = readPKValue(buffer);
+ if (logType == LogType.UPDATE) {
+ prevLSN = buffer.getLong();
+ resourceId = buffer.getLong();
+ logSize = buffer.getInt();
+ fieldCnt = buffer.getInt();
+ newOp = buffer.get();
+ newValueSize = buffer.getInt();
+ newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
+ } else {
+ computeAndSetLogSize();
+ }
}
- if (logType == LogType.UPDATE) {
- prevLSN = buffer.getLong();
- resourceId = buffer.getLong();
- logSize = buffer.getInt();
- fieldCnt = buffer.getInt();
- newOp = buffer.get();
- newValueSize = buffer.getInt();
- newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
- } else {
+ else{
computeAndSetLogSize();
+ datasetId = buffer.getInt();
+ resourceId = 0l;
}
+
checksum = buffer.getLong();
if (checksum != generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE)) {
throw new IllegalStateException();
@@ -213,6 +226,14 @@
this.PKHashValue = -1;
computeAndSetLogSize();
}
+
+ public void formFlushLogRecord(int datasetId, PrimaryIndexOperationTracker opTracker) {
+ this.logType = LogType.FLUSH;
+ this.jobId = -1;
+ this.datasetId = datasetId;
+ this.opTracker = opTracker;
+ computeAndSetLogSize();
+ }
@Override
public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
@@ -255,6 +276,9 @@
case LogType.ENTITY_COMMIT:
logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
break;
+ case LogType.FLUSH:
+ logSize = FLUSH_LOG_SIZE;
+ break;
default:
throw new IllegalStateException("Unsupported Log Type");
}
@@ -445,4 +469,8 @@
public void setPKValue(ITupleReference PKValue) {
this.PKValue = PKValue;
}
+
+ public PrimaryIndexOperationTracker getOpTracker() {
+ return opTracker;
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogType.java
similarity index 87%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogType.java
index 1716cfb..e548c50 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogType.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.service.logging;
+package edu.uci.ics.asterix.common.transactions;
public class LogType {
@@ -20,10 +20,14 @@
public static final byte JOB_COMMIT = 1;
public static final byte ENTITY_COMMIT = 2;
public static final byte ABORT = 3;
+ public static final byte FLUSH = 4;
+
private static final String STRING_UPDATE = "UPDATE";
private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT";
private static final String STRING_ABORT = "ABORT";
+ private static final String STRING_FLUSH = "FLUSH";
+
private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
public static String toString(byte logType) {
@@ -36,6 +40,8 @@
return STRING_ENTITY_COMMIT;
case LogType.ABORT:
return STRING_ABORT;
+ case LogType.FLUSH:
+ return STRING_FLUSH;
default:
return STRING_INVALID_LOG_TYPE;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/PrimaryKeyTupleReference.java
similarity index 90%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/PrimaryKeyTupleReference.java
index 7acc000..78cfc08 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/PrimaryKeyTupleReference.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.asterix.transaction.management.service.logging;
+package edu.uci.ics.asterix.common.transactions;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index cba690d..ef584fe 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -18,11 +18,11 @@
import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ILogRecord;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
+import edu.uci.ics.asterix.common.transactions.LogType;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleWriter;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 1b22a34..3720f08 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -31,10 +31,10 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
+import edu.uci.ics.asterix.common.transactions.LogType;
import edu.uci.ics.asterix.transaction.management.service.logging.LogPage;
import edu.uci.ics.asterix.transaction.management.service.logging.LogPageReader;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 7292f82..baaa87a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -29,6 +29,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -39,6 +40,7 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
+import edu.uci.ics.asterix.common.transactions.LogType;
import edu.uci.ics.asterix.common.transactions.MutableLong;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -58,7 +60,7 @@
private final MutableLong flushLSN;
private LinkedBlockingQueue<LogPage> emptyQ;
private LinkedBlockingQueue<LogPage> flushQ;
- private long appendLSN;
+ private final AtomicLong appendLSN;
private FileChannel appendChannel;
private LogPage appendPage;
private LogFlusher logFlusher;
@@ -74,6 +76,7 @@
logDir = logManagerProperties.getLogDir();
logFilePrefix = logManagerProperties.getLogFilePrefix();
flushLSN = new MutableLong();
+ appendLSN = new AtomicLong();
initializeLogManager(0);
}
@@ -83,23 +86,25 @@
for (int i = 0; i < numLogPages; i++) {
emptyQ.offer(new LogPage(txnSubsystem, logPageSize, flushLSN));
}
- appendLSN = initializeLogAnchor(nextLogFileId);
- flushLSN.set(appendLSN);
+ appendLSN.set(initializeLogAnchor(nextLogFileId));
+ flushLSN.set(appendLSN.get());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("LogManager starts logging in LSN: " + appendLSN);
}
- appendChannel = getFileChannel(appendLSN, false);
+ appendChannel = getFileChannel(appendLSN.get(), false);
getAndInitNewPage();
logFlusher = new LogFlusher(this, emptyQ, flushQ);
futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
}
-
+
@Override
public void log(ILogRecord logRecord) throws ACIDException {
if (logRecord.getLogSize() > logPageSize) {
throw new IllegalStateException();
}
+
syncLog(logRecord);
+
if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
&& !logRecord.isFlushed()) {
synchronized (logRecord) {
@@ -115,11 +120,17 @@
}
private synchronized void syncLog(ILogRecord logRecord) throws ACIDException {
- ITransactionContext txnCtx = logRecord.getTxnCtx();
- if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
- throw new ACIDException("Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
+ ITransactionContext txnCtx = null;
+
+ if(logRecord.getLogType() != LogType.FLUSH)
+ {
+ txnCtx = logRecord.getTxnCtx();
+ if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
+ throw new ACIDException("Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
+ }
+
}
- if (getLogFileOffset(appendLSN) + logRecord.getLogSize() > logFileSize) {
+ if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() > logFileSize) {
prepareNextLogFile();
appendPage.isFull(true);
getAndInitNewPage();
@@ -130,8 +141,13 @@
if (logRecord.getLogType() == LogType.UPDATE) {
logRecord.setPrevLSN(txnCtx.getLastLSN());
}
- appendPage.append(logRecord, appendLSN);
- appendLSN += logRecord.getLogSize();
+ appendPage.append(logRecord, appendLSN.get());
+
+ if(logRecord.getLogType() == LogType.FLUSH)
+ {
+ logRecord.setLSN(appendLSN.get());
+ }
+ appendLSN.addAndGet(logRecord.getLogSize());
}
private void getAndInitNewPage() {
@@ -149,8 +165,8 @@
}
private void prepareNextLogFile() {
- appendLSN += logFileSize - getLogFileOffset(appendLSN);
- appendChannel = getFileChannel(appendLSN, true);
+ appendLSN.addAndGet(logFileSize - getLogFileOffset(appendLSN.get()));
+ appendChannel = getFileChannel(appendLSN.get(), true);
appendPage.isLastPage(true);
//[Notice]
//the current log file channel is closed if
@@ -170,8 +186,9 @@
return txnSubsystem;
}
+ @Override
public long getAppendLSN() {
- return appendLSN;
+ return appendLSN.get();
}
@Override
@@ -275,6 +292,22 @@
initializeLogManager(lastMaxLogFileId + 1);
}
+ public void deleteOldLogFiles(long checkpointLSN){
+
+ Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
+ List<Long> logFileIds = getLogFileIds();
+ for (Long id : logFileIds) {
+
+ if(id < checkpointLSNLogFileID)
+ {
+ File file = new File(getLogFilePath(id));
+ if (!file.delete()) {
+ throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
+ }
+ }
+ }
+ }
+
private void terminateLogFlusher() {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Terminating LogFlusher thread ...");
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index cec6cd3..f3e8f77 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -21,16 +21,19 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.context.PrimaryIndexOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILogPage;
import edu.uci.ics.asterix.common.transactions.ILogRecord;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
+import edu.uci.ics.asterix.common.transactions.LogType;
import edu.uci.ics.asterix.common.transactions.MutableLong;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class LogPage implements ILogPage {
@@ -48,6 +51,8 @@
private final ByteBuffer unlockBuffer;
private boolean isLastPage;
private final LinkedBlockingQueue<ILogRecord> syncCommitQ;
+ private final LinkedBlockingQueue<ILogRecord> flushQ;
+
private FileChannel fileChannel;
private boolean stop;
private DatasetId reusableDsId;
@@ -66,6 +71,8 @@
flushOffset = 0;
isLastPage = false;
syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
+ flushQ = new LinkedBlockingQueue<ILogRecord>();
+
reusableDsId = new DatasetId(-1);
reusableJobId = new JobId(-1);
}
@@ -77,8 +84,10 @@
@Override
public void append(ILogRecord logRecord, long appendLSN) {
logRecord.writeLogRecord(appendBuffer);
- logRecord.getTxnCtx().setLastLSN(logRecord.getLogType() == LogType.UPDATE ? logRecord.getResourceId() : -1,
- appendLSN);
+ // mhubail: FLUSH log records are not tied to a transaction context here, so only other log types update the transaction's last LSN
+ if (logRecord.getLogType() != LogType.FLUSH) {
+ logRecord.getTxnCtx().setLastLSN(appendLSN);
+ }
synchronized (this) {
appendOffset += logRecord.getLogSize();
if (IS_DEBUG_MODE) {
@@ -88,6 +97,10 @@
logRecord.isFlushed(false);
syncCommitQ.offer(logRecord);
}
+ if (logRecord.getLogType() == LogType.FLUSH) {
+ logRecord.isFlushed(false);
+ flushQ.offer(logRecord);
+ }
this.notify();
}
}
@@ -205,14 +218,18 @@
reusableJobId.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false);
reusableDsId.setId(logRecord.getDatasetId());
- txnSubsystem.getLockManager().unlock(reusableDsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
+ txnSubsystem.getLockManager()
+ .unlock(reusableDsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
txnCtx.notifyOptracker(false);
} else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
reusableJobId.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false);
txnCtx.notifyOptracker(true);
notifyJobTerminator();
+ } else if (logRecord.getLogType() == LogType.FLUSH) {
+ notifyFlushTerminator();
}
+
logRecord = logPageReader.next();
}
}
@@ -233,6 +250,27 @@
}
}
+ public void notifyFlushTerminator() throws ACIDException {
+ LogRecord logRecord = null;
+ try {
+ logRecord = (LogRecord) flushQ.take();
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ synchronized (logRecord) {
+ logRecord.isFlushed(true);
+ logRecord.notifyAll();
+ }
+ PrimaryIndexOperationTracker opTracker = logRecord.getOpTracker();
+ if (opTracker != null) {
+ try {
+ opTracker.triggerScheduleFlush(logRecord);
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ }
+ }
+
public boolean isStop() {
return stop;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPageReader.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPageReader.java
index 9e54abc..9663a8c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPageReader.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPageReader.java
@@ -16,6 +16,8 @@
import java.nio.ByteBuffer;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
+
public class LogPageReader {
private final ByteBuffer buffer;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
index 778d24b..728c694 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
@@ -22,6 +22,7 @@
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.ILogReader;
import edu.uci.ics.asterix.common.transactions.ILogRecord;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
import edu.uci.ics.asterix.common.transactions.MutableLong;
public class LogReader implements ILogReader {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index dca14d8..a9d1cf2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -14,49 +14,72 @@
*/
package edu.uci.ics.asterix.transaction.management.service.recovery;
-import java.util.List;
-
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
public class CheckpointThread extends Thread {
private long lsnThreshold;
private long checkpointTermInSecs;
- private long lastMinMCTFirstLSN = 0;
-
+ private final ILogManager logManager;
private final IRecoveryManager recoveryMgr;
- private final IIndexLifecycleManager indexLifecycleManager;
- public CheckpointThread(IRecoveryManager recoveryMgr, IIndexLifecycleManager indexLifecycleManager,
+ public CheckpointThread(IRecoveryManager recoveryMgr, IIndexLifecycleManager indexLifecycleManager, ILogManager logManager,
long lsnThreshold, long checkpointTermInSecs) {
this.recoveryMgr = recoveryMgr;
- this.indexLifecycleManager = indexLifecycleManager;
+ this.logManager = logManager;
this.lsnThreshold = lsnThreshold;
this.checkpointTermInSecs = checkpointTermInSecs;
}
@Override
public void run() {
- long currentMinMCTFirstLSN = 0;
+ long currentCheckpointAttemptMinLSN = -1;
+ long lastCheckpointLSN = -1;
+ long currentLogLSN = 0;
+ long targetCheckpointLSN = 0;
while (true) {
try {
sleep(checkpointTermInSecs * 1000);
} catch (InterruptedException e) {
//ignore
}
-
- currentMinMCTFirstLSN = getMinMCTFirstLSN();
- if (currentMinMCTFirstLSN - lastMinMCTFirstLSN > lsnThreshold) {
+
+
+ if(lastCheckpointLSN == -1)
+ {
try {
- recoveryMgr.checkpoint(false);
- lastMinMCTFirstLSN = currentMinMCTFirstLSN;
+ //Since the system has just started up after a sharp checkpoint, the last checkpoint LSN is taken to be the min LSN of the current log partition
+ lastCheckpointLSN = logManager.getReadableSmallestLSN();
+ } catch (Exception e) {
+ lastCheckpointLSN = 0;
+ }
+ }
+
+ //1. get current log LSN
+ currentLogLSN = logManager.getAppendLSN();
+
+ //2. if current log LSN - previous checkpoint > threshold, do checkpoint
+ if (currentLogLSN - lastCheckpointLSN > lsnThreshold) {
+ try {
+ // in check point:
+ //1. get minimum first LSN (MFL) from open indexes.
+ //2. if current MinFirstLSN < targetCheckpointLSN, schedule async flush for any open index which has first LSN < force flush delta
+ //3. next time checkpoint comes, it will be able to remove log files which have end range less than current targetCheckpointLSN
+
+ targetCheckpointLSN = lastCheckpointLSN + lsnThreshold;
+ currentCheckpointAttemptMinLSN = recoveryMgr.checkpoint(false, targetCheckpointLSN);
+
+ //checkpoint was completed at target LSN or above
+ if(currentCheckpointAttemptMinLSN >= targetCheckpointLSN)
+ {
+ lastCheckpointLSN = currentCheckpointAttemptMinLSN;
+ }
+
} catch (ACIDException | HyracksDataException e) {
throw new Error("failed to checkpoint", e);
}
@@ -64,19 +87,4 @@
}
}
- private long getMinMCTFirstLSN() {
- List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
- long minMCTFirstLSN = Long.MAX_VALUE;
- long firstLSN;
- if (openIndexList.size() > 0) {
- for (IIndex index : openIndexList) {
- firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
- .getFirstLSN();
- minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
- }
- } else {
- minMCTFirstLSN = -1;
- }
- return minMCTFirstLSN;
- }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index a7fb73b..492f020 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -38,6 +38,7 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
@@ -45,8 +46,8 @@
import edu.uci.ics.asterix.common.transactions.ILogRecord;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.LogType;
import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -59,7 +60,7 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
import edu.uci.ics.hyracks.storage.common.file.LocalResource;
@@ -213,6 +214,8 @@
case LogType.ABORT:
abortLogCount++;
break;
+ case LogType.FLUSH:
+ break;
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
}
@@ -321,6 +324,8 @@
case LogType.JOB_COMMIT:
case LogType.ENTITY_COMMIT:
case LogType.ABORT:
+ case LogType.FLUSH:
+
//do nothing
break;
@@ -347,9 +352,11 @@
}
@Override
- public synchronized void checkpoint(boolean isSharpCheckpoint) throws ACIDException, HyracksDataException {
+ public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException {
long minMCTFirstLSN;
+ boolean nonSharpCheckpointSucceeded = false;
+
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting sharp checkpoint ... ");
}
@@ -361,52 +368,26 @@
// right after the new checkpoint file is written.
File[] prevCheckpointFiles = getPreviousCheckpointFiles();
- IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getIndexLifecycleManager();
- List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
-
+ DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager)txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
//#. flush all in-memory components if it is the sharp checkpoint
if (isSharpCheckpoint) {
- ///////////////////////////////////////////////
- //TODO : change the code inside the if statement into indexLifeCycleManager.flushAllDatasets()
- //indexLifeCycleManager.flushAllDatasets();
- ///////////////////////////////////////////////
- List<BlockingIOOperationCallbackWrapper> callbackList = new LinkedList<BlockingIOOperationCallbackWrapper>();
- for (IIndex index : openIndexList) {
- ILSMIndex lsmIndex = (ILSMIndex) index;
- ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
- lsmIndex.getIOOperationCallback());
- callbackList.add(cb);
- try {
- indexAccessor.scheduleFlush(cb);
- } catch (HyracksDataException e) {
- throw new ACIDException(e);
- }
- }
- for (BlockingIOOperationCallbackWrapper cb : callbackList) {
- try {
- cb.waitForIO();
- } catch (InterruptedException e) {
- throw new ACIDException(e);
- }
- }
+ datasetLifecycleManager.flushAllDatasets();
+
minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
} else {
- long firstLSN;
- minMCTFirstLSN = Long.MAX_VALUE;
- if (openIndexList.size() > 0) {
- for (IIndex index : openIndexList) {
- firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
- .getFirstLSN();
- minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
- }
- } else {
- minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
+
+ minMCTFirstLSN = getMinFirstLSN();
+
+ if(minMCTFirstLSN >= nonSharpCheckpointTargetLSN){
+ nonSharpCheckpointSucceeded = true;
+ }
+ else{
+ //flush datasets with indexes behind target checkpoint LSN
+ datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
}
}
+
CheckpointObject checkpointObject = new CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN,
txnMgr.getMaxJobId(), System.currentTimeMillis());
@@ -454,9 +435,39 @@
}
}
+ if(nonSharpCheckpointSucceeded){
+ logMgr.deleteOldLogFiles(minMCTFirstLSN);
+ }
+
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Completed sharp checkpoint.");
}
+
+ //return the min LSN that was recorded in the checkpoint
+ return minMCTFirstLSN;
+ }
+
+    /**
+     * Returns the smallest first LSN among the open indexes that still hold unflushed data,
+     * i.e. whose current mutable component is non-empty or whose IO callback has a pending flush.
+     *
+     * @return that minimum, never larger than the log manager's current append LSN
+     */
+    public long getMinFirstLSN() throws HyracksDataException {
+        List<IIndex> openIndexes = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getIndexLifecycleManager().getOpenIndexes();
+        //the append LSN is the upper bound: no first LSN can be ahead of it
+        long minFirstLSN = logMgr.getAppendLSN();
+        for (IIndex openIndex : openIndexes) {
+            AbstractLSMIOOperationCallback callback = (AbstractLSMIOOperationCallback) ((ILSMIndex) openIndex)
+                    .getIOOperationCallback();
+            boolean hasUnflushedData = !((AbstractLSMIndex) openIndex).isCurrentMutableComponentEmpty()
+                    || callback.hasPendingFlush();
+            if (hasUnflushedData) {
+                minFirstLSN = Math.min(minFirstLSN, callback.getFirstLSN());
+            }
+        }
+        return minFirstLSN;
     }
private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException {
@@ -628,6 +639,7 @@
break;
case LogType.ABORT:
+ case LogType.FLUSH:
//ignore
break;
@@ -722,7 +734,6 @@
} else {
throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
}
- ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()).updateLastLSN(logRecord.getLSN());
} catch (Exception e) {
throw new IllegalStateException("Failed to redo", e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 664a4f1..b28964a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -28,9 +28,9 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
import edu.uci.ics.asterix.common.transactions.MutableLong;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallback;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
@@ -87,7 +87,6 @@
// avoid object creations.
// Those are used in synchronized methods.
private MutableLong tempResourceIdForRegister;
- private MutableLong tempResourceIdForSetLSN;
private LogRecord logRecord;
// TODO: implement transactionContext pool in order to avoid object
@@ -106,7 +105,6 @@
indexMap = new HashMap<MutableLong, AbstractLSMIOOperationCallback>();
primaryIndex = null;
tempResourceIdForRegister = new MutableLong();
- tempResourceIdForSetLSN = new MutableLong();
logRecord = new LogRecord();
}
@@ -128,20 +126,10 @@
// [Notice]
// This method is called sequentially by the LogAppender threads.
- // However, the indexMap is concurrently read and modified through this
- // method and registerIndexAndCallback()
@Override
- public void setLastLSN(long resourceId, long LSN) {
- synchronized (indexMap) {
- firstLSN.compareAndSet(-1, LSN);
- lastLSN.set(Math.max(lastLSN.get(), LSN));
- if (resourceId != -1) {
- // Non-update log's resourceId is -1.
- tempResourceIdForSetLSN.set(resourceId);
- AbstractLSMIOOperationCallback ioOpCallback = indexMap.get(tempResourceIdForSetLSN);
- ioOpCallback.updateLastLSN(LSN);
- }
- }
+ public void setLastLSN(long LSN) { // records LSN on this context only; no per-index IO callback is touched here
+ firstLSN.compareAndSet(-1, LSN); // takes effect only while firstLSN still holds -1 (presumably the "not yet set" marker - confirm)
+ lastLSN.set(Math.max(lastLSN.get(), LSN)); // keeps the largest LSN seen; get-then-set is not atomic, so it relies on the sequential calls described in the [Notice] comment preceding this method
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 94be125..ec90cc4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -27,7 +27,7 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
/**
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index e6bcc5d..20612ed 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -52,8 +52,9 @@
this.recoveryManager = new RecoveryManager(this);
if (asterixAppRuntimeContextProvider != null) {
this.checkpointThread = new CheckpointThread(recoveryManager,
- asterixAppRuntimeContextProvider.getIndexLifecycleManager(),
+ asterixAppRuntimeContextProvider.getIndexLifecycleManager(),logManager,
this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
+ this.checkpointThread.start();
} else {
this.checkpointThread = null;
}