Added many fixes. Checkpointing.
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 5e07e0c..77bd040 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -62,74 +62,67 @@
}
public void create() throws HyracksDataException {
- synchronized (lcManager) {
- long resourceID = getResourceID();
- index = lcManager.getIndex(resourceID);
- if (index != null) {
- lcManager.unregister(resourceID);
- } else {
- index = createIndexInstance();
- }
-
- // The previous resource ID needs to be removed since calling IIndex.create() may possibly destroy
- // any physical artifact that the LocalResourceRepository is managing (e.g. a file containing the resource ID).
- // Once the index has been created, a new resource ID can be generated.
- if (resourceID != -1) {
- localResourceRepository.deleteResourceByName(file.getFile().getPath());
- }
- index.create();
- try {
- //TODO Create LocalResource through LocalResourceFactory interface
- resourceID = resourceIdFactory.createId();
- ILocalResourceFactory localResourceFactory = opDesc.getLocalResourceFactoryProvider()
- .getLocalResourceFactory();
- localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, file.getFile()
- .getPath(), partition));
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- lcManager.register(resourceID, index);
+ long resourceID = getResourceID();
+ index = lcManager.getIndex(resourceID);
+ if (index != null) {
+ lcManager.unregister(resourceID);
+ } else {
+ index = createIndexInstance();
}
+
+ // The previous resource ID needs to be removed since calling IIndex.create() may possibly destroy
+ // any physical artifact that the LocalResourceRepository is managing (e.g. a file containing the resource ID).
+ // Once the index has been created, a new resource ID can be generated.
+ if (resourceID != -1) {
+ localResourceRepository.deleteResourceByName(file.getFile().getPath());
+ }
+ index.create();
+ try {
+ //TODO Create LocalResource through LocalResourceFactory interface
+ resourceID = resourceIdFactory.createId();
+ ILocalResourceFactory localResourceFactory = opDesc.getLocalResourceFactoryProvider()
+ .getLocalResourceFactory();
+ localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, file.getFile()
+ .getPath(), partition));
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ lcManager.register(resourceID, index);
+
}
public void open() throws HyracksDataException {
- synchronized (lcManager) {
- long resourceID = getResourceID();
+ long resourceID = getResourceID();
- if (resourceID == -1) {
- throw new HyracksDataException("Index does not have a valid resource ID. Has it been created yet?");
- }
-
- index = lcManager.getIndex(resourceID);
- if (index == null) {
- index = createIndexInstance();
- lcManager.register(resourceID, index);
- }
- lcManager.open(resourceID);
+ if (resourceID == -1) {
+ throw new HyracksDataException("Index does not have a valid resource ID. Has it been created yet?");
}
+
+ index = lcManager.getIndex(resourceID);
+ if (index == null) {
+ index = createIndexInstance();
+ lcManager.register(resourceID, index);
+ }
+ lcManager.open(resourceID);
}
public void close() throws HyracksDataException {
- synchronized (lcManager) {
- lcManager.close(getResourceID());
- }
+ lcManager.close(getResourceID());
}
public void destroy() throws HyracksDataException {
- synchronized (lcManager) {
- long resourceID = getResourceID();
- index = lcManager.getIndex(resourceID);
- if (index != null) {
- lcManager.unregister(resourceID);
- } else {
- index = createIndexInstance();
- }
-
- if (resourceID != -1) {
- localResourceRepository.deleteResourceByName(file.getFile().getPath());
- }
- index.destroy();
+ long resourceID = getResourceID();
+ index = lcManager.getIndex(resourceID);
+ if (index != null) {
+ lcManager.unregister(resourceID);
+ } else {
+ index = createIndexInstance();
}
+
+ if (resourceID != -1) {
+ localResourceRepository.deleteResourceByName(file.getFile().getPath());
+ }
+ index.destroy();
}
public FileReference getFileReference() {
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 328629a..7c95480 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -67,9 +67,9 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.VirtualFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractMutableLSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
@@ -119,8 +119,8 @@
}
@Override
- public void setFlushStatus(boolean isFlushNeeded) {
- needsFlush[currentMutableComponentId.get()].set(isFlushNeeded);
+ public void requestFlush(boolean isFlushNeeded) {
+ flushRequests[currentMutableComponentId.get()].set(isFlushNeeded);
}
});
@@ -373,12 +373,9 @@
}
@Override
- public boolean scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
- if (!((AbstractMutableLSMComponent) flushingComponent).isModified()) {
- return false;
- }
LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
assert ctx.getComponentHolder().size() == 1;
@@ -387,7 +384,6 @@
ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent, componentFileRefs
.getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback));
- return true;
}
@Override
@@ -438,14 +434,13 @@
return component;
}
+ @Override
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
opCtx.getComponentHolder().addAll(mergingComponents);
ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx);
- RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
- search(opCtx, cursor, rangePred);
opCtx.setOperation(IndexOperation.MERGE);
BTree firstBTree = (BTree) ((LSMBTreeImmutableComponent) mergingComponents.get(0)).getBTree();
@@ -465,6 +460,8 @@
throws HyracksDataException, IndexException {
LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
ITreeIndexCursor cursor = mergeOp.getCursor();
+ RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
+ search(((LSMIndexSearchCursor) cursor).getOpCtx(), cursor, rangePred);
mergedComponents.addAll(mergeOp.getMergingComponents());
long numElements = 0L;
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index d832583..bb868dd 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -67,6 +67,7 @@
proceed = false;
}
+ int kk = 0;
protected void checkPriorityQueue() throws HyracksDataException, IndexException {
while (!outputPriorityQueue.isEmpty() || needPush == true) {
if (!outputPriorityQueue.isEmpty()) {
@@ -127,6 +128,8 @@
// If there is no previous tuple or the previous tuple can be ignored
if (outputElement == null) {
if (isDeleted(checkElement)) {
+ kk++;
+ System.out.println(kk);
// If the key has been deleted then pop it and set needPush to true.
// We cannot push immediately because the tuple may be
// modified if hasNext() is called
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index 1d1de53..98ec6b9 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -24,9 +24,11 @@
DISK
}
- public boolean threadEnter(LSMOperationType opType) throws InterruptedException, HyracksDataException;
+ public boolean threadEnter(LSMOperationType opType, boolean firstComponent) throws InterruptedException,
+ HyracksDataException;
- public void threadExit(LSMOperationType opType, boolean failedOperation) throws HyracksDataException;
+ public void threadExit(LSMOperationType opType, boolean failedOperation, boolean firstComponent)
+ throws HyracksDataException;
public LSMComponentType getType();
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
index 41e1425..9d6cec4 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
@@ -35,7 +35,7 @@
public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
throws HyracksDataException, IndexException;
- public boolean scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException;
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index 5eaa6f6..1473071 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -19,5 +19,5 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public interface ILSMMergePolicy {
- public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents) throws HyracksDataException, IndexException;
+ public void diskComponentAdded(ILSMIndex index) throws HyracksDataException, IndexException;
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
index ccde3ba..2659b27 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
@@ -36,7 +36,7 @@
* then this method does not block and returns false.
* Otherwise, this method returns true, and the operation is considered 'active' in the index.
*/
- public boolean beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException;
/**
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IMutableComponentSwitcherCallback.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IMutableComponentSwitcherCallback.java
index 8eae3a4..497a634 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IMutableComponentSwitcherCallback.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IMutableComponentSwitcherCallback.java
@@ -20,5 +20,5 @@
public void switchComponents() throws HyracksDataException;
- public void setFlushStatus(boolean isFlushNeeded);
+ public void requestFlush(boolean isFlushNeeded);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java
index 59132e6..31b96d2 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java
@@ -34,7 +34,7 @@
}
@Override
- public synchronized boolean threadEnter(LSMOperationType opType) {
+ public boolean threadEnter(LSMOperationType opType, boolean firstComponent) {
if (state == ComponentState.KILLED) {
return false;
}
@@ -59,7 +59,8 @@
}
@Override
- public synchronized void threadExit(LSMOperationType opType, boolean failedOperation) throws HyracksDataException {
+ public void threadExit(LSMOperationType opType, boolean failedOperation, boolean firstComponent)
+ throws HyracksDataException {
switch (opType) {
case MERGE:
if (failedOperation) {
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 86f299f..02f9c45 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -58,7 +58,7 @@
protected boolean isActivated;
- protected final AtomicBoolean[] needsFlush;
+ protected final AtomicBoolean[] flushRequests;
public AbstractLSMIndex(List<IVirtualBufferCache> virtualBufferCaches, IBufferCache diskBufferCache,
ILSMIndexFileManager fileManager, IFileMapProvider diskFileMapProvider,
@@ -76,9 +76,9 @@
componentsRef = new AtomicReference<List<ILSMComponent>>();
componentsRef.set(new LinkedList<ILSMComponent>());
currentMutableComponentId = new AtomicInteger();
- needsFlush = new AtomicBoolean[virtualBufferCaches.size()];
+ flushRequests = new AtomicBoolean[virtualBufferCaches.size()];
for (int i = 0; i < virtualBufferCaches.size(); i++) {
- needsFlush[i] = new AtomicBoolean();
+ flushRequests[i] = new AtomicBoolean();
}
}
@@ -169,7 +169,7 @@
@Override
public boolean getFlushStatus() {
- return needsFlush[currentMutableComponentId.get()].get();
+ return flushRequests[currentMutableComponentId.get()].get();
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
index 05b0a4f..4870baa 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
@@ -56,27 +56,44 @@
}
@Override
- public synchronized boolean threadEnter(LSMOperationType opType) throws InterruptedException, HyracksDataException {
+ public boolean threadEnter(LSMOperationType opType, boolean firstComponent) throws InterruptedException,
+ HyracksDataException {
+ if (state == ComponentState.INACTIVE_READABLE_WRITABLE && requestedToBeActive) {
+ state = ComponentState.READABLE_WRITABLE;
+ requestedToBeActive = false;
+ }
switch (opType) {
case FORCE_MODIFICATION:
- if (state == ComponentState.INACTIVE_READABLE_WRITABLE && requestedToBeActive) {
- state = ComponentState.READABLE_WRITABLE;
- requestedToBeActive = false;
+ if (firstComponent) {
+ if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE) {
+ writerCount++;
+ } else {
+ return false;
+ }
+ } else {
+ if (state == ComponentState.READABLE_UNWRITABLE
+ || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) {
+ readerCount++;
+ } else {
+ return false;
+ }
}
- if (state != ComponentState.READABLE_WRITABLE && state != ComponentState.READABLE_UNWRITABLE) {
- return false;
- }
- writerCount++;
break;
case MODIFICATION:
- if (state == ComponentState.INACTIVE_READABLE_WRITABLE && requestedToBeActive) {
- state = ComponentState.READABLE_WRITABLE;
- requestedToBeActive = false;
+ if (firstComponent) {
+ if (state == ComponentState.READABLE_WRITABLE) {
+ writerCount++;
+ } else {
+ return false;
+ }
+ } else {
+ if (state == ComponentState.READABLE_UNWRITABLE
+ || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) {
+ readerCount++;
+ } else {
+ return false;
+ }
}
- if (state != ComponentState.READABLE_WRITABLE) {
- return false;
- }
- writerCount++;
break;
case SEARCH:
if (state == ComponentState.UNREADABLE_UNWRITABLE) {
@@ -86,14 +103,17 @@
break;
case FLUSH:
if (state == ComponentState.READABLE_UNWRITABLE_FLUSHING
- || state == ComponentState.UNREADABLE_UNWRITABLE) {
+ || state == ComponentState.UNREADABLE_UNWRITABLE
+ || state == ComponentState.INACTIVE_READABLE_WRITABLE) {
return false;
}
state = ComponentState.READABLE_UNWRITABLE_FLUSHING;
- switcherCallback.setFlushStatus(false);
- while (writerCount > 0) {
- wait();
+ switcherCallback.requestFlush(false);
+ synchronized (this) {
+ while (writerCount > 0) {
+ wait();
+ }
}
switcherCallback.switchComponents();
readerCount++;
@@ -105,14 +125,24 @@
}
@Override
- public synchronized void threadExit(LSMOperationType opType, boolean failedOperation) throws HyracksDataException {
+ public void threadExit(LSMOperationType opType, boolean failedOperation, boolean firstComponent)
+ throws HyracksDataException {
switch (opType) {
case FORCE_MODIFICATION:
case MODIFICATION:
- writerCount--;
- if (state == ComponentState.READABLE_WRITABLE && isFull()) {
- state = ComponentState.READABLE_UNWRITABLE;
- switcherCallback.setFlushStatus(true);
+ if (firstComponent) {
+ writerCount--;
+ if (state == ComponentState.READABLE_WRITABLE && isFull() && !failedOperation) {
+ state = ComponentState.READABLE_UNWRITABLE;
+ switcherCallback.requestFlush(true);
+ }
+ } else {
+ readerCount--;
+ if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
+ reset();
+ adderCallback.addComponent();
+ state = ComponentState.INACTIVE_READABLE_WRITABLE;
+ }
}
break;
case SEARCH:
@@ -120,36 +150,33 @@
if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
reset();
adderCallback.addComponent();
- if (requestedToBeActive == true) {
- state = ComponentState.READABLE_WRITABLE;
- requestedToBeActive = false;
- } else {
- state = ComponentState.INACTIVE_READABLE_WRITABLE;
- }
- } else if (state == ComponentState.READABLE_WRITABLE && isFull()) {
- state = ComponentState.READABLE_UNWRITABLE;
- switcherCallback.setFlushStatus(true);
+ state = ComponentState.INACTIVE_READABLE_WRITABLE;
}
break;
case FLUSH:
+ assert state == ComponentState.READABLE_UNWRITABLE_FLUSHING;
readerCount--;
if (readerCount == 0) {
reset();
adderCallback.addComponent();
- if (requestedToBeActive == true) {
- state = ComponentState.READABLE_WRITABLE;
- requestedToBeActive = false;
- } else {
- state = ComponentState.INACTIVE_READABLE_WRITABLE;
- }
- } else if (state == ComponentState.READABLE_UNWRITABLE_FLUSHING) {
+ state = ComponentState.INACTIVE_READABLE_WRITABLE;
+ } else {
state = ComponentState.UNREADABLE_UNWRITABLE;
}
break;
default:
throw new UnsupportedOperationException("Unsupported operation " + opType);
}
- notifyAll();
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+
+ public boolean isReadable() {
+ if (state == ComponentState.INACTIVE_READABLE_WRITABLE || state == ComponentState.UNREADABLE_UNWRITABLE) {
+ return false;
+ }
+ return true;
}
@Override
@@ -157,14 +184,7 @@
return LSMComponentType.MEMORY;
}
- public synchronized boolean isReadable() {
- if (state == ComponentState.UNREADABLE_UNWRITABLE || state == ComponentState.INACTIVE_READABLE_WRITABLE) {
- return false;
- }
- return true;
- }
-
- public synchronized void setActive() {
+ public void setActive() {
requestedToBeActive = true;
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index ba5b8c6..0554dc3 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -31,7 +31,6 @@
private AsynchronousScheduler() {
operationPerformerThread = new OperationPerformerThread();
- operationPerformerThread.setDaemon(true);
}
public void init(ThreadFactory threadFactory) {
@@ -69,6 +68,4 @@
}
}
}
-
}
-
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
index 2510311..a7ca95f 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
@@ -32,7 +32,7 @@
public synchronized void waitForIO() throws InterruptedException {
if (!notified) {
- this.wait();
+ wait();
}
notified = false;
}
@@ -51,7 +51,7 @@
@Override
public synchronized void afterFinalize(ILSMComponent newComponent) throws HyracksDataException {
wrappedCallback.afterFinalize(newComponent);
- this.notifyAll();
+ notifyAll();
notified = true;
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index 0ecfeb5..b6f5657 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -31,9 +31,8 @@
}
@Override
- public void diskComponentAdded(final ILSMIndex index, int totalNumDiskComponents) throws HyracksDataException,
- IndexException {
- if (totalNumDiskComponents >= threshold) {
+ public void diskComponentAdded(final ILSMIndex index) throws HyracksDataException, IndexException {
+ if (index.getImmutableComponents().size() >= threshold) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 364bc47..7d9fc00 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -58,55 +58,58 @@
boolean entranceSuccessful = false;
while (!entranceSuccessful) {
- int numEntered = 0;
- lsmIndex.getOperationalComponents(ctx);
- List<ILSMComponent> components = ctx.getComponentHolder();
- try {
- // The purpose of the synchronized block is to make the beforeOperation call and entering the mutable component an atomic operation.
- synchronized (this) {
+ synchronized (opTracker) {
+ int numEntered = 0;
+ lsmIndex.getOperationalComponents(ctx);
+ List<ILSMComponent> components = ctx.getComponentHolder();
+ try {
for (ILSMComponent c : components) {
- if (!c.threadEnter(opType)) {
+ if (!c.threadEnter(opType, numEntered == 0 ? true : false)) {
break;
}
numEntered++;
}
entranceSuccessful = numEntered == components.size();
if (entranceSuccessful) {
- if (!opTracker.beforeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
- ctx.getModificationCallback())) {
- entranceSuccessful = false;
+ opTracker.beforeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
+ ctx.getModificationCallback());
+ }
+ } catch (InterruptedException e) {
+ entranceSuccessful = false;
+ throw new HyracksDataException(e);
+ } finally {
+ if (!entranceSuccessful) {
+ int i = 0;
+ for (ILSMComponent c : components) {
+ if (numEntered == 0) {
+ break;
+ }
+ c.threadExit(opType, true, i == 0 ? true : false);
+ i++;
+ numEntered--;
}
}
}
- } catch (InterruptedException e) {
- entranceSuccessful = false;
- throw new HyracksDataException(e);
- } finally {
- if (!entranceSuccessful) {
- for (ILSMComponent c : components) {
- if (numEntered == 0) {
- break;
- }
- c.threadExit(opType, true);
- numEntered--;
- }
+ if (tryOperation && !entranceSuccessful) {
+ return false;
}
}
- if (tryOperation && !entranceSuccessful) {
- return false;
- }
}
return true;
}
private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean failedOperation)
throws HyracksDataException {
- try {
- for (ILSMComponent c : ctx.getComponentHolder()) {
- c.threadExit(opType, failedOperation);
+ synchronized (opTracker) {
+ try {
+ int i = 0;
+ for (ILSMComponent c : ctx.getComponentHolder()) {
+ c.threadExit(opType, failedOperation, i == 0 ? true : false);
+ i++;
+ }
+ } finally {
+ threadExit(ctx, opType);
}
- } finally {
- threadExit(ctx, opType);
}
}
@@ -164,11 +167,12 @@
@Override
public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
- if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
+ LSMOperationType opType = LSMOperationType.FLUSH;
+ if (!getAndEnterComponents(ctx, opType, true)) {
return;
}
-
- if (!lsmIndex.scheduleFlush(ctx, callback)) {
+ ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
+ if (!((AbstractMutableLSMComponent) flushingComponent).isModified()) {
callback.beforeOperation();
callback.afterOperation(null, null);
AbstractMutableLSMComponent mutableComponent = (AbstractMutableLSMComponent) ctx.getComponentHolder()
@@ -177,11 +181,13 @@
@Override
public void addComponent() throws HyracksDataException {
- // do nothing
+ // Do nothing
}
});
- exitComponents(ctx, LSMOperationType.FLUSH, false);
+ exitComponents(ctx, opType, false);
callback.afterFinalize(null);
+ } else {
+ lsmIndex.scheduleFlush(ctx, callback);
}
}
@@ -203,9 +209,8 @@
@Override
public void addComponent() throws HyracksDataException {
lsmIndex.addComponent(newComponent);
- int numComponents = lsmIndex.getImmutableComponents().size();
try {
- mergePolicy.diskComponentAdded(lsmIndex, numComponents);
+ mergePolicy.diskComponentAdded(lsmIndex);
} catch (IndexException e) {
throw new HyracksDataException(e);
}
@@ -220,13 +225,13 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
LSMOperationType opType = LSMOperationType.MERGE;
- if (!getAndEnterComponents(ctx, opType, false)) {
+ if (!getAndEnterComponents(ctx, opType, true)) {
return;
}
- if (ctx.getComponentHolder().size() > 1) {
- lsmIndex.scheduleMerge(ctx, callback);
- } else {
+ if (ctx.getComponentHolder().size() < 2) {
exitComponents(ctx, opType, true);
+ } else {
+ lsmIndex.scheduleMerge(ctx, callback);
}
}
@@ -251,8 +256,7 @@
public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
lsmIndex.markAsValid(c);
lsmIndex.addComponent(c);
- int numComponents = lsmIndex.getImmutableComponents().size();
- mergePolicy.diskComponentAdded(lsmIndex, numComponents);
+ mergePolicy.diskComponentAdded(lsmIndex);
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index ea23091..45cc69b 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -51,6 +51,10 @@
needPush = false;
}
+ public ILSMIndexOperationContext getOpCtx() {
+ return opCtx;
+ }
+
public void initPriorityQueue() throws HyracksDataException, IndexException {
int pqInitSize = (rangeCursors.length > 0) ? rangeCursors.length : 1;
outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(pqInitSize, pqCmp);
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 80ac6d8..17d1b17 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -21,7 +21,7 @@
INSTANCE;
@Override
- public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents) {
+ public void diskComponentAdded(ILSMIndex index) {
// Do nothing
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerProvider.java
index 65c2b9f..9bbb476 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerProvider.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerProvider.java
@@ -45,10 +45,9 @@
}
@Override
- public boolean beforeOperation(ILSMIndex index, LSMOperationType opType,
+ public void beforeOperation(ILSMIndex index, LSMOperationType opType,
ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
throws HyracksDataException {
- return true;
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
index 752e923..19ee4b8 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
@@ -32,12 +32,11 @@
}
@Override
- public boolean beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION) {
threadRefCount.incrementAndGet();
}
- return true;
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 5b11f66..b966426 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -64,10 +64,10 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.VirtualFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractMutableLSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
@@ -138,8 +138,8 @@
}
@Override
- public void setFlushStatus(boolean isFlushNeeded) {
- needsFlush[currentMutableComponentId.get()].set(isFlushNeeded);
+ public void requestFlush(boolean isFlushNeeded) {
+ flushRequests[currentMutableComponentId.get()].set(isFlushNeeded);
}
});
@@ -227,13 +227,12 @@
isActivated = false;
if (flushOnExit) {
- BlockingIOOperationCallbackWrapper blockingCallBack = new BlockingIOOperationCallbackWrapper(
+ BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
ioOpCallbackProvider.getIOOperationCallback(this));
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(blockingCallBack);
+ ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(cb);
try {
- blockingCallBack.waitForIO();
+ cb.waitForIO();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
@@ -442,12 +441,9 @@
}
@Override
- public boolean scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
- if (!((AbstractMutableLSMComponent) flushingComponent).isModified()) {
- return false;
- }
LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
LSMInvertedIndexOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -457,7 +453,6 @@
new LSMInvertedIndexAccessor(lsmHarness, opCtx), flushingComponent, componentFileRefs
.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
componentFileRefs.getBloomFilterFileReference(), callback));
- return true;
}
@Override
@@ -548,10 +543,6 @@
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
ictx.getComponentHolder().addAll(mergingComponents);
IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(ictx);
- RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null);
-
- // Scan diskInvertedIndexes ignoring the memoryInvertedIndex.
- search(ictx, cursor, mergePred);
ictx.setOperation(IndexOperation.MERGE);
LSMInvertedIndexImmutableComponent firstComponent = (LSMInvertedIndexImmutableComponent) mergingComponents
@@ -575,6 +566,11 @@
public ILSMComponent merge(List<ILSMComponent> mergedComponents, ILSMIOOperation operation)
throws HyracksDataException, IndexException {
LSMInvertedIndexMergeOperation mergeOp = (LSMInvertedIndexMergeOperation) operation;
+ IIndexCursor cursor = mergeOp.getCursor();
+
+ RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null);
+ // Scan diskInvertedIndexes ignoring the memoryInvertedIndex.
+ search(((LSMIndexSearchCursor) cursor).getOpCtx(), cursor, mergePred);
// Create an inverted index instance.
LSMInvertedIndexImmutableComponent component = createDiskInvIndexComponent(componentFactory,
@@ -582,7 +578,6 @@
mergeOp.getBloomFilterMergeTarget(), true);
IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
- IIndexCursor cursor = mergeOp.getCursor();
IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L, false);
try {
while (cursor.hasNext()) {
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index d7911b1..f6996d4 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -27,7 +27,9 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BloomFilterAwareBTreePointSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
@@ -71,18 +73,17 @@
if (!deletedKeysBTreeAccessors.isEmpty()) {
deletedKeysBTreeCursors = new IIndexCursor[deletedKeysBTreeAccessors.size()];
- int i = 0;
- if (includeMutableComponent) {
- // No need for a bloom filter for the in-memory BTree.
- deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor();
- ++i;
+ for (int i = 0; i < operationalComponents.size(); i++) {
+ ILSMComponent component = operationalComponents.get(i);
+ if (component.getType() == LSMComponentType.MEMORY) {
+ // No need for a bloom filter for the in-memory BTree.
+ deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor();
+ } else {
+ deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor((IBTreeLeafFrame) lsmInitState
+ .getgetDeletedKeysBTreeLeafFrameFactory().createFrame(), false,
+ ((LSMInvertedIndexImmutableComponent) operationalComponents.get(i)).getBloomFilter());
+ }
}
- for (; i < deletedKeysBTreeCursors.length; i++) {
- deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor((IBTreeLeafFrame) lsmInitState
- .getgetDeletedKeysBTreeLeafFrameFactory().createFrame(), false,
- ((LSMInvertedIndexImmutableComponent) operationalComponents.get(i)).getBloomFilter());
- }
-
}
MultiComparator keyCmp = lsmInitState.getKeyComparator();
keySearchPred = new RangePredicate(keysOnlyTuple, keysOnlyTuple, true, true, keyCmp, keyCmp);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
index 882d1a1..2b69485 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BloomFilterAwareBTreePointSearchCursor;
@@ -44,7 +45,6 @@
private int accessorIndex = -1;
private boolean tupleConsumed = true;
private ILSMHarness harness;
- private boolean includeMemComponent;
private List<IIndexAccessor> indexAccessors;
private ISearchPredicate searchPred;
private ISearchOperationCallback searchCallback;
@@ -61,7 +61,6 @@
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMInvertedIndexSearchCursorInitialState lsmInitState = (LSMInvertedIndexSearchCursorInitialState) initialState;
harness = lsmInitState.getLSMHarness();
- includeMemComponent = lsmInitState.getIncludeMemComponent();
operationalComponents = lsmInitState.getOperationalComponents();
indexAccessors = lsmInitState.getIndexAccessors();
opCtx = lsmInitState.getOpContext();
@@ -72,16 +71,17 @@
// For searching the deleted-keys BTrees.
deletedKeysBTreeAccessors = lsmInitState.getDeletedKeysBTreeAccessors();
deletedKeysBTreeCursors = new IIndexCursor[deletedKeysBTreeAccessors.size()];
- int i = 0;
- if (includeMemComponent) {
- // No need for a bloom filter for the in-memory BTree.
- deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor();
- ++i;
- }
- for (; i < deletedKeysBTreeCursors.length; i++) {
- deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor((IBTreeLeafFrame) lsmInitState
- .getgetDeletedKeysBTreeLeafFrameFactory().createFrame(), false,
- ((LSMInvertedIndexImmutableComponent) operationalComponents.get(i)).getBloomFilter());
+
+ for (int i = 0; i < operationalComponents.size(); i++) {
+ ILSMComponent component = operationalComponents.get(i);
+ if (component.getType() == LSMComponentType.MEMORY) {
+ // No need for a bloom filter for the in-memory BTree.
+ deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor();
+ } else {
+ deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor((IBTreeLeafFrame) lsmInitState
+ .getgetDeletedKeysBTreeLeafFrameFactory().createFrame(), false,
+ ((LSMInvertedIndexImmutableComponent) operationalComponents.get(i)).getBloomFilter());
+ }
}
MultiComparator keyCmp = lsmInitState.getKeyComparator();
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 5ee6cb0..b253594 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -122,8 +122,8 @@
}
@Override
- public void setFlushStatus(boolean isFlushNeeded) {
- needsFlush[currentMutableComponentId.get()].set(isFlushNeeded);
+ public void requestFlush(boolean isFlushNeeded) {
+ flushRequests[currentMutableComponentId.get()].set(isFlushNeeded);
}
});
@@ -179,8 +179,7 @@
if (flushOnExit) {
BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
ioOpCallbackProvider.getIOOperationCallback(this));
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
+ ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
accessor.scheduleFlush(cb);
try {
cb.waitForIO();
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 6a93e90..6bf07d4 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -54,7 +54,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractMutableLSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
@@ -168,12 +167,9 @@
}
@Override
- public boolean scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
- if (!((AbstractMutableLSMComponent) flushingComponent).isModified()) {
- return false;
- }
LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
ILSMIndexOperationContext rctx = createOpContext(NoOpOperationCallback.INSTANCE);
rctx.setOperation(IndexOperation.FLUSH);
@@ -182,7 +178,6 @@
ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, componentFileRefs
.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs
.getBloomFilterFileReference(), callback));
- return true;
}
@Override
@@ -296,9 +291,6 @@
ILSMIndexOperationContext rctx = createOpContext(NoOpOperationCallback.INSTANCE);
rctx.getComponentHolder().addAll(mergingComponents);
ITreeIndexCursor cursor = new LSMRTreeSortedCursor(rctx, linearizer);
- ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
- search(rctx, cursor, rtreeSearchPred);
-
rctx.setOperation(IndexOperation.MERGE);
LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
ILSMIndexAccessorInternal accessor = new LSMRTreeAccessor(lsmHarness, rctx);
@@ -312,6 +304,9 @@
throws HyracksDataException, IndexException {
LSMRTreeMergeOperation mergeOp = (LSMRTreeMergeOperation) operation;
ITreeIndexCursor cursor = mergeOp.getCursor();
+ ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+ search(((LSMRTreeSortedCursor) cursor).getOpCtx(), cursor, rtreeSearchPred);
+
mergedComponents.addAll(mergeOp.getMergingComponents());
LSMRTreeImmutableComponent mergedComponent = createDiskComponent(componentFactory,
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index b93eb1b..dd31165 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -36,6 +36,10 @@
reset();
}
+ public ILSMIndexOperationContext getOpCtx() {
+ return opCtx;
+ }
+
@Override
public void reset() throws HyracksDataException {
depletedRtreeCursors = new boolean[numberOfTrees];
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 6ccd681..0054dea 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -48,8 +48,8 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractMutableLSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
@@ -148,12 +148,9 @@
}
@Override
- public boolean scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
- if (!((AbstractMutableLSMComponent) flushingComponent).isModified()) {
- return false;
- }
LSMRTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE);
LSMComponentFileReferences relFlushFileRefs = fileManager.getRelFlushFileReference();
opCtx.setOperation(IndexOperation.FLUSH);
@@ -161,7 +158,6 @@
ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, relFlushFileRefs
.getInsertIndexFileReference(), null, null, callback));
- return true;
}
@Override
@@ -254,8 +250,6 @@
LSMRTreeOpContext rctx = createOpContext(NoOpOperationCallback.INSTANCE);
rctx.getComponentHolder().addAll(mergingComponents);
ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(ctx);
- ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
- search(rctx, cursor, (SearchPredicate) rtreeSearchPred);
rctx.setOperation(IndexOperation.MERGE);
LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, rctx);
@@ -268,6 +262,9 @@
throws HyracksDataException, IndexException {
LSMRTreeMergeOperation mergeOp = (LSMRTreeMergeOperation) operation;
ITreeIndexCursor cursor = mergeOp.getCursor();
+ ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
+ search(((LSMIndexSearchCursor) cursor).getOpCtx(), cursor, (SearchPredicate) rtreeSearchPred);
+
mergedComponents.addAll(mergeOp.getMergingComponents());
// Nothing to merge.