Bug fixes.
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 77bd040..5e07e0c 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -62,67 +62,74 @@
}
public void create() throws HyracksDataException {
- long resourceID = getResourceID();
- index = lcManager.getIndex(resourceID);
- if (index != null) {
- lcManager.unregister(resourceID);
- } else {
- index = createIndexInstance();
- }
+ synchronized (lcManager) {
+ long resourceID = getResourceID();
+ index = lcManager.getIndex(resourceID);
+ if (index != null) {
+ lcManager.unregister(resourceID);
+ } else {
+ index = createIndexInstance();
+ }
- // The previous resource ID needs to be removed since calling IIndex.create() may possibly destroy
- // any physical artifact that the LocalResourceRepository is managing (e.g. a file containing the resource ID).
- // Once the index has been created, a new resource ID can be generated.
- if (resourceID != -1) {
- localResourceRepository.deleteResourceByName(file.getFile().getPath());
+ // The previous resource ID needs to be removed since calling IIndex.create() may possibly destroy
+ // any physical artifact that the LocalResourceRepository is managing (e.g. a file containing the resource ID).
+ // Once the index has been created, a new resource ID can be generated.
+ if (resourceID != -1) {
+ localResourceRepository.deleteResourceByName(file.getFile().getPath());
+ }
+ index.create();
+ try {
+ //TODO Create LocalResource through LocalResourceFactory interface
+ resourceID = resourceIdFactory.createId();
+ ILocalResourceFactory localResourceFactory = opDesc.getLocalResourceFactoryProvider()
+ .getLocalResourceFactory();
+ localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, file.getFile()
+ .getPath(), partition));
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ lcManager.register(resourceID, index);
}
- index.create();
- try {
- //TODO Create LocalResource through LocalResourceFactory interface
- resourceID = resourceIdFactory.createId();
- ILocalResourceFactory localResourceFactory = opDesc.getLocalResourceFactoryProvider()
- .getLocalResourceFactory();
- localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, file.getFile()
- .getPath(), partition));
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- lcManager.register(resourceID, index);
-
}
public void open() throws HyracksDataException {
- long resourceID = getResourceID();
+ synchronized (lcManager) {
+ long resourceID = getResourceID();
- if (resourceID == -1) {
- throw new HyracksDataException("Index does not have a valid resource ID. Has it been created yet?");
- }
+ if (resourceID == -1) {
+ throw new HyracksDataException("Index does not have a valid resource ID. Has it been created yet?");
+ }
- index = lcManager.getIndex(resourceID);
- if (index == null) {
- index = createIndexInstance();
- lcManager.register(resourceID, index);
+ index = lcManager.getIndex(resourceID);
+ if (index == null) {
+ index = createIndexInstance();
+ lcManager.register(resourceID, index);
+ }
+ lcManager.open(resourceID);
}
- lcManager.open(resourceID);
}
public void close() throws HyracksDataException {
- lcManager.close(getResourceID());
+ synchronized (lcManager) {
+ lcManager.close(getResourceID());
+ }
}
public void destroy() throws HyracksDataException {
- long resourceID = getResourceID();
- index = lcManager.getIndex(resourceID);
- if (index != null) {
- lcManager.unregister(resourceID);
- } else {
- index = createIndexInstance();
- }
+ synchronized (lcManager) {
+ long resourceID = getResourceID();
+ index = lcManager.getIndex(resourceID);
+ if (index != null) {
+ lcManager.unregister(resourceID);
+ } else {
+ index = createIndexInstance();
+ }
- if (resourceID != -1) {
- localResourceRepository.deleteResourceByName(file.getFile().getPath());
+ if (resourceID != -1) {
+ localResourceRepository.deleteResourceByName(file.getFile().getPath());
+ }
+ index.destroy();
}
- index.destroy();
}
public FileReference getFileReference() {
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index a79daef..83fb5ee 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -119,7 +119,8 @@
FrameUtils.flushFrame(writeBuffer, writer);
appender.reset(writeBuffer, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
+ throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
+ + appender.getBuffer().capacity() + ")");
}
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 7c95480..660d5ea 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -438,11 +438,10 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ opCtx.setOperation(IndexOperation.MERGE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
- opCtx.getComponentHolder().addAll(mergingComponents);
ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx);
- opCtx.setOperation(IndexOperation.MERGE);
BTree firstBTree = (BTree) ((LSMBTreeImmutableComponent) mergingComponents.get(0)).getBTree();
BTree lastBTree = (BTree) ((LSMBTreeImmutableComponent) mergingComponents.get(mergingComponents.size() - 1))
.getBTree();
@@ -461,7 +460,9 @@
LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
ITreeIndexCursor cursor = mergeOp.getCursor();
RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
- search(((LSMIndexSearchCursor) cursor).getOpCtx(), cursor, rangePred);
+ ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) cursor).getOpCtx();
+ opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
+ search(opCtx, cursor, rangePred);
mergedComponents.addAll(mergeOp.getMergingComponents());
long numElements = 0L;
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index bb868dd..d832583 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -67,7 +67,6 @@
proceed = false;
}
- int kk = 0;
protected void checkPriorityQueue() throws HyracksDataException, IndexException {
while (!outputPriorityQueue.isEmpty() || needPush == true) {
if (!outputPriorityQueue.isEmpty()) {
@@ -128,8 +127,6 @@
// If there is no previous tuple or the previous tuple can be ignored
if (outputElement == null) {
if (isDeleted(checkElement)) {
- kk++;
- System.out.println(kk);
// If the key has been deleted then pop it and set needPush to true.
// We cannot push immediately because the tuple may be
// modified if hasNext() is called
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index 98ec6b9..14b86c9 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -24,8 +24,7 @@
DISK
}
- public boolean threadEnter(LSMOperationType opType, boolean firstComponent) throws InterruptedException,
- HyracksDataException;
+ public boolean threadEnter(LSMOperationType opType, boolean firstComponent) throws HyracksDataException;
public void threadExit(LSMOperationType opType, boolean failedOperation, boolean firstComponent)
throws HyracksDataException;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IMutableComponentAdderCallback.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IMutableComponentAdderCallback.java
deleted file mode 100644
index ba37740..0000000
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IMutableComponentAdderCallback.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.storage.am.lsm.common.api;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IMutableComponentAdderCallback {
-
- public void addComponent() throws HyracksDataException;
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java
index 31b96d2..f4091de 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java
@@ -70,7 +70,6 @@
case MODIFICATION:
case SEARCH:
readerCount--;
-
if (readerCount == 0 && state == ComponentState.READABLE_MERGING) {
destroy();
state = ComponentState.KILLED;
@@ -81,6 +80,13 @@
}
}
+ public boolean isMergable() {
+ if (state == ComponentState.READABLE) {
+ return true;
+ }
+ return false;
+ }
+
@Override
public LSMComponentType getType() {
return LSMComponentType.DISK;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
index 4870baa..497bb3f 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
@@ -18,7 +18,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.IMutableComponentAdderCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IMutableComponentSwitcherCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -29,7 +28,6 @@
private ComponentState state;
private final IVirtualBufferCache vbc;
- private IMutableComponentAdderCallback adderCallback;
private IMutableComponentSwitcherCallback switcherCallback;
private final AtomicBoolean isModified;
@@ -40,7 +38,7 @@
READABLE_UNWRITABLE,
READABLE_UNWRITABLE_FLUSHING,
UNREADABLE_UNWRITABLE,
- INACTIVE_READABLE_WRITABLE
+ INACTIVE
}
public AbstractMutableLSMComponent(IVirtualBufferCache vbc, boolean isActive) {
@@ -50,15 +48,14 @@
if (isActive) {
state = ComponentState.READABLE_WRITABLE;
} else {
- state = ComponentState.INACTIVE_READABLE_WRITABLE;
+ state = ComponentState.INACTIVE;
}
isModified = new AtomicBoolean();
}
@Override
- public boolean threadEnter(LSMOperationType opType, boolean firstComponent) throws InterruptedException,
- HyracksDataException {
- if (state == ComponentState.INACTIVE_READABLE_WRITABLE && requestedToBeActive) {
+ public boolean threadEnter(LSMOperationType opType, boolean firstComponent) throws HyracksDataException {
+ if (state == ComponentState.INACTIVE && requestedToBeActive) {
state = ComponentState.READABLE_WRITABLE;
requestedToBeActive = false;
}
@@ -96,27 +93,23 @@
}
break;
case SEARCH:
- if (state == ComponentState.UNREADABLE_UNWRITABLE) {
+ if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE
+ || state == ComponentState.READABLE_UNWRITABLE_FLUSHING) {
+ readerCount++;
+ } else {
return false;
}
- readerCount++;
break;
case FLUSH:
- if (state == ComponentState.READABLE_UNWRITABLE_FLUSHING
- || state == ComponentState.UNREADABLE_UNWRITABLE
- || state == ComponentState.INACTIVE_READABLE_WRITABLE) {
+ if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE) {
+ assert writerCount == 0;
+ state = ComponentState.READABLE_UNWRITABLE_FLUSHING;
+ switcherCallback.requestFlush(false);
+ switcherCallback.switchComponents();
+ readerCount++;
+ } else {
return false;
}
-
- state = ComponentState.READABLE_UNWRITABLE_FLUSHING;
- switcherCallback.requestFlush(false);
- synchronized (this) {
- while (writerCount > 0) {
- wait();
- }
- }
- switcherCallback.switchComponents();
- readerCount++;
break;
default:
throw new UnsupportedOperationException("Unsupported operation " + opType);
@@ -140,8 +133,7 @@
readerCount--;
if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
reset();
- adderCallback.addComponent();
- state = ComponentState.INACTIVE_READABLE_WRITABLE;
+ state = ComponentState.INACTIVE;
}
}
break;
@@ -149,8 +141,7 @@
readerCount--;
if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
reset();
- adderCallback.addComponent();
- state = ComponentState.INACTIVE_READABLE_WRITABLE;
+ state = ComponentState.INACTIVE;
}
break;
case FLUSH:
@@ -158,8 +149,7 @@
readerCount--;
if (readerCount == 0) {
reset();
- adderCallback.addComponent();
- state = ComponentState.INACTIVE_READABLE_WRITABLE;
+ state = ComponentState.INACTIVE;
} else {
state = ComponentState.UNREADABLE_UNWRITABLE;
}
@@ -167,13 +157,10 @@
default:
throw new UnsupportedOperationException("Unsupported operation " + opType);
}
- synchronized (this) {
- notifyAll();
- }
}
public boolean isReadable() {
- if (state == ComponentState.INACTIVE_READABLE_WRITABLE || state == ComponentState.UNREADABLE_UNWRITABLE) {
+ if (state == ComponentState.INACTIVE || state == ComponentState.UNREADABLE_UNWRITABLE) {
return false;
}
return true;
@@ -188,10 +175,6 @@
requestedToBeActive = true;
}
- public void registerOnResetCallback(IMutableComponentAdderCallback adderCallback) {
- this.adderCallback = adderCallback;
- }
-
public void registerOnFlushCallback(IMutableComponentSwitcherCallback switcherCallback) {
this.switcherCallback = switcherCallback;
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 7d9fc00..220dbf0 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -34,7 +34,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.IMutableComponentAdderCallback;
public class LSMHarness implements ILSMHarness {
private static final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
@@ -74,9 +73,6 @@
opTracker.beforeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
ctx.getModificationCallback());
}
- } catch (InterruptedException e) {
- entranceSuccessful = false;
- throw new HyracksDataException(e);
} finally {
if (!entranceSuccessful) {
int i = 0;
@@ -137,7 +133,6 @@
} finally {
exitComponents(ctx, opType, false);
}
-
return true;
}
@@ -148,10 +143,7 @@
getAndEnterComponents(ctx, opType, false);
try {
lsmIndex.search(ctx, cursor, pred);
- } catch (HyracksDataException e) {
- exitComponents(ctx, opType, true);
- throw e;
- } catch (IndexException e) {
+ } catch (HyracksDataException | IndexException e) {
exitComponents(ctx, opType, true);
throw e;
}
@@ -175,15 +167,6 @@
if (!((AbstractMutableLSMComponent) flushingComponent).isModified()) {
callback.beforeOperation();
callback.afterOperation(null, null);
- AbstractMutableLSMComponent mutableComponent = (AbstractMutableLSMComponent) ctx.getComponentHolder()
- .get(0);
- mutableComponent.registerOnResetCallback(new IMutableComponentAdderCallback() {
-
- @Override
- public void addComponent() throws HyracksDataException {
- // Do nothing
- }
- });
exitComponents(ctx, opType, false);
callback.afterFinalize(null);
} else {
@@ -194,31 +177,21 @@
@Override
public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException {
- operation.getCallback().beforeOperation();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(lsmIndex + ": flushing");
}
- final ILSMComponent newComponent = lsmIndex.flush(operation);
+ operation.getCallback().beforeOperation();
+ ILSMComponent newComponent = lsmIndex.flush(operation);
operation.getCallback().afterOperation(null, newComponent);
lsmIndex.markAsValid(newComponent);
- AbstractMutableLSMComponent flushingComponent = (AbstractMutableLSMComponent) ctx.getComponentHolder().get(0);
- flushingComponent.registerOnResetCallback(new IMutableComponentAdderCallback() {
-
- @Override
- public void addComponent() throws HyracksDataException {
- lsmIndex.addComponent(newComponent);
- try {
- mergePolicy.diskComponentAdded(lsmIndex);
- } catch (IndexException e) {
- throw new HyracksDataException(e);
- }
- }
- });
-
- exitComponents(ctx, LSMOperationType.FLUSH, false);
- operation.getCallback().afterFinalize(newComponent);
+ synchronized (opTracker) {
+ lsmIndex.addComponent(newComponent);
+ mergePolicy.diskComponentAdded(lsmIndex);
+ exitComponents(ctx, LSMOperationType.FLUSH, false);
+ operation.getCallback().afterFinalize(newComponent);
+ }
}
@Override
@@ -238,13 +211,13 @@
@Override
public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException {
- List<ILSMComponent> mergedComponents = new ArrayList<ILSMComponent>();
- operation.getCallback().beforeOperation();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(lsmIndex + ": merging");
}
+
+ List<ILSMComponent> mergedComponents = new ArrayList<ILSMComponent>();
+ operation.getCallback().beforeOperation();
ILSMComponent newComponent = lsmIndex.merge(mergedComponents, operation);
- ctx.getComponentHolder().addAll(mergedComponents);
operation.getCallback().afterOperation(mergedComponents, newComponent);
lsmIndex.markAsValid(newComponent);
lsmIndex.subsumeMergedComponents(newComponent, mergedComponents);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index b966426..5d4b88e 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -540,11 +540,11 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
LSMInvertedIndexOpContext ictx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ ictx.setOperation(IndexOperation.MERGE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
ictx.getComponentHolder().addAll(mergingComponents);
IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(ictx);
- ictx.setOperation(IndexOperation.MERGE);
LSMInvertedIndexImmutableComponent firstComponent = (LSMInvertedIndexImmutableComponent) mergingComponents
.get(0);
OnDiskInvertedIndex firstInvIndex = (OnDiskInvertedIndex) firstComponent.getInvIndex();
@@ -569,8 +569,10 @@
IIndexCursor cursor = mergeOp.getCursor();
RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null);
+ ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) cursor).getOpCtx();
+ opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
// Scan diskInvertedIndexes ignoring the memoryInvertedIndex.
- search(((LSMIndexSearchCursor) cursor).getOpCtx(), cursor, mergePred);
+ search(opCtx, cursor, mergePred);
// Create an inverted index instance.
LSMInvertedIndexImmutableComponent component = createDiskInvIndexComponent(componentFactory,
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 6bf07d4..cb6e166 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -55,6 +55,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
@@ -287,11 +288,11 @@
@Override
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
- List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
ILSMIndexOperationContext rctx = createOpContext(NoOpOperationCallback.INSTANCE);
+ rctx.setOperation(IndexOperation.MERGE);
+ List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
rctx.getComponentHolder().addAll(mergingComponents);
ITreeIndexCursor cursor = new LSMRTreeSortedCursor(rctx, linearizer);
- rctx.setOperation(IndexOperation.MERGE);
LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
ILSMIndexAccessorInternal accessor = new LSMRTreeAccessor(lsmHarness, rctx);
ioScheduler.scheduleOperation(new LSMRTreeMergeOperation((ILSMIndexAccessorInternal) accessor,
@@ -305,8 +306,9 @@
LSMRTreeMergeOperation mergeOp = (LSMRTreeMergeOperation) operation;
ITreeIndexCursor cursor = mergeOp.getCursor();
ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
- search(((LSMRTreeSortedCursor) cursor).getOpCtx(), cursor, rtreeSearchPred);
-
+ ILSMIndexOperationContext opCtx = ((LSMRTreeSortedCursor) cursor).getOpCtx();
+ opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
+ search(opCtx, cursor, rtreeSearchPred);
mergedComponents.addAll(mergeOp.getMergingComponents());
LSMRTreeImmutableComponent mergedComponent = createDiskComponent(componentFactory,
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 0054dea..965f39b 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -246,11 +246,11 @@
@Override
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
- List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
LSMRTreeOpContext rctx = createOpContext(NoOpOperationCallback.INSTANCE);
+ rctx.setOperation(IndexOperation.MERGE);
+ List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
rctx.getComponentHolder().addAll(mergingComponents);
ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(ctx);
- rctx.setOperation(IndexOperation.MERGE);
LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, rctx);
ioScheduler.scheduleOperation(new LSMRTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
@@ -263,16 +263,11 @@
LSMRTreeMergeOperation mergeOp = (LSMRTreeMergeOperation) operation;
ITreeIndexCursor cursor = mergeOp.getCursor();
ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
- search(((LSMIndexSearchCursor) cursor).getOpCtx(), cursor, (SearchPredicate) rtreeSearchPred);
-
+ ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) cursor).getOpCtx();
+ opCtx.getComponentHolder().addAll(mergeOp.getMergingComponents());
+ search(opCtx, cursor, rtreeSearchPred);
mergedComponents.addAll(mergeOp.getMergingComponents());
- // Nothing to merge.
- if (mergedComponents.size() <= 1) {
- cursor.close();
- return null;
- }
-
// Bulk load the tuples from all on-disk RTrees into the new RTree.
LSMRTreeImmutableComponent component = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
null, null, true);