fix for issue #732
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 64c4ea3..bde94a9 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -227,7 +227,7 @@
@Override
public JobSpecification generateCreatingJob() throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
@@ -261,7 +261,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
/**
@@ -325,7 +325,7 @@
public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -465,7 +465,7 @@
* generate a "clear state" job
*/
public JobSpecification generateClearState() throws HyracksException {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
setLocationConstraint(spec, clearState);
spec.addRoot(clearState);
@@ -479,7 +479,7 @@
* @throws HyracksException
*/
protected JobSpecification dropIndex(String indexName) throws HyracksException {
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, indexName);
IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
@@ -519,7 +519,7 @@
Configuration conf = job.getConfiguration();
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
/**
@@ -586,7 +586,7 @@
HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -674,7 +674,7 @@
/** generate plan specific state checkpointing */
protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* source aggregate
@@ -727,7 +727,7 @@
}
Configuration conf = tmpJob.getConfiguration();
Class vertexIdClass = BspUtils.getVertexIndexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/***
* HDFS read operator
@@ -826,7 +826,7 @@
private JobSpecification bulkLoadLiveVertexBTree(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index f838fb8..a0da2a3 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -108,7 +108,7 @@
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -311,7 +311,7 @@
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* source aggregate
@@ -567,7 +567,7 @@
private JobSpecification generateSecondaryBTreeCheckpointLoad(int lastSuccessfulIteration, PregelixJob job)
throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
@@ -650,7 +650,7 @@
Class<? extends Writable> msgListClass = MsgList.class;
String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
*/
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 39a56bf..2853fd0 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -84,7 +84,7 @@
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -266,7 +266,7 @@
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* source aggregate
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index c10e6c2..a72777b 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -84,7 +84,7 @@
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -257,7 +257,7 @@
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* source aggregate
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 953d82c..e28b06b 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -80,7 +80,7 @@
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* construct empty tuple operator
@@ -267,7 +267,7 @@
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
- JobSpecification spec = new JobSpecification();
+ JobSpecification spec = new JobSpecification(frameSize);
/**
* source aggregate