Fixing LSMHarness issue.
Change-Id: I8afc0f189f5a64cc56be4a89903999c7c90a65c1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/252
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index f94d6f5..ee12248 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
@@ -63,13 +64,13 @@
public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
// Searches are immediately considered complete, because they should not prevent the execution of flushes.
- if (opType == LSMOperationType.SEARCH || opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
completeOperation(index, opType, searchCallback, modificationCallback);
}
}
@Override
- public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
decrementNumActiveOperations(modificationCallback);
@@ -98,47 +99,46 @@
}
if (needsFlush || flushOnExit) {
+ //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is schedule.
+ for (ILSMIndex lsmIndex : indexes) {
+ if (((AbstractLSMIndex) lsmIndex).getCurrentMutableComponentState() == ComponentState.READABLE_WRITABLE) {
+ ((AbstractLSMIndex) lsmIndex).setCurrentMutableComponentState(ComponentState.READABLE_UNWRITABLE);
+ }
+ }
+
LogRecord logRecord = new LogRecord();
logRecord.formFlushLogRecord(datasetID, this);
+
try {
logManager.log(logRecord);
} catch (ACIDException e) {
throw new HyracksDataException("could not write flush log", e);
}
+
flushLogCreated = true;
- }
-
- if (flushOnExit) {
- //Make the current mutable components (if have been modified) UnWritable to stop coming modify operations from entering them
- for (ILSMIndex lsmIndex : indexes) {
- if (!((AbstractLSMIndex) lsmIndex).isCurrentMutableComponentEmpty()) {
- ((AbstractLSMIndex) lsmIndex).makeCurrentMutableComponentUnWritable();
- }
- }
flushOnExit = false;
}
}
- public void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
- // why synchronized ??
- synchronized (this) {
- for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
+ //Since this method is called sequentially by LogPage.notifyFlushTerminator in the sequence flush were scheduled.
+ public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
+ for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
- //get resource
- ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
+ //get resource
+ ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
- //update resource lsn
- AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
- .getIOOperationCallback();
- ioOpCallback.updateLastLSN(logRecord.getLSN());
+ //update resource lsn
+ AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
+ .getIOOperationCallback();
+ ioOpCallback.updateLastLSN(logRecord.getLSN());
- //schedule flush after update
- accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
- }
-
- flushLogCreated = false;
+ //schedule flush after update
+ accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
+
}
+
+ flushLogCreated = false;
}
@Override