support variable frame size in btree loading
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index ce1a34d..8f0019e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -173,6 +173,7 @@
new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeCreate);
+ spec.setFrameSize(frameSize);
return spec;
}
@@ -242,6 +243,7 @@
new WritableSerializerDeserializerFactory(vertexIdClass));
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+ spec.setFrameSize(frameSize);
return spec;
}
@@ -319,6 +321,7 @@
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
comparatorFactories), sorter, 0, writer, 0);
+ spec.setFrameSize(frameSize);
return spec;
}
@@ -444,6 +447,7 @@
*/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+ spec.setFrameSize(frameSize);
return spec;
}
@@ -462,6 +466,7 @@
ClusterConfig.setLocationConstraint(spec, drop);
spec.addRoot(drop);
+ spec.setFrameSize(frameSize);
return spec;
}