Allowed user to specify the merging policy
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index 1bd22d7..5ac2961 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -19,6 +19,8 @@
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import org.junit.Before;
import org.junit.Test;
@@ -83,7 +85,11 @@
TestStorageManagerComponentHolder.init(8192, 20, 20);
}
- protected static final int MERGE_THRESHOLD = 3;
+ protected static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
protected IVirtualBufferCacheProvider virtualBufferCacheProvider = new TestVirtualBufferCacheProvider(
DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
index fc932a3..8addae7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
@@ -26,7 +26,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -52,7 +52,7 @@
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
invertedIndexDataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
- virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
+ virtualBufferCacheProvider, new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
NoOpIOOperationCallback.INSTANCE, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
index 75045b4..82de0c9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
@@ -24,7 +24,7 @@
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -47,9 +47,9 @@
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(virtualBufferCacheProvider,
- new ConstantMergePolicyProvider(MERGE_THRESHOLD), ThreadCountingOperationTrackerProvider.INSTANCE,
- SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE,
- DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
+ new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
+ ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
+ NoOpIOOperationCallback.INSTANCE, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
@Override
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index e98ecde..a4fd4c1 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -15,10 +15,13 @@
package edu.uci.ics.hyracks.tests.am.lsm.btree;
+import java.util.HashMap;
+import java.util.Map;
+
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -26,15 +29,19 @@
public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
- private static final int MERGE_THRESHOLD = 3;
-
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
+
public LSMBTreeOperatorTestHelper(IOManager ioManager) {
super(ioManager);
}
public IIndexDataflowHelperFactory createDataFlowHelperFactory() {
- return new LSMBTreeDataflowHelperFactory(virtualBufferCacheProvider, new ConstantMergePolicyProvider(
- MERGE_THRESHOLD), ThreadCountingOperationTrackerProvider.INSTANCE,
+ return new LSMBTreeDataflowHelperFactory(virtualBufferCacheProvider, new ConstantMergePolicyFactory(),
+ MERGE_POLICY_PROPERTIES, ThreadCountingOperationTrackerProvider.INSTANCE,
SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE,
DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
index e7478d6..79b8eb7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.tests.am.lsm.rtree;
+import java.util.HashMap;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -30,7 +33,11 @@
public class LSMRTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
- private static final int MERGE_THRESHOLD = 3;
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
public LSMRTreeOperatorTestHelper(IOManager ioManager) {
super(ioManager);
@@ -40,7 +47,7 @@
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
return new LSMRTreeDataflowHelperFactory(valueProviderFactories, rtreePolicyType, btreeComparatorFactories,
- virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
+ virtualBufferCacheProvider, new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
index f883d90..fd6171a 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
@@ -15,12 +15,15 @@
package edu.uci.ics.hyracks.tests.am.lsm.rtree;
+import java.util.HashMap;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -30,7 +33,11 @@
public class LSMRTreeWithAntiMatterTuplesOperatorTestHelper extends LSMTreeOperatorTestHelper {
- private static final int MERGE_THRESHOLD = 3;
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
public LSMRTreeWithAntiMatterTuplesOperatorTestHelper(IOManager ioManager) {
super(ioManager);
@@ -40,9 +47,8 @@
IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
return new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories, rtreePolicyType,
- btreeComparatorFactories, virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
- ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
- NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory);
+ btreeComparatorFactories, virtualBufferCacheProvider, new ConstantMergePolicyFactory(),
+ MERGE_POLICY_PROPERTIES, ThreadCountingOperationTrackerProvider.INSTANCE,
+ SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory);
}
-
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
index 35c9bcb..577fc2c 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
@@ -41,9 +41,9 @@
}
public LSMBTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- List<IVirtualBufferCache> virtualBufferCaches, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
- ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
- ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
+ List<IVirtualBufferCache> virtualBufferCaches, double bloomFilterFalsePositiveRate,
+ ILSMMergePolicy mergePolicy, ILSMOperationTrackerProvider opTrackerFactory,
+ ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
super(opDesc, ctx, partition, virtualBufferCaches, bloomFilterFalsePositiveRate, mergePolicy, opTrackerFactory,
ioScheduler, ioOpCallbackProvider);
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
index f4be88f..37ccda0 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
@@ -15,12 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
private static final long serialVersionUID = 1L;
public LSMBTreeDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerFactory,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackProvider ioOpCallbackProvider,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerFactory, ioSchedulerProvider,
- ioOpCallbackProvider, bloomFilterFalsePositiveRate);
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+ ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackProvider ioOpCallbackProvider, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory,
+ ioSchedulerProvider, ioOpCallbackProvider, bloomFilterFalsePositiveRate);
}
@Override
@@ -42,7 +44,7 @@
int partition) {
return new LSMBTreeDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackProvider);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackProvider);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index ebd7020..279605f 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -15,10 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public interface ILSMMergePolicy {
public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
IndexException;
+
+ public void configure(Map<String, String> properties);
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
similarity index 73%
rename from hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
rename to hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
index cf56750..c90b5f7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
@@ -15,9 +15,13 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+public interface ILSMMergePolicyFactory extends Serializable {
+ public ILSMMergePolicy createMergePolicy(Map<String, String> configuration);
-public interface ILSMMergePolicyProvider extends Serializable {
- public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx);
-}
+ public String getName();
+
+ public Set<String> getPropertiesNames();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
index 205bf27..8b56be6 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
@@ -15,10 +15,12 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
@@ -26,21 +28,23 @@
protected static final long serialVersionUID = 1L;
protected final IVirtualBufferCacheProvider virtualBufferCacheProvider;
- protected final ILSMMergePolicyProvider mergePolicyProvider;
+ protected final ILSMMergePolicyFactory mergePolicyFactory;
+ protected final Map<String, String> mergePolicyProperties;
protected final ILSMOperationTrackerProvider opTrackerFactory;
protected final ILSMIOOperationSchedulerProvider ioSchedulerProvider;
protected final ILSMIOOperationCallbackProvider ioOpCallbackProvider;
protected final double bloomFilterFalsePositiveRate;
public AbstractLSMIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerFactory,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackProvider ioOpCallbackProvider,
- double bloomFilterFalsePositiveRate) {
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+ ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackProvider ioOpCallbackProvider, double bloomFilterFalsePositiveRate) {
this.virtualBufferCacheProvider = virtualBufferCacheProvider;
- this.mergePolicyProvider = mergePolicyProvider;
+ this.mergePolicyFactory = mergePolicyFactory;
this.opTrackerFactory = opTrackerFactory;
this.ioSchedulerProvider = ioSchedulerProvider;
this.ioOpCallbackProvider = ioOpCallbackProvider;
this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
+ this.mergePolicyProperties = mergePolicyProperties;
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index c90e093..e49432c 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -16,6 +16,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
import java.util.List;
+import java.util.Map;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
@@ -27,12 +28,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
public class ConstantMergePolicy implements ILSMMergePolicy {
-
- private final int threshold;
-
- public ConstantMergePolicy(int threshold) {
- this.threshold = threshold;
- }
+ private int numComponents;
@Override
public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
@@ -47,10 +43,15 @@
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleFullMerge(NoOpIOOperationCallback.INSTANCE);
- } else if (immutableComponents.size() >= threshold) {
+ } else if (immutableComponents.size() >= numComponents) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, immutableComponents);
}
}
+
+ @Override
+ public void configure(Map<String, String> properties) {
+ numComponents = Integer.parseInt(properties.get("num-components"));
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
new file mode 100644
index 0000000..13f5ad9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+public class ConstantMergePolicyFactory implements ILSMMergePolicyFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String[] SET_VALUES = new String[] { "num-components" };
+ private static final Set<String> PROPERTIES_NAMES = new HashSet<String>(Arrays.asList(SET_VALUES));
+
+ @Override
+ public ILSMMergePolicy createMergePolicy(Map<String, String> properties) {
+ ILSMMergePolicy policy = new ConstantMergePolicy();
+ policy.configure(properties);
+ return policy;
+ }
+
+ @Override
+ public String getName() {
+ return "constant";
+ }
+
+ @Override
+ public Set<String> getPropertiesNames() {
+ return PROPERTIES_NAMES;
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
deleted file mode 100644
index a7383c1..0000000
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
-
-public class ConstantMergePolicyProvider implements ILSMMergePolicyProvider {
-
- private static final long serialVersionUID = 1L;
-
- private final int threshold;
-
- public ConstantMergePolicyProvider(int threshold) {
- this.threshold = threshold;
- }
-
- @Override
- public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx) {
- return new ConstantMergePolicy(threshold);
- }
-
-}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 02bae37..ca22268 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.Map;
+
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -25,4 +27,8 @@
// Do nothing
}
+ @Override
+ public void configure(Map<String, String> properties) {
+ // Do nothing
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
new file mode 100644
index 0000000..65d80f6
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+
+public class PrefixMergePolicy implements ILSMMergePolicy {
+
+ private long maxMergableComponentSize;
+ private int maxTolernaceComponentCount;
+
+ @Override
+ public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+ IndexException {
+ // 1. Look at the candidate components for merging in oldest-first order. If one exists, identify the prefix of the sequence of
+ // all such components for which the sum of their sizes exceeds MaxMrgCompSz. Schedule a merge of those components into a new component.
+ // 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule
+ // a merge all of the current candidates into a new single component.
+ List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+ for (ILSMComponent c : immutableComponents) {
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ return;
+ }
+ }
+ if (fullMergeIsRequested) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFullMerge(NoOpIOOperationCallback.INSTANCE);
+ return;
+ }
+ long totalSize = 0;
+ int startIndex = -1;
+ for (int i = 0; i < immutableComponents.size(); i++) {
+ ILSMComponent c = immutableComponents.get(i);
+ long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize();
+ if (componentSize > maxMergableComponentSize) {
+ startIndex = i;
+ totalSize = 0;
+ continue;
+ }
+ totalSize += componentSize;
+ boolean isLastComponent = i + 1 == immutableComponents.size() ? true : false;
+ if (totalSize > maxMergableComponentSize
+ || (isLastComponent && i - startIndex >= maxTolernaceComponentCount)) {
+ List<ILSMComponent> mergableCopments = new ArrayList<ILSMComponent>();
+ for (int j = startIndex + 1; j <= i; j++) {
+ mergableCopments.add(immutableComponents.get(j));
+ }
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, mergableCopments);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void configure(Map<String, String> properties) {
+ maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
+ maxTolernaceComponentCount = Integer.parseInt(properties.get("max-tolernace-component-count"));
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
new file mode 100644
index 0000000..981ec6c
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+public class PrefixMergePolicyFactory implements ILSMMergePolicyFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String[] SET_VALUES = new String[] { "max-mergable-component-size",
+ "max-tolernace-component-count" };
+ private static final Set<String> PROPERTIES_NAMES = new HashSet<String>(Arrays.asList(SET_VALUES));
+
+ @Override
+ public ILSMMergePolicy createMergePolicy(Map<String, String> properties) {
+ ILSMMergePolicy policy = new PrefixMergePolicy();
+ policy.configure(properties);
+ return policy;
+ }
+
+ @Override
+ public String getName() {
+ return "prefix";
+ }
+
+ @Override
+ public Set<String> getPropertiesNames() {
+ return PROPERTIES_NAMES;
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
index 85aa312..aa86023 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
@@ -15,12 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
private static final long serialVersionUID = 1L;
public LSMInvertedIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerProvider,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackProvider ioOpCallbackProvider,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider,
- ioOpCallbackProvider, bloomFilterFalsePositiveRate);
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+ ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackProvider ioOpCallbackProvider, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerProvider,
+ ioSchedulerProvider, ioOpCallbackProvider, bloomFilterFalsePositiveRate);
}
@Override
@@ -42,8 +44,8 @@
int partition) {
return new LSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackProvider);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackProvider);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
index 63d99aa..49a4201 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
@@ -15,12 +15,14 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
private static final long serialVersionUID = 1L;
public PartitionedLSMInvertedIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
- ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerProvider,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackProvider ioOpCallbackProvider,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider,
- ioOpCallbackProvider, bloomFilterFalsePositiveRate);
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+ ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ILSMIOOperationCallbackProvider ioOpCallbackProvider, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerProvider,
+ ioSchedulerProvider, ioOpCallbackProvider, bloomFilterFalsePositiveRate);
}
@Override
@@ -42,7 +44,7 @@
int partition) {
return new PartitionedLSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackProvider);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackProvider);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
index 99b4219..d653a4a 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -23,7 +25,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -40,12 +42,12 @@
public LSMRTreeDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
- IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyProvider mergePolicyProvider,
- ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
- ILSMIOOperationCallbackProvider ioOpCallbackProvider, ILinearizeComparatorFactory linearizeCmpFactory,
- double bloomFilterFalsePositiveRate) {
- super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerFactory, ioSchedulerProvider,
- ioOpCallbackProvider, bloomFilterFalsePositiveRate);
+ IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, ILSMOperationTrackerProvider opTrackerFactory,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackProvider ioOpCallbackProvider,
+ ILinearizeComparatorFactory linearizeCmpFactory, double bloomFilterFalsePositiveRate) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory,
+ ioSchedulerProvider, ioOpCallbackProvider, bloomFilterFalsePositiveRate);
this.btreeComparatorFactories = btreeComparatorFactories;
this.valueProviderFactories = valueProviderFactories;
this.rtreePolicyType = rtreePolicyType;
@@ -58,7 +60,7 @@
return new LSMRTreeDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
btreeComparatorFactories, valueProviderFactories, rtreePolicyType,
- mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
- ioOpCallbackProvider, linearizeCmpFactory);
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+ ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackProvider, linearizeCmpFactory);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
index 00268b3..d473425 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
@@ -15,47 +15,42 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow;
+import java.util.Map;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
-public class LSMRTreeWithAntiMatterTuplesDataflowHelperFactory implements IIndexDataflowHelperFactory {
+public class LSMRTreeWithAntiMatterTuplesDataflowHelperFactory extends AbstractLSMIndexDataflowHelperFactory {
private static final long serialVersionUID = 1L;
- private final IVirtualBufferCacheProvider virtualBufferCacheProvider;
private final IBinaryComparatorFactory[] btreeComparatorFactories;
private final IPrimitiveValueProviderFactory[] valueProviderFactories;
private final RTreePolicyType rtreePolicyType;
- private final ILSMMergePolicyProvider mergePolicyProvider;
- private final ILSMOperationTrackerProvider opTrackerProvider;
- private final ILSMIOOperationSchedulerProvider ioSchedulerProvider;
- private final ILSMIOOperationCallbackProvider ioOpCallbackProvider;
private final ILinearizeComparatorFactory linearizeCmpFactory;
public LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
- IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyProvider mergePolicyProvider,
- ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
- ILSMIOOperationCallbackProvider ioOpCallbackProvider, ILinearizeComparatorFactory linearizeCmpFactory) {
- this.virtualBufferCacheProvider = virtualBufferCacheProvider;
+ IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, ILSMOperationTrackerProvider opTrackerFactory,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackProvider ioOpCallbackProvider,
+ ILinearizeComparatorFactory linearizeCmpFactory) {
+ super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, null, ioSchedulerProvider,
+ ioOpCallbackProvider, 1.0);
this.btreeComparatorFactories = btreeComparatorFactories;
this.valueProviderFactories = valueProviderFactories;
this.rtreePolicyType = rtreePolicyType;
- this.mergePolicyProvider = mergePolicyProvider;
- this.ioSchedulerProvider = ioSchedulerProvider;
- this.opTrackerProvider = opTrackerProvider;
- this.ioOpCallbackProvider = ioOpCallbackProvider;
this.linearizeCmpFactory = linearizeCmpFactory;
}
@@ -64,7 +59,7 @@
int partition) {
return new LSMRTreeWithAntiMatterTuplesDataflowHelper(opDesc, ctx, partition,
virtualBufferCacheProvider.getVirtualBufferCaches(ctx), btreeComparatorFactories,
- valueProviderFactories, rtreePolicyType, mergePolicyProvider.getMergePolicy(ctx), opTrackerProvider,
- ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackProvider, linearizeCmpFactory);
+ valueProviderFactories, rtreePolicyType, mergePolicyFactory.createMergePolicy(mergePolicyProperties),
+ opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackProvider, linearizeCmpFactory);
}
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 4863378..a577bdb 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -25,7 +25,9 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.logging.Logger;
@@ -74,7 +76,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
@@ -134,6 +136,12 @@
protected int frameSize = ClusterConfig.getFrameSize();
protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
+
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
@@ -480,9 +488,9 @@
protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
if (BspUtils.useLSM(conf)) {
- return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
- 3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
- NoOpIOOperationCallback.INSTANCE, 0.01);
+ return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(),
+ new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES, NoOpOperationTrackerProvider.INSTANCE,
+ SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, 0.01);
} else {
return new BTreeDataflowHelperFactory();
}
@@ -739,8 +747,7 @@
* connect operator descriptors
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
- sort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);