Allowed user to specify the merging policy
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 4863378..a577bdb 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -25,7 +25,9 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.logging.Logger;
@@ -74,7 +76,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
@@ -134,6 +136,12 @@
protected int frameSize = ClusterConfig.getFrameSize();
protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+ static {
+ MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+ MERGE_POLICY_PROPERTIES.put("num-components", "3");
+ }
+
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
@@ -480,9 +488,9 @@
protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
if (BspUtils.useLSM(conf)) {
- return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
- 3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
- NoOpIOOperationCallback.INSTANCE, 0.01);
+ return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(),
+ new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES, NoOpOperationTrackerProvider.INSTANCE,
+ SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, 0.01);
} else {
return new BTreeDataflowHelperFactory();
}
@@ -739,8 +747,7 @@
* connect operator descriptors
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
- sort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);