Merge branch 'master' into raman/master_feeds
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index 1bd22d7..5ac2961 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -19,6 +19,8 @@
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import org.junit.Before;
import org.junit.Test;
@@ -83,7 +85,11 @@
TestStorageManagerComponentHolder.init(8192, 20, 20);
}
- protected static final int MERGE_THRESHOLD = 3;
+ protected static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
protected IVirtualBufferCacheProvider virtualBufferCacheProvider = new TestVirtualBufferCacheProvider(
DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
index fc932a3..8addae7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
@@ -26,7 +26,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -52,7 +52,7 @@
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
invertedIndexDataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
- virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
+ virtualBufferCacheProvider, new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
NoOpIOOperationCallback.INSTANCE, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
index 75045b4..82de0c9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
@@ -24,7 +24,7 @@
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -47,9 +47,9 @@
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(virtualBufferCacheProvider,
- new ConstantMergePolicyProvider(MERGE_THRESHOLD), ThreadCountingOperationTrackerProvider.INSTANCE,
- SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE,
- DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
+ new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
+ ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
+ NoOpIOOperationCallback.INSTANCE, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
@Override
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index e98ecde..a4fd4c1 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -15,10 +15,13 @@
package edu.uci.ics.hyracks.tests.am.lsm.btree;
+import java.util.HashMap;
+import java.util.Map;
+
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -26,15 +29,19 @@
public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
- private static final int MERGE_THRESHOLD = 3;
-
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
+
public LSMBTreeOperatorTestHelper(IOManager ioManager) {
super(ioManager);
}
public IIndexDataflowHelperFactory createDataFlowHelperFactory() {
- return new LSMBTreeDataflowHelperFactory(virtualBufferCacheProvider, new ConstantMergePolicyProvider(
- MERGE_THRESHOLD), ThreadCountingOperationTrackerProvider.INSTANCE,
+ return new LSMBTreeDataflowHelperFactory(virtualBufferCacheProvider, new ConstantMergePolicyFactory(),
+ MERGE_POLICY_PROPERTIES, ThreadCountingOperationTrackerProvider.INSTANCE,
SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE,
DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
index e7478d6..79b8eb7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.tests.am.lsm.rtree;
+import java.util.HashMap;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -30,7 +33,11 @@
public class LSMRTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
- private static final int MERGE_THRESHOLD = 3;
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
public LSMRTreeOperatorTestHelper(IOManager ioManager) {
super(ioManager);
@@ -40,7 +47,7 @@
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
return new LSMRTreeDataflowHelperFactory(valueProviderFactories, rtreePolicyType, btreeComparatorFactories,
- virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
+ virtualBufferCacheProvider, new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
index f883d90..fd6171a 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.tests.am.lsm.rtree;
+import java.util.HashMap;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -30,7 +33,11 @@
public class LSMRTreeWithAntiMatterTuplesOperatorTestHelper extends LSMTreeOperatorTestHelper {
- private static final int MERGE_THRESHOLD = 3;
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
public LSMRTreeWithAntiMatterTuplesOperatorTestHelper(IOManager ioManager) {
super(ioManager);
@@ -40,9 +47,8 @@
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
return new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories, rtreePolicyType,
- btreeComparatorFactories, virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
- ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
- NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory);
+ btreeComparatorFactories, virtualBufferCacheProvider, new ConstantMergePolicyFactory(),
+ MERGE_POLICY_PROPERTIES, ThreadCountingOperationTrackerProvider.INSTANCE,
+ SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory);
}
-
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
index 91207e8..b5c0a1d 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -25,5 +25,6 @@
PHYSICALDELETE,
NOOP,
MERGE,
+ FULL_MERGE,
FLUSH
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
index 104a70d..0fdef13 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
@@ -15,12 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
private static final long serialVersionUID = 1L;
public LSMBTreeDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerFactory,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerFactory, ioSchedulerProvider,
- ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+ ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory,
+ ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
}
@Override
@@ -42,7 +44,7 @@
int partition) {
return new LSMBTreeDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackFactory);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 33ae7bb..3e51e20 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -242,10 +242,10 @@
public void getOperationalComponents(ILSMIndexOperationContext ctx) {
List<ILSMComponent> immutableComponents = diskComponents;
List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
- operationalComponents.clear();
int cmc = currentMutableComponentId.get();
ctx.setCurrentMutableComponentId(cmc);
int numMutableComponents = memoryComponents.size();
+ operationalComponents.clear();
switch (ctx.getOperation()) {
case UPDATE:
case UPSERT:
@@ -256,6 +256,7 @@
break;
case SEARCH:
case INSERT:
+
for (int i = 0; i < numMutableComponents - 1; i++) {
ILSMComponent c = memoryComponents.get((cmc + i + 1) % numMutableComponents);
LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
@@ -269,6 +270,9 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
+ operationalComponents.addAll(ctx.getComponentsToBeMerged());
+ break;
+ case FULL_MERGE:
operationalComponents.addAll(immutableComponents);
break;
default:
@@ -423,8 +427,12 @@
LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
opCtx.setOperation(IndexOperation.MERGE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
- ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx);
-
+ boolean returnDeletedTuples = false;
+ if (ctx.getComponentHolder().get(ctx.getComponentHolder().size() - 1) != diskComponents.get(diskComponents
+ .size() - 1)) {
+ returnDeletedTuples = true;
+ }
+ ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
BTree firstBTree = (BTree) ((LSMBTreeDiskComponent) mergingComponents.get(0)).getBTree();
BTree lastBTree = (BTree) ((LSMBTreeDiskComponent) mergingComponents.get(mergingComponents.size() - 1))
.getBTree();
@@ -455,8 +463,8 @@
int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
- LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory,
- mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
+ LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
+ mergeOp.getBloomFilterMergeTarget(), true);
IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false);
IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
@@ -476,9 +484,8 @@
return mergedComponent;
}
- private LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory,
- FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean createComponent)
- throws HyracksDataException, IndexException {
+ private LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory, FileReference btreeFileRef,
+ FileReference bloomFilterFileRef, boolean createComponent) throws HyracksDataException, IndexException {
// Create new BTree instance.
LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) factory
.createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
@@ -541,8 +548,8 @@
} catch (HyracksDataException | IndexException e) {
throw new TreeIndexException(e);
}
- bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(
- fillFactor, verifyInput, numElementsHint, false);
+ bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
+ verifyInput, numElementsHint, false);
int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index 0b2d7cf..381b012 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -43,4 +43,9 @@
public BloomFilter getBloomFilter() {
return bloomFilter;
}
+
+ @Override
+ public long getComponentSize() {
+ return btree.getFileReference().getFile().length() + bloomFilter.getFileReference().getFile().length();
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 6d2d7c0..cb7bae7 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -48,6 +48,7 @@
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
private final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
public LSMBTreeOpContext(List<ILSMComponent> mutableComponents, ITreeIndexFrameFactory insertLeafFrameFactory,
ITreeIndexFrameFactory deleteLeafFrameFactory, IModificationOperationCallback modificationCallback,
@@ -84,6 +85,7 @@
deleteLeafFrame.setMultiComparator(cmp);
}
this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
}
@@ -107,6 +109,7 @@
@Override
public void reset() {
componentHolder.clear();
+ componentsToBeMerged.clear();
}
public IndexOperation getOperation() {
@@ -153,4 +156,9 @@
break;
}
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 6eada4b..668a12c 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -49,7 +49,11 @@
private boolean proceed = true;
public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx);
+ this(opCtx, false);
+ }
+
+ public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+ super(opCtx, returnDeletedTuples);
this.copyTuple = new ArrayTupleReference();
this.reusablePred = new RangePredicate(null, null, true, true, null, null);
}
@@ -126,7 +130,7 @@
}
// If there is no previous tuple or the previous tuple can be ignored
if (outputElement == null) {
- if (isDeleted(checkElement)) {
+ if (isDeleted(checkElement) && !returnDeletedTuples) {
// If the key has been deleted then pop it and set needPush to true.
// We cannot push immediately because the tuple may be
// modified if hasNext() is called
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index bc7cbf7..1903998 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -37,6 +37,9 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException;
+ public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException;
+
public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 3405b60..36a2ca1 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
@@ -30,7 +32,10 @@
public interface ILSMIndexAccessor extends IIndexAccessor {
public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException;
- public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
+ public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+ throws HyracksDataException, IndexException;
+
+ public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
/**
* Deletes the tuple from the memory component only.
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index fcd4037..80264bc 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -22,6 +22,8 @@
public interface ILSMIndexOperationContext extends IIndexOperationContext {
public List<ILSMComponent> getComponentHolder();
+
+ public List<ILSMComponent> getComponentsToBeMerged();
public ISearchOperationCallback getSearchOperationCallback();
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index 1473071..279605f 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -15,9 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public interface ILSMMergePolicy {
- public void diskComponentAdded(ILSMIndex index) throws HyracksDataException, IndexException;
+ public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+ IndexException;
+
+ public void configure(Map<String, String> properties);
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
similarity index 73%
rename from hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
rename to hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
index cf56750..c90b5f7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
@@ -15,9 +15,13 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+public interface ILSMMergePolicyFactory extends Serializable {
+ public ILSMMergePolicy createMergePolicy(Map<String, String> configuration);
-public interface ILSMMergePolicyProvider extends Serializable {
- public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx);
-}
+ public String getName();
+
+ public Set<String> getPropertiesNames();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
index 2c082bb..4276ba7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
@@ -15,10 +15,12 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
@@ -26,21 +28,23 @@
protected static final long serialVersionUID = 1L;
protected final IVirtualBufferCacheProvider virtualBufferCacheProvider;
- protected final ILSMMergePolicyProvider mergePolicyProvider;
+ protected final ILSMMergePolicyFactory mergePolicyFactory;
+ protected final Map<String, String> mergePolicyProperties;
protected final ILSMOperationTrackerProvider opTrackerFactory;
protected final ILSMIOOperationSchedulerProvider ioSchedulerProvider;
protected final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
protected final double bloomFilterFalsePositiveRate;
public AbstractLSMIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerFactory,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
- double bloomFilterFalsePositiveRate) {
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+ ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
this.virtualBufferCacheProvider = virtualBufferCacheProvider;
- this.mergePolicyProvider = mergePolicyProvider;
+ this.mergePolicyFactory = mergePolicyFactory;
this.opTrackerFactory = opTrackerFactory;
this.ioSchedulerProvider = ioSchedulerProvider;
this.ioOpCallbackFactory = ioOpCallbackFactory;
this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
+ this.mergePolicyProperties = mergePolicyProperties;
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
new file mode 100644
index 0000000..a61dec4
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+
+/**
+ * Operator runtime that opens the index of one partition and schedules a full merge of it.
+ * It declares no inputs and ignores any output writer it is given.
+ */
+public class LSMIndexCompactOperatorNodePushable extends AbstractOperatorNodePushable {
+    private final IIndexDataflowHelper indexHelper;
+
+    /**
+     * Creates the index dataflow helper for {@code partition} through the descriptor's helper factory.
+     */
+    public LSMIndexCompactOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition) {
+        this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
+    }
+
+    /**
+     * Opens the index and schedules a full merge on it using no-op callbacks.
+     *
+     * @throws HyracksDataException
+     *             wrapping whatever the scheduling call threw; the helper is closed first
+     */
+    @Override
+    public void initialize() throws HyracksDataException {
+        indexHelper.open();
+        final ILSMIndex lsmIndex = (ILSMIndex) indexHelper.getIndexInstance();
+        final ILSMIndexAccessor mergeAccessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
+                NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        try {
+            mergeAccessor.scheduleFullMerge(NoOpIOOperationCallback.INSTANCE);
+        } catch (Exception cause) {
+            indexHelper.close();
+            throw new HyracksDataException(cause);
+        }
+    }
+
+    /** Closes the index dataflow helper. */
+    @Override
+    public void deinitialize() throws HyracksDataException {
+        indexHelper.close();
+    }
+
+    /** @return always 0 */
+    @Override
+    public int getInputArity() {
+        return 0;
+    }
+
+    /** @return always {@code null} */
+    @Override
+    public IFrameWriter getInputFrameWriter(int index) {
+        return null;
+    }
+
+    /** Intentionally empty: the writer and record descriptor are not stored or used. */
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
new file mode 100644
index 0000000..3c40f94
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+/**
+ * Operator descriptor whose push runtime is an {@link LSMIndexCompactOperatorNodePushable}.
+ */
+public class LSMTreeIndexCompactOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Forwards the given settings to the superclass constructor and fills the remaining
+     * superclass arguments with fixed values ({@code 0}, {@code 0}, {@code null}, {@code null},
+     * {@code false}, the no-op local-resource-factory provider and the no-op operation-callback factory).
+     */
+    public LSMTreeIndexCompactOperatorDescriptor(
+            IOperatorDescriptorRegistry spec,
+            IStorageManagerInterface storageManager,
+            IIndexLifecycleManagerProvider lifecycleManagerProvider,
+            IFileSplitProvider fileSplitProvider,
+            ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories,
+            int[] bloomFilterKeyFields,
+            IIndexDataflowHelperFactory dataflowHelperFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+        super(spec, 0, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false,
+                NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+                modificationOpCallbackProvider);
+    }
+
+    /**
+     * Creates the compaction runtime for one partition; {@code recordDescProvider} and
+     * {@code nPartitions} are not used.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new LSMIndexCompactOperatorNodePushable(this, ctx, partition);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
index bc6baeb..ec12c23 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
@@ -85,4 +85,6 @@
protected abstract void destroy() throws HyracksDataException;
+ public abstract long getComponentSize();
+
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index b6f5657..8a6b8bd 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -15,27 +15,43 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
public class ConstantMergePolicy implements ILSMMergePolicy {
+ private int numComponents;
- private final int threshold;
-
- public ConstantMergePolicy(int threshold) {
- this.threshold = threshold;
+ @Override
+ public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+ IndexException {
+ List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+ for (ILSMComponent c : immutableComponents) {
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ return; // a component is not READABLE_UNWRITABLE: schedule nothing on this call
+ }
+ }
+ if (fullMergeIsRequested) { // a pending full-merge request is served before the count check
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFullMerge(index.getIOOperationCallback());
+ } else if (immutableComponents.size() >= numComponents) { // threshold reached: merge all disk components
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
+ }
+ }
@Override
- public void diskComponentAdded(final ILSMIndex index) throws HyracksDataException, IndexException {
- if (index.getImmutableComponents().size() >= threshold) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
- }
+ public void configure(Map<String, String> properties) {
+ numComponents = Integer.parseInt(properties.get("num-components"));
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
new file mode 100644
index 0000000..13f5ad9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+/**
+ * Factory for {@link ConstantMergePolicy}; {@link #getName()} returns {@code "constant"}.
+ */
+public class ConstantMergePolicyFactory implements ILSMMergePolicyFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String[] SET_VALUES = new String[] { "num-components" };
+    // Read-only view: the set is shared by every caller, so it must not be modifiable through the getter.
+    private static final Set<String> PROPERTIES_NAMES = Collections.unmodifiableSet(new HashSet<String>(
+            Arrays.asList(SET_VALUES)));
+
+    /**
+     * Creates a {@link ConstantMergePolicy} and configures it with {@code properties}.
+     */
+    @Override
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties) {
+        ILSMMergePolicy policy = new ConstantMergePolicy();
+        policy.configure(properties);
+        return policy;
+    }
+
+    /** @return the policy name, {@code "constant"} */
+    @Override
+    public String getName() {
+        return "constant";
+    }
+
+    /** @return an unmodifiable set holding the property name {@code "num-components"} */
+    @Override
+    public Set<String> getPropertiesNames() {
+        return PROPERTIES_NAMES;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
deleted file mode 100644
index a7383c1..0000000
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
-
-public class ConstantMergePolicyProvider implements ILSMMergePolicyProvider {
-
- private static final long serialVersionUID = 1L;
-
- private final int threshold;
-
- public ConstantMergePolicyProvider(int threshold) {
- this.threshold = threshold;
- }
-
- @Override
- public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx) {
- return new ConstantMergePolicy(threshold);
- }
-
-}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index ca775b7..443ad2b 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -16,6 +16,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -41,11 +42,13 @@
private final ILSMIndexInternal lsmIndex;
private final ILSMMergePolicy mergePolicy;
private final ILSMOperationTracker opTracker;
+ private final AtomicBoolean fullMergeIsRequested;
public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker) {
this.lsmIndex = lsmIndex;
this.opTracker = opTracker;
this.mergePolicy = mergePolicy;
+ fullMergeIsRequested = new AtomicBoolean();
}
private boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean isTryOperation)
@@ -180,13 +183,14 @@
// newComponent is null if the flush op. was not performed.
if (newComponent != null) {
lsmIndex.addComponent(newComponent);
- mergePolicy.diskComponentAdded(lsmIndex);
+ mergePolicy.diskComponentAdded(lsmIndex, false);
}
break;
case MERGE:
// newComponent is null if the merge op. was not performed.
if (newComponent != null) {
lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
+ mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
}
break;
default:
@@ -287,7 +291,6 @@
@Override
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
- // Merge should always be a try operation, because it should never fail to enter the components unless the merge policy is erroneous.
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
callback.afterFinalize(LSMOperationType.MERGE, null);
return;
@@ -296,6 +299,20 @@
}
@Override
+ public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ fullMergeIsRequested.set(true); // stays set if the components cannot be entered below
+ if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
+ // The components could not be entered, e.g. because a merge is already running on some or all of them.
+ // The flag stays set, so the merge policy is told about the request once a MERGE operation has finished.
+ callback.afterFinalize(LSMOperationType.MERGE, null);
+ return;
+ }
+ fullMergeIsRequested.set(false); // components entered: the request is being served now
+ lsmIndex.scheduleMerge(ctx, callback);
+ }
+
+ @Override
public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -320,7 +337,7 @@
public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
lsmIndex.markAsValid(c);
lsmIndex.addComponent(c);
- mergePolicy.diskComponentAdded(lsmIndex);
+ mergePolicy.diskComponentAdded(lsmIndex, false);
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 45cc69b..2bc45a9 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -42,11 +42,13 @@
protected boolean includeMutableComponent;
protected ILSMHarness lsmHarness;
protected final ILSMIndexOperationContext opCtx;
+ protected final boolean returnDeletedTuples;
protected List<ILSMComponent> operationalComponents;
- public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx) {
+ public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
this.opCtx = opCtx;
+ this.returnDeletedTuples = returnDeletedTuples;
outputElement = null;
needPush = false;
}
@@ -110,14 +112,14 @@
@Override
public void close() throws HyracksDataException {
- if (lsmHarness != null) {
- try {
- outputPriorityQueue.clear();
- for (int i = 0; i < rangeCursors.length; i++) {
- rangeCursors[i].close();
- }
- rangeCursors = null;
- } finally {
+ try {
+ outputPriorityQueue.clear();
+ for (int i = 0; i < rangeCursors.length; i++) {
+ rangeCursors[i].close();
+ }
+ rangeCursors = null;
+ } finally {
+ if (lsmHarness != null) {
lsmHarness.endSearch(opCtx);
}
}
@@ -154,7 +156,50 @@
return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter();
}
- abstract protected void checkPriorityQueue() throws HyracksDataException, IndexException;
+ protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+ while (!outputPriorityQueue.isEmpty() || needPush == true) {
+ if (!outputPriorityQueue.isEmpty()) {
+ PriorityQueueElement checkElement = outputPriorityQueue.peek();
+ // No previous tuple is being tracked (none was taken yet, or it was discarded below)
+ if (outputElement == null) {
+ if (isDeleted(checkElement) && !returnDeletedTuples) {
+ // The head is a deleted tuple that must not be returned: pop it and set needPush to true.
+ // We cannot push immediately because the tuple may be
+ // modified if hasNext() is called
+ outputElement = outputPriorityQueue.poll();
+ needPush = true;
+ } else {
+ break;
+ }
+ } else {
+ // Compare the previous tuple with the head tuple of the priority queue
+ if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+ // The previous tuple and the head tuple compare
+ // as equal,
+ // so pop the head tuple and push the next tuple from
+ // the tree the head tuple came from
+
+ // the head element of the priority queue is no longer needed
+ PriorityQueueElement e = outputPriorityQueue.poll();
+ pushIntoPriorityQueue(e);
+ } else {
+ // The previous tuple and the head tuple differ,
+ // so the previous tuple is no longer needed
+ if (needPush == true) {
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ }
+ outputElement = null;
+ }
+ }
+ } else {
+ // The priority queue is empty but a deferred push is still pending (needPush is true)
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ outputElement = null;
+ }
+ }
+ }
@Override
public boolean exclusiveLatchNodes() {
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index f11a061..c828bd2 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -116,12 +119,21 @@
}
@Override
- public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+ throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.MERGE);
+ ctx.getComponentsToBeMerged().clear();
+ ctx.getComponentsToBeMerged().addAll(components);
lsmHarness.scheduleMerge(ctx, callback);
}
@Override
+ public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.FULL_MERGE); // unlike scheduleMerge, no component list is placed in ctx here
+ lsmHarness.scheduleFullMerge(ctx, callback);
+ }
+
+ @Override
public void forcePhysicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.PHYSICALDELETE);
lsmHarness.forceModify(ctx, tuple);
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 17d1b17..ca22268 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.Map;
+
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -21,8 +23,12 @@
INSTANCE;
@Override
- public void diskComponentAdded(ILSMIndex index) {
+ public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) {
// Do nothing
}
+ @Override
+ public void configure(Map<String, String> properties) {
+ // Do nothing: the properties are ignored by this policy
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
new file mode 100644
index 0000000..fe04db6
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+
+/**
+ * Merge policy configured through two properties: a maximum mergeable component size and a tolerated
+ * number of components. {@link #diskComponentAdded} describes how the two limits select what to merge.
+ */
+public class PrefixMergePolicy implements ILSMMergePolicy {
+
+    private static final String MAX_MERGABLE_COMPONENT_SIZE_KEY = "max-mergable-component-size";
+    // The spelling of this key is part of the configuration contract; do not "correct" it here.
+    private static final String MAX_TOLERANCE_COMPONENT_COUNT_KEY = "max-tolernace-component-count";
+
+    private long maxMergableComponentSize;
+    private int maxToleranceComponentCount;
+
+    /**
+     * Schedules at most one merge for the current disk components of {@code index}.
+     *
+     * @param index
+     *            the index whose disk components are inspected
+     * @param fullMergeIsRequested
+     *            when true, a full merge is scheduled instead of applying the size/count rule
+     */
+    @Override
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+            IndexException {
+        // 1. Walk the candidate components in the order returned by getImmutableComponents() (described by the
+        // original author as oldest-first). Find the prefix of the current run of candidates whose summed size
+        // exceeds maxMergableComponentSize and schedule a merge of those components into a new component.
+        // 2. If no merge was scheduled by 1, check at the last component whether the number of candidates in the
+        // current run has reached maxToleranceComponentCount. If so, schedule a merge of all of those candidates
+        // into a single new component.
+        List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+        for (ILSMComponent c : immutableComponents) {
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                // Some component is not READABLE_UNWRITABLE; schedule nothing on this call.
+                return;
+            }
+        }
+        if (fullMergeIsRequested) {
+            createAccessor(index).scheduleFullMerge(index.getIOOperationCallback());
+            return;
+        }
+        long totalSize = 0;
+        // Index of the most recently seen component that is too large to merge; candidates start right after it.
+        int startIndex = -1;
+        for (int i = 0; i < immutableComponents.size(); i++) {
+            ILSMComponent c = immutableComponents.get(i);
+            long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize();
+            if (componentSize > maxMergableComponentSize) {
+                // An oversized component is skipped and restarts the candidate run.
+                startIndex = i;
+                totalSize = 0;
+                continue;
+            }
+            totalSize += componentSize;
+            boolean isLastComponent = i + 1 == immutableComponents.size();
+            if (totalSize > maxMergableComponentSize
+                    || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) {
+                // Copy the run (startIndex, i] so the merge request does not alias the index's own list.
+                List<ILSMComponent> mergableComponents = new ArrayList<ILSMComponent>(immutableComponents.subList(
+                        startIndex + 1, i + 1));
+                createAccessor(index).scheduleMerge(index.getIOOperationCallback(), mergableComponents);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Reads both limits from {@code properties}.
+     *
+     * @throws NumberFormatException
+     *             if a property is missing or is not a valid number
+     */
+    @Override
+    public void configure(Map<String, String> properties) {
+        maxMergableComponentSize = Long.parseLong(requireProperty(properties, MAX_MERGABLE_COMPONENT_SIZE_KEY));
+        maxToleranceComponentCount = Integer.parseInt(requireProperty(properties, MAX_TOLERANCE_COMPONENT_COUNT_KEY));
+    }
+
+    /** Creates an accessor on {@code index} with no-op operation callbacks. */
+    private static ILSMIndexAccessor createAccessor(ILSMIndex index) throws HyracksDataException, IndexException {
+        return (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                NoOpOperationCallback.INSTANCE);
+    }
+
+    /** Looks up {@code key}; an absent value fails with the exception type the numeric parsers already use. */
+    private static String requireProperty(Map<String, String> properties, String key) {
+        String value = properties.get(key);
+        if (value == null) {
+            throw new NumberFormatException("Missing merge policy property: " + key);
+        }
+        return value;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
new file mode 100644
index 0000000..981ec6c
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+/**
+ * Factory for {@link PrefixMergePolicy}; {@link #getName()} returns {@code "prefix"}.
+ */
+public class PrefixMergePolicyFactory implements ILSMMergePolicyFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String[] SET_VALUES = new String[] { "max-mergable-component-size",
+            "max-tolernace-component-count" };
+    // Read-only view: the set is shared by every caller, so it must not be modifiable through the getter.
+    private static final Set<String> PROPERTIES_NAMES = Collections.unmodifiableSet(new HashSet<String>(
+            Arrays.asList(SET_VALUES)));
+
+    /**
+     * Creates a {@link PrefixMergePolicy} and configures it with {@code properties}.
+     */
+    @Override
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties) {
+        ILSMMergePolicy policy = new PrefixMergePolicy();
+        policy.configure(properties);
+        return policy;
+    }
+
+    /** @return the policy name, {@code "prefix"} */
+    @Override
+    public String getName() {
+        return "prefix";
+    }
+
+    /** @return an unmodifiable set holding the two property names this policy reads */
+    @Override
+    public Set<String> getPropertiesNames() {
+        return PROPERTIES_NAMES;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java
new file mode 100644
index 0000000..22e7505
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMIndexCompactOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+public class LSMInvertedIndexCompactOperator extends AbstractLSMInvertedIndexOperatorDescriptor { // Operator descriptor whose runtime is an LSMIndexCompactOperatorNodePushable.
+
+ private static final long serialVersionUID = 1L;
+ /** Forwards the index configuration to the superclass, filling the remaining slots with fixed values (see the super call). */
+ public LSMInvertedIndexCompactOperator(IOperatorDescriptorRegistry spec, IStorageManagerInterface storageManager,
+ IFileSplitProvider fileSplitProvider, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+ ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenComparatorFactories,
+ ITypeTraits[] invListsTypeTraits, IBinaryComparatorFactory[] invListComparatorFactories,
+ IBinaryTokenizerFactory tokenizerFactory, IIndexDataflowHelperFactory dataflowHelperFactory,
+ IModificationOperationCallbackFactory modificationOpCallbackFactory) {
+ super(spec, 1, 1, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits, // NOTE(review): "1, 1, null" presumably are input arity, output arity and record descriptor -- confirm against the superclass constructor
+ tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
+ dataflowHelperFactory, null, false, NoOpLocalResourceFactoryProvider.INSTANCE, // NOTE(review): meaning of the "null, false" pair is defined by the superclass constructor -- confirm
+ NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
+ }
+ /** Creates the compact runtime for one partition; recordDescProvider and nPartitions are not used here. */
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new LSMIndexCompactOperatorNodePushable(this, ctx, partition);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
index 84c7150..c018f16 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
@@ -15,12 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
private static final long serialVersionUID = 1L;
public LSMInvertedIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerProvider,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider,
- ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, // the merge policy is described by a factory plus the properties later passed to createMergePolicy()
+ ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerProvider, // every argument is forwarded unchanged to the superclass constructor
+ ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
}
@Override
@@ -42,8 +44,8 @@
int partition) {
return new LSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackFactory);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
index d78ae7e..aef0863 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
@@ -15,12 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
private static final long serialVersionUID = 1L;
public PartitionedLSMInvertedIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerProvider,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider,
- ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, // the merge policy is described by a factory plus the properties later passed to createMergePolicy()
+ ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerProvider, // every argument is forwarded unchanged to the superclass constructor
+ ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
}
@Override
@@ -42,7 +44,7 @@
int partition) {
return new PartitionedLSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackFactory);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index cef6aee..0600754 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -267,10 +267,10 @@
public void getOperationalComponents(ILSMIndexOperationContext ctx) {
List<ILSMComponent> immutableComponents = diskComponents;
List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
- operationalComponents.clear();
int cmc = currentMutableComponentId.get();
ctx.setCurrentMutableComponentId(cmc);
int numMutableComponents = memoryComponents.size();
+ operationalComponents.clear();
switch (ctx.getOperation()) {
case FLUSH:
case DELETE:
@@ -291,8 +291,11 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- operationalComponents.addAll(immutableComponents);
+ operationalComponents.addAll(ctx.getComponentsToBeMerged()); // only the components stored in the context by scheduleMerge()
break;
+ case FULL_MERGE:
+ operationalComponents.addAll(immutableComponents); // a full merge operates on every disk component
+ break; // required: otherwise control falls through to default and FULL_MERGE always throws
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
@@ -561,6 +564,31 @@
mergeOp.getBloomFilterMergeTarget(), true);
IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
+
+ // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the inverted indexes so that
+ // lsmHarness.endSearch() is called once when the inverted indexes have been merged.
+ if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
+ .get(diskComponents.size() - 1)) {
+ // Keep the deleted tuples since the oldest disk component is not included in the merge operation
+
+ LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor = new LSMInvertedIndexDeletedKeysBTreeMergeCursor(
+ opCtx);
+ search(opCtx, btreeCursor, mergePred);
+
+ BTree btree = component.getDeletedKeysBTree();
+ IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
+ try {
+ while (btreeCursor.hasNext()) {
+ btreeCursor.next();
+ ITupleReference tuple = btreeCursor.getTuple();
+ btreeBulkLoader.add(tuple);
+ }
+ } finally {
+ btreeCursor.close();
+ }
+ btreeBulkLoader.end();
+ }
+
IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L, false);
try {
while (cursor.hasNext()) {
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index e31af9a..7e34dfe 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -84,12 +87,21 @@
}
@Override
- public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components) // components: the components this merge should operate on
+ throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.MERGE);
+ ctx.getComponentsToBeMerged().clear(); // drop whatever an earlier call left in the context
+ ctx.getComponentsToBeMerged().addAll(components); // copy the requested components into the context before handing it to the harness
lsmHarness.scheduleMerge(ctx, callback);
}
@Override
+ public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException { // Schedules a merge without naming components; unlike scheduleMerge(), nothing is stored in ctx.getComponentsToBeMerged().
+ ctx.setOperation(IndexOperation.FULL_MERGE); // marks the context so downstream code can distinguish a full merge from a MERGE
+ lsmHarness.scheduleFullMerge(ctx, callback);
+ }
+
+ @Override
public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
lsmHarness.merge(ctx, operation);
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
new file mode 100644
index 0000000..3efaaba
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+
+public class LSMInvertedIndexDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor { // Cursor that scans the deleted-keys BTrees of the operational components through one range cursor per component.
+
+ public LSMInvertedIndexDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
+ super(opCtx, true); // NOTE(review): the meaning of the boolean flag is defined by LSMIndexSearchCursor -- confirm there
+ }
+ /** Always reports false, so no element is treated as deleted by this cursor. */
+ @Override
+ protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+ return false;
+ }
+ /** Opens a range search on every deleted-keys BTree accessor of the initial state; searchPred is not used. */
+ @Override
+ public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException,
+ IndexException {
+ LSMInvertedIndexRangeSearchCursorInitialState lsmInitialState = (LSMInvertedIndexRangeSearchCursorInitialState) initialState; // fails with ClassCastException for any other initial-state type
+ cmp = lsmInitialState.getOriginalKeyComparator();
+ operationalComponents = lsmInitialState.getOperationalComponents();
+ // lsmHarness is intentionally set to null so that this cursor does not call lsmHarness.endSearch(); that call is already made when the inverted indexes are merged.
+ lsmHarness = null;
+ int numBTrees = operationalComponents.size(); // one deleted-keys BTree cursor per operational component
+ rangeCursors = new IIndexCursor[numBTrees];
+ // NOTE(review): the null low/high keys presumably make the predicate match every key -- confirm against RangePredicate.
+ MultiComparator keyCmp = lsmInitialState.getKeyComparator();
+ RangePredicate btreePredicate = new RangePredicate(null, null, true, true, keyCmp, keyCmp);
+ ArrayList<IIndexAccessor> btreeAccessors = lsmInitialState.getDeletedKeysBTreeAccessors(); // indexed in step with operationalComponents in the loop below
+ for (int i = 0; i < numBTrees; i++) {
+ rangeCursors[i] = btreeAccessors.get(i).createSearchCursor();
+ btreeAccessors.get(i).search(rangeCursors[i], btreePredicate); // the same predicate instance is shared by all cursors
+ }
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index 323edd1..446e807 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractDiskLSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
public class LSMInvertedIndexDiskComponent extends AbstractDiskLSMComponent {
@@ -53,4 +54,12 @@
public BloomFilter getBloomFilter() {
return bloomFilter;
}
+
+ @Override
+ public long getComponentSize() { // Returns the sum of the lengths reported for the four files backing this disk component.
+ return ((OnDiskInvertedIndex) invIndex).getInvListsFile().getFile().length() // inverted-lists file; invIndex must be an OnDiskInvertedIndex or the cast fails
+ + ((OnDiskInvertedIndex) invIndex).getBTree().getFileReference().getFile().length() // BTree of the on-disk inverted index
+ + deletedKeysBTree.getFileReference().getFile().length() // deleted-keys BTree
+ + bloomFilter.getFileReference().getFile().length(); // bloom filter
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index 1a9303f..671e3f8 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -35,7 +35,8 @@
private IndexOperation op;
private final List<ILSMComponent> componentHolder;
-
+ private final List<ILSMComponent> componentsToBeMerged;
+
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
@@ -54,6 +55,7 @@
IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback)
throws HyracksDataException {
this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
@@ -84,6 +86,7 @@
@Override
public void reset() {
componentHolder.clear();
+ componentsToBeMerged.clear();
}
@Override
@@ -118,4 +121,9 @@
currentMutableInvIndexAccessors = mutableInvIndexAccessors[currentMutableComponentId];
currentDeletedKeysBTreeAccessors = deletedKeysBTreeAccessors[currentMutableComponentId];
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() { // Exposes the context's merge list so callers can fill or read it.
+ return componentsToBeMerged; // the live list is returned, not a copy; reset() clears it
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index cd0dde3..a6ff07e 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -28,8 +28,8 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingTupleReference;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BloomFilterAwareBTreePointSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
@@ -43,7 +43,7 @@
protected RangePredicate keySearchPred;
public LSMInvertedIndexRangeSearchCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx);
+ super(opCtx, false); // NOTE(review): the meaning of the boolean flag is defined by LSMIndexSearchCursor -- confirm there
}
@Override
@@ -76,12 +76,12 @@
for (int i = 0; i < operationalComponents.size(); i++) {
ILSMComponent component = operationalComponents.get(i);
if (component.getType() == LSMComponentType.MEMORY) {
- // No need for a bloom filter for the in-memory BTree.
+ // No need for a bloom filter for the in-memory BTree.
deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor();
} else {
- deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor((IBTreeLeafFrame) lsmInitState
- .getgetDeletedKeysBTreeLeafFrameFactory().createFrame(), false,
- ((LSMInvertedIndexDiskComponent) operationalComponents.get(i)).getBloomFilter());
+ deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor(
+ (IBTreeLeafFrame) lsmInitState.getgetDeletedKeysBTreeLeafFrameFactory().createFrame(),
+ false, ((LSMInvertedIndexDiskComponent) operationalComponents.get(i)).getBloomFilter());
}
}
}
@@ -114,50 +114,4 @@
}
return false;
}
-
- @Override
- protected void checkPriorityQueue() throws HyracksDataException, IndexException {
- while (!outputPriorityQueue.isEmpty() || needPush == true) {
- if (!outputPriorityQueue.isEmpty()) {
- PriorityQueueElement checkElement = outputPriorityQueue.peek();
- // If there is no previous tuple or the previous tuple can be ignored
- if (outputElement == null) {
- if (isDeleted(checkElement)) {
- // If the key has been deleted then pop it and set needPush to true.
- // We cannot push immediately because the tuple may be
- // modified if hasNext() is called
- outputElement = outputPriorityQueue.poll();
- needPush = true;
- } else {
- break;
- }
- } else {
- // Compare the previous tuple and the head tuple in the PQ
- if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
- // If the previous tuple and the head tuple are
- // identical
- // then pop the head tuple and push the next tuple from
- // the tree of head tuple
-
- // the head element of PQ is useless now
- PriorityQueueElement e = outputPriorityQueue.poll();
- pushIntoPriorityQueue(e);
- } else {
- // If the previous tuple and the head tuple are different
- // the info of previous tuple is useless
- if (needPush == true) {
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- }
- outputElement = null;
- }
- }
- } else {
- // the priority queue is empty and needPush
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- outputElement = null;
- }
- }
- }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 0ab2f4d..67d1796 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -462,6 +462,10 @@
public BTree getBTree() {
return btree;
}
+
+ public FileReference getInvListsFile() { // Accessor for the invListsFile field.
+ return invListsFile;
+ }
public class OnDiskInvertedIndexAccessor implements IInvertedIndexAccessor {
private final OnDiskInvertedIndex index;
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
index 8254689..7b54f19 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -23,7 +25,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -40,12 +42,12 @@
public LSMRTreeDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
- IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyProvider mergePolicyProvider,
- ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
- ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILinearizeComparatorFactory linearizeCmpFactory,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerFactory, ioSchedulerProvider,
- ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+ IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, ILSMOperationTrackerProvider opTrackerFactory,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+ ILinearizeComparatorFactory linearizeCmpFactory, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory,
+ ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
this.btreeComparatorFactories = btreeComparatorFactories;
this.valueProviderFactories = valueProviderFactories;
this.rtreePolicyType = rtreePolicyType;
@@ -58,7 +60,7 @@
return new LSMRTreeDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
btreeComparatorFactories, valueProviderFactories, rtreePolicyType,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackFactory, linearizeCmpFactory);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
index ef34876..2e1cfaa 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
@@ -15,47 +15,42 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
-public class LSMRTreeWithAntiMatterTuplesDataflowHelperFactory implements IIndexDataflowHelperFactory {
+public class LSMRTreeWithAntiMatterTuplesDataflowHelperFactory extends AbstractLSMIndexDataflowHelperFactory {
private static final long serialVersionUID = 1L;
- private final IVirtualBufferCacheProvider virtualBufferCacheProvider;
private final IBinaryComparatorFactory[] btreeComparatorFactories;
private final IPrimitiveValueProviderFactory[] valueProviderFactories;
private final RTreePolicyType rtreePolicyType;
- private final ILSMMergePolicyProvider mergePolicyProvider;
- private final ILSMOperationTrackerProvider opTrackerProvider;
- private final ILSMIOOperationSchedulerProvider ioSchedulerProvider;
- private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
private final ILinearizeComparatorFactory linearizeCmpFactory;
public LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
- IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyProvider mergePolicyProvider,
- ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
- ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILinearizeComparatorFactory linearizeCmpFactory) {
- this.virtualBufferCacheProvider = virtualBufferCacheProvider;
+ IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, ILSMOperationTrackerProvider opTrackerFactory,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+ ILinearizeComparatorFactory linearizeCmpFactory) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory, ioSchedulerProvider,
+ ioOpCallbackFactory, 1.0);
this.btreeComparatorFactories = btreeComparatorFactories;
this.valueProviderFactories = valueProviderFactories;
this.rtreePolicyType = rtreePolicyType;
- this.mergePolicyProvider = mergePolicyProvider;
- this.ioSchedulerProvider = ioSchedulerProvider;
- this.opTrackerProvider = opTrackerProvider;
- this.ioOpCallbackFactory = ioOpCallbackFactory;
this.linearizeCmpFactory = linearizeCmpFactory;
}
@@ -64,7 +59,7 @@
int partition) {
return new LSMRTreeWithAntiMatterTuplesDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), btreeComparatorFactories,
- valueProviderFactories, rtreePolicyType, mergePolicyProvider.getMergePolicy(ctx), opTrackerProvider,
- ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
+ valueProviderFactories, rtreePolicyType, mergePolicyFactory.createMergePolicy(mergePolicyProperties),
+ opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index b09e115..673c7ae 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -24,18 +24,15 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -200,11 +197,11 @@
@Override
public void getOperationalComponents(ILSMIndexOperationContext ctx) {
List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
- operationalComponents.clear();
List<ILSMComponent> immutableComponents = diskComponents;
int cmc = currentMutableComponentId.get();
ctx.setCurrentMutableComponentId(cmc);
int numMutableComponents = memoryComponents.size();
+ operationalComponents.clear();
switch (ctx.getOperation()) {
case INSERT:
case DELETE:
@@ -225,8 +222,12 @@
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
- operationalComponents.addAll(immutableComponents);
+ operationalComponents.addAll(ctx.getComponentsToBeMerged());
break;
+ case FULL_MERGE:
+ // A full merge operates on every disk component; break so the case does not fall through to default and throw.
+ operationalComponents.addAll(immutableComponents);
+ break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
@@ -332,38 +333,11 @@
ctx.modificationCallback.before(tuple);
ctx.modificationCallback.found(null, tuple);
if (ctx.getOperation() == IndexOperation.INSERT) {
- // Before each insert, we must check whether there exist a killer
- // tuple in the memBTree. If we find a killer tuple, we must truly
- // delete the existing tuple from the BTree, and then insert it to
- // memRTree. Otherwise, the old killer tuple will kill the newly
- // added RTree tuple.
- RangePredicate btreeRangePredicate = new RangePredicate(tuple, tuple, true, true,
- ctx.getBTreeMultiComparator(), ctx.getBTreeMultiComparator());
- ITreeIndexCursor cursor = ctx.currentMutableBTreeAccessor.createSearchCursor();
- ctx.currentMutableBTreeAccessor.search(cursor, btreeRangePredicate);
- boolean foundTupleInMemoryBTree = false;
- try {
- if (cursor.hasNext()) {
- foundTupleInMemoryBTree = true;
- }
- } finally {
- cursor.close();
- }
- if (foundTupleInMemoryBTree) {
- try {
- ctx.currentMutableBTreeAccessor.delete(tuple);
- } catch (TreeIndexNonExistentKeyException e) {
- // Tuple has been deleted in the meantime. Do nothing.
- // This normally shouldn't happen if we are dealing with
- // good citizens since LSMRTree is used as a secondary
- // index and a tuple shouldn't be deleted twice without
- // insert between them.
- }
- } else {
- ctx.currentMutableRTreeAccessor.insert(tuple);
- }
-
+ ctx.currentMutableRTreeAccessor.insert(tuple);
} else {
+ // First remove all entries in the in-memory rtree (if any).
+ ctx.currentMutableRTreeAccessor.delete(tuple);
+ // Insert key into the deleted-keys BTree.
try {
ctx.currentMutableBTreeAccessor.insert(tuple);
} catch (TreeIndexDuplicateKeyException e) {
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index a4b6139..662aa02 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -309,8 +309,31 @@
LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
- IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
+ // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that
+ // lsmHarness.endSearch() is called once when the r-trees have been merged.
+ if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
+ .get(diskComponents.size() - 1)) {
+ // Keep the deleted tuples since the oldest disk component is not included in the merge operation
+
+ LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
+ search(opCtx, btreeCursor, rtreeSearchPred);
+
+ BTree btree = mergedComponent.getBTree();
+ IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
+ try {
+ while (btreeCursor.hasNext()) {
+ btreeCursor.next();
+ ITupleReference tuple = btreeCursor.getTuple();
+ btreeBulkLoader.add(tuple);
+ }
+ } finally {
+ btreeCursor.close();
+ }
+ btreeBulkLoader.end();
+ }
+
+ IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
try {
while (cursor.hasNext()) {
cursor.next();
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
new file mode 100644
index 0000000..fe2ed96
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+/** Merge-time cursor over the deleted-keys BTrees of the operational LSM R-tree components; open() starts one BTree range cursor per component. */
+public class LSMRTreeDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
+ // NOTE(review): the meaning of the 'true' flag is defined by LSMIndexSearchCursor, which is not visible here; presumably "return deleted tuples" - confirm.
+ public LSMRTreeDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
+ super(opCtx, true);
+ }
+ /** Always reports the given element as not deleted. */
+ @Override
+ protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+ return false;
+ }
+ /** Opens a range cursor on the BTree of every operational component and then initializes the priority queue; initialState is cast to LSMRTreeCursorInitialState and each component to LSMRTreeDiskComponent. */
+ @Override
+ public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException,
+ IndexException {
+ LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
+ cmp = lsmInitialState.getBTreeCmp();
+ operationalComponents = lsmInitialState.getOperationalComponents();
+ // lsmHarness is intentionally set to null so that this cursor does not call lsmHarness.endSearch(); that call is already made when the r-trees are merged.
+ lsmHarness = null;
+ int numBTrees = operationalComponents.size();
+ rangeCursors = new IIndexCursor[numBTrees];
+ // searchPred is not consulted: every BTree is searched with a predicate whose low and high keys are both null.
+ RangePredicate btreePredicate = new RangePredicate(null, null, true, true, cmp, cmp);
+ IIndexAccessor[] btreeAccessors = new ITreeIndexAccessor[numBTrees];
+ for (int i = 0; i < numBTrees; i++) {
+ ILSMComponent component = operationalComponents.get(i);
+ IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory().createFrame();
+ rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
+ BTree btree = (BTree) ((LSMRTreeDiskComponent) component).getBTree();
+ btreeAccessors[i] = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ btreeAccessors[i].search(rangeCursors[i], btreePredicate);
+ }
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
index 7bc3f79..c3c0a55 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
@@ -54,4 +54,14 @@
public BloomFilter getBloomFilter() {
return bloomFilter;
}
+ /** Returns the length of the rtree file plus, when a btree is present, the lengths of the btree and bloom filter files. */
+ @Override
+ public long getComponentSize() {
+ long size = rtree.getFileReference().getFile().length();
+ if (btree != null) { // NOTE(review): bloomFilter is dereferenced whenever btree is non-null - confirm the two are always set together
+ size += btree.getFileReference().getFile().length();
+ size += bloomFilter.getFileReference().getFile().length();
+ }
+ return size;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index b94feba..132e55b 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -48,6 +48,7 @@
private IndexOperation op;
public final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
@@ -76,6 +77,7 @@
currentRTreeOpContext = rtreeOpContexts[0];
currentBTreeOpContext = btreeOpContexts[0];
this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
}
@@ -101,6 +103,7 @@
@Override
public void reset() {
componentHolder.clear();
+ componentsToBeMerged.clear();
}
@Override
@@ -126,4 +129,9 @@
public IModificationOperationCallback getModificationCallback() {
return modificationCallback;
}
+ /** Returns the componentsToBeMerged list itself (not a copy); reset() clears it. */
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index 30dd467..f669585 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -79,7 +79,7 @@
ITupleReference currentTuple = rtreeCursors[currentCursor].getTuple();
boolean killerTupleFound = false;
- for (int i = 0; i <= currentCursor; i++) {
+ for (int i = 0; i < currentCursor; i++) {
btreeCursors[i].reset();
btreeRangePredicate.setHighKey(currentTuple, true);
btreeRangePredicate.setLowKey(currentTuple, true);
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index dd31165..2e6fc78 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -25,6 +25,8 @@
public class LSMRTreeSortedCursor extends LSMRTreeAbstractCursor {
+ // TODO: This class could be removed in favor of a search cursor that uses logic similar
+ // to the one in LSMRTreeWithAntiMatterTuplesSearchCursor.
private ILinearizeComparator linearizeCmp;
private boolean[] depletedRtreeCursors;
private int foundIn = -1;
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 812e942..ec66a39a 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -40,7 +40,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
@@ -76,9 +75,9 @@
ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallback ioOpCallback) {
super(virtualBufferCaches, rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory,
- btreeLeafFrameFactory, fileManager, new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(diskRTreeFactory),
- diskFileMapProvider, fieldCount, rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields,
- linearizerArray, 0, mergePolicy, opTracker, ioScheduler, ioOpCallback);
+ btreeLeafFrameFactory, fileManager, new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(
+ diskRTreeFactory), diskFileMapProvider, fieldCount, rtreeCmpFactories, btreeCmpFactories,
+ linearizer, comparatorFields, linearizerArray, 0, mergePolicy, opTracker, ioScheduler, ioOpCallback);
bulkLoaComponentFactory = new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(bulkLoadRTreeFactory);
this.bTreeTupleSorter = null;
}
@@ -248,7 +247,12 @@
LSMRTreeOpContext rctx = createOpContext(NoOpOperationCallback.INSTANCE);
rctx.setOperation(IndexOperation.MERGE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
- ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(rctx);
+ boolean returnDeletedTuples = false;
+ if (ctx.getComponentHolder().get(ctx.getComponentHolder().size() - 1) != diskComponents.get(diskComponents
+ .size() - 1)) {
+ returnDeletedTuples = true;
+ }
+ ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(rctx, returnDeletedTuples);
LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, rctx);
ioScheduler.scheduleOperation(new LSMRTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index cbaf3b3..7099d7d 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -54,7 +54,11 @@
private int numMutableComponents;
public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx) {
- super(opCtx);
+ this(opCtx, false);
+ }
+ /** Forwards returnDeletedTuples to the superclass constructor; the one-argument constructor delegates here with false. */
+ public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+ super(opCtx, returnDeletedTuples);
currentCursor = 0;
}
@@ -152,7 +156,7 @@
while (super.hasNext()) {
super.next();
ITupleReference diskRTreeTuple = super.getTuple();
- if (searchMemBTrees(diskRTreeTuple, numMutableComponents - 1)) {
+ if (searchMemBTrees(diskRTreeTuple, numMutableComponents)) {
foundNext = true;
frameTuple = diskRTreeTuple;
return true;
@@ -216,7 +220,7 @@
private boolean searchMemBTrees(ITupleReference tuple, int lastBTreeToSearch) throws HyracksDataException,
IndexException {
- for (int i = 0; i <= lastBTreeToSearch; i++) {
+ for (int i = 0; i < lastBTreeToSearch; i++) {
btreeCursors[i].reset();
btreeRangePredicate.setHighKey(tuple, true);
btreeRangePredicate.setLowKey(tuple, true);
@@ -261,50 +265,4 @@
}
}
}
-
- @Override
- protected void checkPriorityQueue() throws HyracksDataException, IndexException {
- while (!outputPriorityQueue.isEmpty() || needPush == true) {
- if (!outputPriorityQueue.isEmpty()) {
- PriorityQueueElement checkElement = outputPriorityQueue.peek();
- // If there is no previous tuple or the previous tuple can be ignored
- if (outputElement == null) {
- if (isDeleted(checkElement)) {
- // If the key has been deleted then pop it and set needPush to true.
- // We cannot push immediately because the tuple may be
- // modified if hasNext() is called
- outputElement = outputPriorityQueue.poll();
- needPush = true;
- } else {
- break;
- }
- } else {
- // Compare the previous tuple and the head tuple in the PQ
- if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
- // If the previous tuple and the head tuple are
- // identical
- // then pop the head tuple and push the next tuple from
- // the tree of head tuple
-
- // the head element of PQ is useless now
- PriorityQueueElement e = outputPriorityQueue.poll();
- pushIntoPriorityQueue(e);
- } else {
- // If the previous tuple and the head tuple are different
- // the info of previous tuple is useless
- if (needPush == true) {
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- }
- outputElement = null;
- }
- }
- } else {
- // the priority queue is empty and needPush
- pushIntoPriorityQueue(outputElement);
- needPush = false;
- outputElement = null;
- }
- }
- }
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 8090564..2992dfe 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -119,7 +119,8 @@
@Override
public ICachedPage tryPin(long dpid) throws HyracksDataException {
- pinSanityCheck(dpid);
+ // pinSanityCheck is disabled here and should be enabled only for debugging: its synchronized block over fileInfoMap is a hot spot.
+ //pinSanityCheck(dpid);
CachedPage cPage = null;
int hash = hash(dpid);
CacheBucket bucket = pageMap[hash];
@@ -142,7 +143,8 @@
@Override
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
- pinSanityCheck(dpid);
+ // pinSanityCheck is disabled here and should be enabled only for debugging: its synchronized block over fileInfoMap is a hot spot.
+ //pinSanityCheck(dpid);
CachedPage cPage = findPage(dpid, newPage);
if (!newPage) {
// Resolve race of multiple threads trying to read the page from
@@ -760,8 +762,8 @@
BufferedFileHandle fInfo = null;
synchronized (fileInfoMap) {
fInfo = fileInfoMap.get(fileId);
- ioManager.sync(fInfo.getFileHandle(), metadata);
}
+ ioManager.sync(fInfo.getFileHandle(), metadata);
}
@Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index f7aa7f4..9729d51 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.storage.am.btree.OrderedIndexTestUtils;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
@@ -70,7 +71,8 @@
}
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((LSMBTree) ctx.getIndex()).getImmutableComponents());
orderedIndexTestUtils.checkPointSearches(ctx);
orderedIndexTestUtils.checkScan(ctx);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 4a96131..35ecc20 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -106,7 +106,7 @@
break;
case MERGE:
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, lsmBTree.getImmutableComponents());
break;
default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
index 5115b7e..3713498 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
@@ -54,7 +55,8 @@
invIndex.activate();
}
// Perform merge.
- invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((LSMInvertedIndex) invIndex).getImmutableComponents());
validateAndCheckIndex(testCtx);
runTinySearchWorkload(testCtx, tupleGen);
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
index 523557d..53076fa 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
@@ -54,7 +55,8 @@
invIndex.activate();
}
// Perform merge.
- invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((LSMInvertedIndex) invIndex).getImmutableComponents());
validateAndCheckIndex(testCtx);
runTinySearchWorkload(testCtx, tupleGen);
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
index e1570af..35afd09 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
@@ -109,7 +109,7 @@
}
case MERGE: {
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, invIndex.getImmutableComponents());
break;
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index 18528c4..a410b2a 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestDriver;
import edu.uci.ics.hyracks.storage.am.rtree.RTreeTestUtils;
@@ -72,7 +73,8 @@
}
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((AbstractLSMRTree) ctx.getIndex()).getImmutableComponents());
rTreeTestUtils.checkScan(ctx);
rTreeTestUtils.checkDiskOrderScan(ctx);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
index b3fddc8..c7a86d9 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
@@ -73,7 +73,7 @@
break;
case MERGE:
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, lsmRTree.getImmutableComponents());
break;
default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
index 12d7742..383cbc4 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples.LSMRTreeWithAntiMatterTuplesAccessor;
import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
@@ -61,7 +62,8 @@
break;
case MERGE:
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+ ((AbstractLSMRTree) lsmRTree).getImmutableComponents());
break;
default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
index 7e71aea..c67386e 100644
--- a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
@@ -96,8 +96,9 @@
}
bufferCache.closeFile(fileId);
-
- boolean exceptionThrown = false;
+
+ // This code is commented out because the pinSanityCheck method in BufferCache is itself commented out.
+ /*boolean exceptionThrown = false;
// tryPin should fail since file is not open
try {
@@ -114,7 +115,7 @@
} catch (HyracksDataException e) {
exceptionThrown = true;
}
- Assert.assertTrue(exceptionThrown);
+ Assert.assertTrue(exceptionThrown);*/
// open file again
bufferCache.openFile(fileId);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 4863378..a577bdb 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -25,7 +25,9 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.logging.Logger;
@@ -74,7 +76,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
@@ -134,6 +136,12 @@
protected int frameSize = ClusterConfig.getFrameSize();
protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
+
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
@@ -480,9 +488,9 @@
protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
if (BspUtils.useLSM(conf)) {
- return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
- 3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
- NoOpIOOperationCallback.INSTANCE, 0.01);
+ return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(),
+ new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES, NoOpOperationTrackerProvider.INSTANCE,
+ SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, 0.01);
} else {
return new BTreeDataflowHelperFactory();
}
@@ -739,8 +747,7 @@
* connect operator descriptors
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
- sort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);