Support for full compation.
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
index 91207e8..b5c0a1d 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -25,5 +25,6 @@
PHYSICALDELETE,
NOOP,
MERGE,
+ FULL_MERGE,
FLUSH
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index e9327c6..4330313 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -245,18 +245,18 @@
int cmc = currentMutableComponentId.get();
ctx.setCurrentMutableComponentId(cmc);
int numMutableComponents = memoryComponents.size();
+ operationalComponents.clear();
switch (ctx.getOperation()) {
case UPDATE:
case UPSERT:
case PHYSICALDELETE:
case FLUSH:
case DELETE:
- operationalComponents.clear();
operationalComponents.add(memoryComponents.get(cmc));
break;
case SEARCH:
case INSERT:
- operationalComponents.clear();
+
for (int i = 0; i < numMutableComponents - 1; i++) {
ILSMComponent c = memoryComponents.get((cmc + i + 1) % numMutableComponents);
LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
@@ -270,7 +270,10 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- // The merger is responsible of choosing and adding the components that are targeted for the merge operation.
+ operationalComponents.addAll(ctx.getComponentsToBeMerged());
+ break;
+ case FULL_MERGE:
+ operationalComponents.addAll(immutableComponents);
break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 6d2d7c0..cb7bae7 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -48,6 +48,7 @@
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
private final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
public LSMBTreeOpContext(List<ILSMComponent> mutableComponents, ITreeIndexFrameFactory insertLeafFrameFactory,
ITreeIndexFrameFactory deleteLeafFrameFactory, IModificationOperationCallback modificationCallback,
@@ -84,6 +85,7 @@
deleteLeafFrame.setMultiComparator(cmp);
}
this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
}
@@ -107,6 +109,7 @@
@Override
public void reset() {
componentHolder.clear();
+ componentsToBeMerged.clear();
}
public IndexOperation getOperation() {
@@ -153,4 +156,9 @@
break;
}
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index bc7cbf7..1903998 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -37,6 +37,9 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException;
+ public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException;
+
public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index fd7b82f..36a2ca1 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -35,6 +35,8 @@
public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
throws HyracksDataException, IndexException;
+ public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
+
/**
* Deletes the tuple from the memory component only.
*
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index fcd4037..80264bc 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -22,6 +22,8 @@
public interface ILSMIndexOperationContext extends IIndexOperationContext {
public List<ILSMComponent> getComponentHolder();
+
+ public List<ILSMComponent> getComponentsToBeMerged();
public ISearchOperationCallback getSearchOperationCallback();
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index 1473071..ebd7020 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -19,5 +19,6 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public interface ILSMMergePolicy {
- public void diskComponentAdded(ILSMIndex index) throws HyracksDataException, IndexException;
+ public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+ IndexException;
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
new file mode 100644
index 0000000..a61dec4
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+
+public class LSMIndexCompactOperatorNodePushable extends AbstractOperatorNodePushable {
+ private final IIndexDataflowHelper indexHelper;
+
+ public LSMIndexCompactOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition) {
+ this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ indexHelper.close();
+ }
+
+ @Override
+ public int getInputArity() {
+ return 0;
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return null;
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ indexHelper.open();
+ ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ try {
+ accessor.scheduleFullMerge(NoOpIOOperationCallback.INSTANCE);
+ } catch (Exception e) {
+ indexHelper.close();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
new file mode 100644
index 0000000..3c40f94
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+public class LSMTreeIndexCompactOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public LSMTreeIndexCompactOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields,
+ IIndexDataflowHelperFactory dataflowHelperFactory,
+ IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+ super(spec, 0, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false,
+ NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+ modificationOpCallbackProvider);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new LSMIndexCompactOperatorNodePushable(this, ctx, partition);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index a4af2b7..c90e093 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -15,13 +15,13 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -35,9 +35,19 @@
}
@Override
- public void diskComponentAdded(final ILSMIndex index) throws HyracksDataException, IndexException {
- List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents());
- if (immutableComponents.size() >= threshold) {
+ public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+ IndexException {
+ List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+ for (ILSMComponent c : immutableComponents) {
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ return;
+ }
+ }
+ if (fullMergeIsRequested) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFullMerge(NoOpIOOperationCallback.INSTANCE);
+ } else if (immutableComponents.size() >= threshold) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, immutableComponents);
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index cd66937..526db2a 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -16,6 +16,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -41,11 +42,13 @@
private final ILSMIndexInternal lsmIndex;
private final ILSMMergePolicy mergePolicy;
private final ILSMOperationTracker opTracker;
+ private final AtomicBoolean fullMergeIsRequested;
public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker) {
this.lsmIndex = lsmIndex;
this.opTracker = opTracker;
this.mergePolicy = mergePolicy;
+ fullMergeIsRequested = new AtomicBoolean();
}
private boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean isTryOperation)
@@ -177,14 +180,14 @@
// newComponent is null if the flush op. was not performed.
if (newComponent != null) {
lsmIndex.addComponent(newComponent);
- mergePolicy.diskComponentAdded(lsmIndex);
+ mergePolicy.diskComponentAdded(lsmIndex, false);
}
break;
case MERGE:
// newComponent is null if the merge op. was not performed.
if (newComponent != null) {
lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
- mergePolicy.diskComponentAdded(lsmIndex);
+ mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
}
break;
default:
@@ -288,7 +291,6 @@
@Override
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
- // Merge should always be a try operation, because it should never fail to enter the components unless the merge policy is erroneous.
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
callback.beforeOperation();
callback.afterOperation(null, null);
@@ -299,6 +301,22 @@
}
@Override
+ public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ fullMergeIsRequested.set(true);
+ if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
+ // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then
+ // whenever the current merge has finished, it will schedule the full merge again.
+ callback.beforeOperation();
+ callback.afterOperation(null, null);
+ callback.afterFinalize(null);
+ return;
+ }
+ fullMergeIsRequested.set(false);
+ lsmIndex.scheduleMerge(ctx, callback);
+ }
+
+ @Override
public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -324,7 +342,7 @@
public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
lsmIndex.markAsValid(c);
lsmIndex.addComponent(c);
- mergePolicy.diskComponentAdded(lsmIndex);
+ mergePolicy.diskComponentAdded(lsmIndex, false);
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index b0c87f1..c828bd2 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -122,12 +122,18 @@
public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.MERGE);
- ctx.getComponentHolder().clear();
- ctx.getComponentHolder().addAll(components);
+ ctx.getComponentsToBeMerged().clear();
+ ctx.getComponentsToBeMerged().addAll(components);
lsmHarness.scheduleMerge(ctx, callback);
}
@Override
+ public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.FULL_MERGE);
+ lsmHarness.scheduleFullMerge(ctx, callback);
+ }
+
+ @Override
public void forcePhysicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.PHYSICALDELETE);
lsmHarness.forceModify(ctx, tuple);
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 17d1b17..02bae37 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -21,7 +21,7 @@
INSTANCE;
@Override
- public void diskComponentAdded(ILSMIndex index) {
+ public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) {
// Do nothing
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java
new file mode 100644
index 0000000..22e7505
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMIndexCompactOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+public class LSMInvertedIndexCompactOperator extends AbstractLSMInvertedIndexOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public LSMInvertedIndexCompactOperator(IOperatorDescriptorRegistry spec, IStorageManagerInterface storageManager,
+ IFileSplitProvider fileSplitProvider, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+ ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenComparatorFactories,
+ ITypeTraits[] invListsTypeTraits, IBinaryComparatorFactory[] invListComparatorFactories,
+ IBinaryTokenizerFactory tokenizerFactory, IIndexDataflowHelperFactory dataflowHelperFactory,
+ IModificationOperationCallbackFactory modificationOpCallbackFactory) {
+ super(spec, 1, 1, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+ tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
+ dataflowHelperFactory, null, false, NoOpLocalResourceFactoryProvider.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new LSMIndexCompactOperatorNodePushable(this, ctx, partition);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 59200aa..3f50c87 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -271,15 +271,14 @@
int cmc = currentMutableComponentId.get();
ctx.setCurrentMutableComponentId(cmc);
int numMutableComponents = memoryComponents.size();
+ operationalComponents.clear();
switch (ctx.getOperation()) {
case FLUSH:
case DELETE:
case INSERT:
- operationalComponents.clear();
operationalComponents.add(memoryComponents.get(cmc));
break;
case SEARCH:
- operationalComponents.clear();
for (int i = 0; i < numMutableComponents - 1; i++) {
ILSMComponent c = memoryComponents.get((cmc + i + 1) % numMutableComponents);
LSMInvertedIndexMemoryComponent mutableComponent = (LSMInvertedIndexMemoryComponent) c;
@@ -293,8 +292,10 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- // The merger is responsible of choosing and adding the components that are targeted for the merge operation.
+ operationalComponents.addAll(ctx.getComponentsToBeMerged());
break;
+ case FULL_MERGE:
+ operationalComponents.addAll(immutableComponents);
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 1d4a313..7e34dfe 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -90,12 +90,18 @@
public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.MERGE);
- ctx.getComponentHolder().clear();
- ctx.getComponentHolder().addAll(components);
+ ctx.getComponentsToBeMerged().clear();
+ ctx.getComponentsToBeMerged().addAll(components);
lsmHarness.scheduleMerge(ctx, callback);
}
@Override
+ public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.FULL_MERGE);
+ lsmHarness.scheduleFullMerge(ctx, callback);
+ }
+
+ @Override
public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
lsmHarness.merge(ctx, operation);
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index 1a9303f..671e3f8 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -35,7 +35,8 @@
private IndexOperation op;
private final List<ILSMComponent> componentHolder;
-
+ private final List<ILSMComponent> componentsToBeMerged;
+
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
@@ -54,6 +55,7 @@
IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback)
throws HyracksDataException {
this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
@@ -84,6 +86,7 @@
@Override
public void reset() {
componentHolder.clear();
+ componentsToBeMerged.clear();
}
@Override
@@ -118,4 +121,9 @@
currentMutableInvIndexAccessors = mutableInvIndexAccessors[currentMutableComponentId];
currentDeletedKeysBTreeAccessors = deletedKeysBTreeAccessors[currentMutableComponentId];
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 00a011c..0d9293a 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -201,15 +201,14 @@
int cmc = currentMutableComponentId.get();
ctx.setCurrentMutableComponentId(cmc);
int numMutableComponents = memoryComponents.size();
+ operationalComponents.clear();
switch (ctx.getOperation()) {
case INSERT:
case DELETE:
case FLUSH:
- operationalComponents.clear();
operationalComponents.add(memoryComponents.get(cmc));
break;
case SEARCH:
- operationalComponents.clear();
for (int i = 0; i < numMutableComponents - 1; i++) {
ILSMComponent c = memoryComponents.get((cmc + i + 1) % numMutableComponents);
LSMRTreeMemoryComponent mutableComponent = (LSMRTreeMemoryComponent) c;
@@ -223,8 +222,10 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- // The merger is responsible of choosing and adding the components that are targeted for the merge operation.
+ operationalComponents.addAll(ctx.getComponentsToBeMerged());
break;
+ case FULL_MERGE:
+ operationalComponents.addAll(immutableComponents);
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index b94feba..132e55b 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -48,6 +48,7 @@
private IndexOperation op;
public final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
@@ -76,6 +77,7 @@
currentRTreeOpContext = rtreeOpContexts[0];
currentBTreeOpContext = btreeOpContexts[0];
this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
}
@@ -101,6 +103,7 @@
@Override
public void reset() {
componentHolder.clear();
+ componentsToBeMerged.clear();
}
@Override
@@ -126,4 +129,9 @@
public IModificationOperationCallback getModificationCallback() {
return modificationCallback;
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
}
\ No newline at end of file