fix for issue #732
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index aa47041..961e527 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -44,7 +44,7 @@
}
public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema, IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(context.getFrameSize());
if (jobEventListenerFactory != null) {
spec.setJobletEventListenerFactory(jobEventListenerFactory);
}
diff --git a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 974dc4d..2b29448 100644
--- a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -101,6 +101,7 @@
private static final String PATH_ACTUAL = "rttest";
private static final String PATH_BASE = "src" + SEPARATOR + "test" + SEPARATOR + "resources";
private static final String PATH_EXPECTED = PATH_BASE + SEPARATOR + "results";
+ private static final int FRAME_SIZE = 32768;
private static final String[] DEFAULT_NODES = new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID };
@@ -123,7 +124,7 @@
@Test
public void etsAssignPrint() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
@@ -147,7 +148,7 @@
@Test
public void etsAssignWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
@@ -178,7 +179,7 @@
@Test
public void scanSelectWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
// the scanner
FileSplit[] intFileSplits = new FileSplit[1];
@@ -224,7 +225,7 @@
@Test
public void etsAssignProjectWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
@@ -261,7 +262,7 @@
@Test
public void scanLimitWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
// the scanner
FileSplit[] fileSplits = new FileSplit[1];
@@ -312,7 +313,7 @@
@Test
public void etsUnnestWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
@@ -342,7 +343,7 @@
@Test
public void scanAggregateWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
// the scanner
FileSplit[] fileSplits = new FileSplit[1];
@@ -394,7 +395,7 @@
@Test
public void scanSortGbySelectWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
// the scanner
FileSplit[] fileSplits = new FileSplit[1];
@@ -479,7 +480,7 @@
@Test
public void scanHashGbySelectWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
// the scanner
FileSplit[] fileSplits = new FileSplit[1];
@@ -548,7 +549,7 @@
@Test
public void etsUnnestRunningaggregateWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
@@ -587,7 +588,7 @@
@Test
public void etsAssignScriptWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
@@ -643,7 +644,7 @@
public void scanSplitWrite() throws Exception {
final int outputArity = 2;
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
String inputFileName = "data/tpch0.001/customer.tbl";
File inputFile = new File(inputFileName);
@@ -696,7 +697,7 @@
@Test
public void scanMicroSortWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
// the scanner
FileSplit[] fileSplits = new FileSplit[1];
@@ -744,7 +745,7 @@
@Test
public void etsAssignSubplanProjectWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
@@ -804,7 +805,7 @@
@Test
public void scanMicroSortGbySelectWrite() throws Exception {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
// the scanner
FileSplit[] fileSplits = new FileSplit[1];
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 0873582..1915a54 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -59,7 +59,7 @@
private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
- private int frameSize;
+ private int frameSize = 32768;
private int maxReattempts;
@@ -73,6 +73,8 @@
private transient int connectorIdCounter;
+ // This constructor uses the default frame size. It is for test purposes only.
+ // For other use cases, use the one which sets the frame size.
public JobSpecification() {
roots = new ArrayList<OperatorDescriptorId>();
resultSetIds = new ArrayList<ResultSetId>();
@@ -85,10 +87,14 @@
userConstraints = new HashSet<Constraint>();
operatorIdCounter = 0;
connectorIdCounter = 0;
- frameSize = 32768;
maxReattempts = 2;
useConnectorPolicyForScheduling = true;
}
+
+ public JobSpecification(int frameSize) {
+ this();
+ setFrameSize(frameSize);
+ }
@Override
public OperatorDescriptorId createOperatorDescriptorId(IOperatorDescriptor op) {
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
index cde561e..a18d496 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -53,7 +53,7 @@
// This example will insert tuples into the primary and secondary index using an insert pipeline
-public class InsertPipelineExample {
+public class InsertPipelineExample {
private static class Options {
@Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
public String host;
@@ -72,6 +72,9 @@
@Option(name = "-secondary-btreename", usage = "B-Tree file name of secondary index", required = true)
public String secondaryBTreeName;
+
+ @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+ public int frameSize = 32768;
}
public static void main(String[] args) throws Exception {
@@ -92,7 +95,7 @@
private static JobSpecification createJob(Options options) {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(options.frameSize);
String[] splitNCs = options.ncs.split(",");
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index a027570..1c1901d 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -51,7 +51,7 @@
// This example will load a primary index from randomly generated data
-public class PrimaryIndexBulkLoadExample {
+public class PrimaryIndexBulkLoadExample {
private static class Options {
@Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
public String host;
@@ -70,6 +70,9 @@
@Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
public int sbSize = 32768;
+
+ @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+ public int frameSize = 32768;
}
public static void main(String[] args) throws Exception {
@@ -90,7 +93,7 @@
private static JobSpecification createJob(Options options) {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(options.frameSize);
String[] splitNCs = options.ncs.split(",");
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index a36fbdd..01004e6f 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -63,6 +63,9 @@
@Option(name = "-btreename", usage = "B-Tree file name to search", required = true)
public String btreeName;
+
+ @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+ public int frameSize = 32768;
}
public static void main(String[] args) throws Exception {
@@ -83,7 +86,7 @@
private static JobSpecification createJob(Options options) throws HyracksDataException {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(options.frameSize);
String[] splitNCs = options.ncs.split(",");
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 805660f..4826808 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -65,6 +65,9 @@
@Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
public int sbSize = 32768;
+
+ @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+ public int frameSize = 32768;
}
public static void main(String[] args) throws Exception {
@@ -85,7 +88,7 @@
private static JobSpecification createJob(Options options) {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(options.frameSize);
String[] splitNCs = options.ncs.split(",");
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index fc5a37a..5ff5003 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -66,6 +66,9 @@
@Option(name = "-secondary-btreename", usage = "Secondary B-Tree file name to search", required = true)
public String secondaryBTreeName;
+
+ @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+ public int frameSize = 32768;
}
public static void main(String[] args) throws Exception {
@@ -86,7 +89,7 @@
private static JobSpecification createJob(Options options) throws HyracksDataException {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(options.frameSize);
String[] splitNCs = options.ncs.split(",");
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index be8e986..1629a1e 100644
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -83,8 +83,8 @@
@Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
public int htSize = 8191;
- @Option(name = "-frames-limit", usage = "Frame size (default: 32768)", required = false)
- public int framesLimit = 32768;
+ @Option(name = "-frame-size", usage = "Frame size (default: 32768)", required = false)
+ public int frameSize = 32768;
@Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 512)", required = false)
public int sbSize = 512;
@@ -114,7 +114,7 @@
for (int i = 0; i < 6; i++) {
long start = System.currentTimeMillis();
job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i),
- options.htSize, options.sbSize, options.framesLimit, options.sortOutput, options.algo,
+ options.htSize, options.sbSize, options.frameSize, options.sortOutput, options.algo,
options.outPlain);
System.out.print(i + "\t" + (System.currentTimeMillis() - start));
@@ -155,8 +155,8 @@
}
private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
- int framesLimit, boolean sortOutput, int alg, boolean outPlain) {
- JobSpecification spec = new JobSpecification();
+ int frameSize, boolean sortOutput, int alg, boolean outPlain) {
+ JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -198,7 +198,7 @@
switch (alg) {
case 0: // new external hash graph
grouper = new edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
- keys, framesLimit, new IBinaryComparatorFactory[] {
+ keys, frameSize, new IBinaryComparatorFactory[] {
// PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
@@ -221,7 +221,7 @@
break;
case 1: // External-sort + new-precluster
- ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, framesLimit, keys,
+ ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, frameSize, keys,
new IBinaryComparatorFactory[] {
// PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, inDesc);
@@ -271,7 +271,7 @@
break;
default:
grouper = new edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
- keys, framesLimit, new IBinaryComparatorFactory[] {
+ keys, frameSize, new IBinaryComparatorFactory[] {
// PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index d342082..82421ae 100644
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -85,6 +85,9 @@
@Option(name = "-runtime-profiling", usage = "Indicates if runtime profiling should be enabled. (default: false)")
public boolean runtimeProfiling = false;
+
+ @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+ public int frameSize = 32768;
}
public static void main(String[] args) throws Exception {
@@ -95,7 +98,7 @@
IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
- options.algo, options.htSize, options.sbSize, options.format);
+ options.algo, options.htSize, options.sbSize, options.format, options.frameSize);
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job,
@@ -120,8 +123,8 @@
}
private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, String algo, int htSize,
- int sbSize, String format) {
- JobSpecification spec = new JobSpecification();
+ int sbSize, String format, int frameSize) {
+ JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
RecordDescriptor wordDesc = new RecordDescriptor(
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 6017e8f..a54e940 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -109,6 +109,9 @@
// Whether group-by is processed after the join
@Option(name = "-has-groupby", usage = "Whether to have group-by operation after join (default: disabled)", required = false)
public boolean hasGroupBy = false;
+
+ @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+ public int frameSize = 32768;
}
public static void main(String[] args) throws Exception {
@@ -121,7 +124,7 @@
JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
- options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy);
+ options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy, options.frameSize);
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job,
@@ -147,8 +150,8 @@
private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
- double graceFactor, int memSize, int tableSize, boolean hasGroupBy) throws HyracksDataException {
- JobSpecification spec = new JobSpecification();
+ double graceFactor, int memSize, int tableSize, boolean hasGroupBy, int frameSize) throws HyracksDataException {
+ JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 64c4ea3..bde94a9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -227,7 +227,7 @@
@Override
public JobSpecification generateCreatingJob() throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
@@ -261,7 +261,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
/**
@@ -325,7 +325,7 @@
public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -465,7 +465,7 @@
* generate a "clear state" job
*/
public JobSpecification generateClearState() throws HyracksException {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
setLocationConstraint(spec, clearState);
spec.addRoot(clearState);
@@ -479,7 +479,7 @@
* @throws HyracksException
*/
protected JobSpecification dropIndex(String indexName) throws HyracksException {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, indexName);
IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
@@ -519,7 +519,7 @@
Configuration conf = job.getConfiguration();
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
/**
@@ -586,7 +586,7 @@
HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -674,7 +674,7 @@
/** generate plan specific state checkpointing */
protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* source aggregate
@@ -727,7 +727,7 @@
}
Configuration conf = tmpJob.getConfiguration();
Class vertexIdClass = BspUtils.getVertexIndexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/***
* HDFS read operator
@@ -826,7 +826,7 @@
private JobSpecification bulkLoadLiveVertexBTree(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index f838fb8..a0da2a3 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -108,7 +108,7 @@
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -311,7 +311,7 @@
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* source aggregate
@@ -567,7 +567,7 @@
private JobSpecification generateSecondaryBTreeCheckpointLoad(int lastSuccessfulIteration, PregelixJob job)
throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
@@ -650,7 +650,7 @@
Class<? extends Writable> msgListClass = MsgList.class;
String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
*/
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 39a56bf..2853fd0 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -84,7 +84,7 @@
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -266,7 +266,7 @@
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* source aggregate
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index c10e6c2..a72777b 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -84,7 +84,7 @@
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -257,7 +257,7 @@
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* source aggregate
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 953d82c..e28b06b 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -80,7 +80,7 @@
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -267,7 +267,7 @@
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* source aggregate