Merge branch 'master' of https://code.google.com/p/hyracks into icetindil/frame_size
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index aa47041..961e527 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -44,7 +44,7 @@
     }
 
     public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema, IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(context.getFrameSize());
         if (jobEventListenerFactory != null) {
             spec.setJobletEventListenerFactory(jobEventListenerFactory);
         }
diff --git a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 974dc4d..2b29448 100644
--- a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -101,6 +101,7 @@
     private static final String PATH_ACTUAL = "rttest";
     private static final String PATH_BASE = "src" + SEPARATOR + "test" + SEPARATOR + "resources";
     private static final String PATH_EXPECTED = PATH_BASE + SEPARATOR + "results";
+    private static final int FRAME_SIZE = 32768;
 
     private static final String[] DEFAULT_NODES = new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID };
 
@@ -123,7 +124,7 @@
 
     @Test
     public void etsAssignPrint() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
         IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
         IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
 
@@ -147,7 +148,7 @@
 
     @Test
     public void etsAssignWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
         IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
         IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
 
@@ -178,7 +179,7 @@
 
     @Test
     public void scanSelectWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
 
         // the scanner
         FileSplit[] intFileSplits = new FileSplit[1];
@@ -224,7 +225,7 @@
     @Test
     public void etsAssignProjectWrite() throws Exception {
 
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
         IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
         IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
 
@@ -261,7 +262,7 @@
 
     @Test
     public void scanLimitWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
 
         // the scanner
         FileSplit[] fileSplits = new FileSplit[1];
@@ -312,7 +313,7 @@
 
     @Test
     public void etsUnnestWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
 
         EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
         RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
@@ -342,7 +343,7 @@
 
     @Test
     public void scanAggregateWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
 
         // the scanner
         FileSplit[] fileSplits = new FileSplit[1];
@@ -394,7 +395,7 @@
 
     @Test
     public void scanSortGbySelectWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
 
         // the scanner
         FileSplit[] fileSplits = new FileSplit[1];
@@ -479,7 +480,7 @@
 
     @Test
     public void scanHashGbySelectWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
 
         // the scanner
         FileSplit[] fileSplits = new FileSplit[1];
@@ -548,7 +549,7 @@
 
     @Test
     public void etsUnnestRunningaggregateWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
 
         EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
         RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
@@ -587,7 +588,7 @@
 
     @Test
     public void etsAssignScriptWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
         IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
         IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
 
@@ -643,7 +644,7 @@
     public void scanSplitWrite() throws Exception {
         final int outputArity = 2;
 
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
 
         String inputFileName = "data/tpch0.001/customer.tbl";
         File inputFile = new File(inputFileName);
@@ -696,7 +697,7 @@
 
     @Test
     public void scanMicroSortWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
 
         // the scanner
         FileSplit[] fileSplits = new FileSplit[1];
@@ -744,7 +745,7 @@
 
     @Test
     public void etsAssignSubplanProjectWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
         IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
         IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
 
@@ -804,7 +805,7 @@
 
     @Test
     public void scanMicroSortGbySelectWrite() throws Exception {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
 
         // the scanner
         FileSplit[] fileSplits = new FileSplit[1];
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 0873582..1915a54 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -59,7 +59,7 @@
 
     private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
 
-    private int frameSize;
+    private int frameSize = 32768;
 
     private int maxReattempts;
 
@@ -73,6 +73,8 @@
 
     private transient int connectorIdCounter;
 
+    // Creates a job specification with the default frame size (32768). Intended for tests only;
+    // all other callers should use JobSpecification(int frameSize) to set the frame size explicitly.
     public JobSpecification() {
         roots = new ArrayList<OperatorDescriptorId>();
         resultSetIds = new ArrayList<ResultSetId>();
@@ -85,10 +87,14 @@
         userConstraints = new HashSet<Constraint>();
         operatorIdCounter = 0;
         connectorIdCounter = 0;
-        frameSize = 32768;
         maxReattempts = 2;
         useConnectorPolicyForScheduling = true;
     }
+    
+    public JobSpecification(int frameSize) {
+        this();
+        setFrameSize(frameSize);
+    }
 
     @Override
     public OperatorDescriptorId createOperatorDescriptorId(IOperatorDescriptor op) {
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
index cde561e..a18d496 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -53,7 +53,7 @@
 
 // This example will insert tuples into the primary and secondary index using an insert pipeline
 
-public class InsertPipelineExample {
+public class InsertPipelineExample { 
     private static class Options {
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
@@ -72,6 +72,9 @@
 
         @Option(name = "-secondary-btreename", usage = "B-Tree file name of secondary index", required = true)
         public String secondaryBTreeName;
+        
+        @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
     }
 
     public static void main(String[] args) throws Exception {
@@ -92,7 +95,7 @@
 
     private static JobSpecification createJob(Options options) {
 
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(options.frameSize);
 
         String[] splitNCs = options.ncs.split(",");
 
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index a027570..1c1901d 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -51,7 +51,7 @@
 
 // This example will load a primary index from randomly generated data
 
-public class PrimaryIndexBulkLoadExample {
+public class PrimaryIndexBulkLoadExample {   
     private static class Options {
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
@@ -70,6 +70,9 @@
 
         @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
         public int sbSize = 32768;
+        
+        @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
     }
 
     public static void main(String[] args) throws Exception {
@@ -90,7 +93,7 @@
 
     private static JobSpecification createJob(Options options) {
 
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(options.frameSize);
 
         String[] splitNCs = options.ncs.split(",");
 
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index a36fbdd..01004e6f 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -63,6 +63,9 @@
 
         @Option(name = "-btreename", usage = "B-Tree file name to search", required = true)
         public String btreeName;
+        
+        @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
     }
 
     public static void main(String[] args) throws Exception {
@@ -83,7 +86,7 @@
 
     private static JobSpecification createJob(Options options) throws HyracksDataException {
 
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(options.frameSize);
 
         String[] splitNCs = options.ncs.split(",");
 
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 805660f..4826808 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -65,6 +65,9 @@
 
         @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
         public int sbSize = 32768;
+        
+        @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
     }
 
     public static void main(String[] args) throws Exception {
@@ -85,7 +88,7 @@
 
     private static JobSpecification createJob(Options options) {
 
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(options.frameSize);
 
         String[] splitNCs = options.ncs.split(",");
 
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index fc5a37a..5ff5003 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -66,6 +66,9 @@
 
         @Option(name = "-secondary-btreename", usage = "Secondary B-Tree file name to search", required = true)
         public String secondaryBTreeName;
+        
+        @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
     }
 
     public static void main(String[] args) throws Exception {
@@ -86,7 +89,7 @@
 
     private static JobSpecification createJob(Options options) throws HyracksDataException {
 
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(options.frameSize);
 
         String[] splitNCs = options.ncs.split(",");
 
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index be8e986..1629a1e 100644
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -83,8 +83,8 @@
         @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
         public int htSize = 8191;
 
-        @Option(name = "-frames-limit", usage = "Frame size (default: 32768)", required = false)
-        public int framesLimit = 32768;
+        @Option(name = "-frame-size", usage = "Frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
 
         @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 512)", required = false)
         public int sbSize = 512;
@@ -114,7 +114,7 @@
         for (int i = 0; i < 6; i++) {
             long start = System.currentTimeMillis();
             job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i),
-                    options.htSize, options.sbSize, options.framesLimit, options.sortOutput, options.algo,
+                    options.htSize, options.sbSize, options.frameSize, options.sortOutput, options.algo,
                     options.outPlain);
 
             System.out.print(i + "\t" + (System.currentTimeMillis() - start));
@@ -155,8 +155,8 @@
     }
 
     private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
-            int framesLimit, boolean sortOutput, int alg, boolean outPlain) {
-        JobSpecification spec = new JobSpecification();
+            int frameSize, boolean sortOutput, int alg, boolean outPlain) {
+        JobSpecification spec = new JobSpecification(frameSize);
         IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
 
         RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -198,7 +198,7 @@
         switch (alg) {
             case 0: // new external hash graph
                 grouper = new edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
-                        keys, framesLimit, new IBinaryComparatorFactory[] {
+                        keys, frameSize, new IBinaryComparatorFactory[] {
                         // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
                         PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
                         new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
@@ -221,7 +221,7 @@
 
                 break;
             case 1: // External-sort + new-precluster
-                ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, framesLimit, keys,
+                ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, frameSize, keys,
                         new IBinaryComparatorFactory[] {
                         // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
                         PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, inDesc);
@@ -271,7 +271,7 @@
                 break;
             default:
                 grouper = new edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
-                        keys, framesLimit, new IBinaryComparatorFactory[] {
+                        keys, frameSize, new IBinaryComparatorFactory[] {
                         // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
                         PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
                         new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index d342082..82421ae 100644
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -85,6 +85,9 @@
 
         @Option(name = "-runtime-profiling", usage = "Indicates if runtime profiling should be enabled. (default: false)")
         public boolean runtimeProfiling = false;
+        
+        @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
     }
 
     public static void main(String[] args) throws Exception {
@@ -95,7 +98,7 @@
         IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
-                options.algo, options.htSize, options.sbSize, options.format);
+                options.algo, options.htSize, options.sbSize, options.format, options.frameSize);
 
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job,
@@ -120,8 +123,8 @@
     }
 
     private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, String algo, int htSize,
-            int sbSize, String format) {
-        JobSpecification spec = new JobSpecification();
+            int sbSize, String format, int frameSize) {
+        JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
         RecordDescriptor wordDesc = new RecordDescriptor(
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 6017e8f..a54e940 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -109,6 +109,9 @@
         // Whether group-by is processed after the join
         @Option(name = "-has-groupby", usage = "Whether to have group-by operation after join (default: disabled)", required = false)
         public boolean hasGroupBy = false;
+        
+        @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
     }
 
     public static void main(String[] args) throws Exception {
@@ -121,7 +124,7 @@
         JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
                 parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
                 options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
-                options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy);
+                options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy, options.frameSize);
 
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job,
@@ -147,8 +150,8 @@
 
     private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
             FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
-            double graceFactor, int memSize, int tableSize, boolean hasGroupBy) throws HyracksDataException {
-        JobSpecification spec = new JobSpecification();
+            double graceFactor, int memSize, int tableSize, boolean hasGroupBy, int frameSize) throws HyracksDataException {
+        JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 64c4ea3..bde94a9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -227,7 +227,7 @@
     @Override
     public JobSpecification generateCreatingJob() throws HyracksException {
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
@@ -261,7 +261,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
         IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         /**
@@ -325,7 +325,7 @@
     public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * construct empty tuple operator
@@ -465,7 +465,7 @@
      * generate a "clear state" job
      */
     public JobSpecification generateClearState() throws HyracksException {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
         ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
         setLocationConstraint(spec, clearState);
         spec.addRoot(clearState);
@@ -479,7 +479,7 @@
      * @throws HyracksException
      */
     protected JobSpecification dropIndex(String indexName) throws HyracksException {
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, indexName);
         IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
@@ -519,7 +519,7 @@
         Configuration conf = job.getConfiguration();
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
         IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         /**
@@ -586,7 +586,7 @@
             HyracksException {
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * construct empty tuple operator
@@ -674,7 +674,7 @@
     /** generate plan specific state checkpointing */
     protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * source aggregate
@@ -727,7 +727,7 @@
         }
         Configuration conf = tmpJob.getConfiguration();
         Class vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /***
          * HDFS read operator
@@ -826,7 +826,7 @@
     private JobSpecification bulkLoadLiveVertexBTree(int iteration) throws HyracksException {
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * construct empty tuple operator
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index f838fb8..a0da2a3 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -108,7 +108,7 @@
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
         String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * construct empty tuple operator
@@ -311,7 +311,7 @@
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
         String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * source aggregate
@@ -567,7 +567,7 @@
     private JobSpecification generateSecondaryBTreeCheckpointLoad(int lastSuccessfulIteration, PregelixJob job)
             throws HyracksException {
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
@@ -650,7 +650,7 @@
         Class<? extends Writable> msgListClass = MsgList.class;
         String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
         IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
         /**
          * construct empty tuple operator
          */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 39a56bf..2853fd0 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -84,7 +84,7 @@
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
         String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * construct empty tuple operator
@@ -266,7 +266,7 @@
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
         String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * source aggregate
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index c10e6c2..a72777b 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -84,7 +84,7 @@
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
         String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * construct empty tuple operator
@@ -257,7 +257,7 @@
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
         String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * source aggregate
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 953d82c..e28b06b 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -80,7 +80,7 @@
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
         String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * construct empty tuple operator
@@ -267,7 +267,7 @@
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
         String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
-        JobSpecification spec = new JobSpecification();
+        JobSpecification spec = new JobSpecification(frameSize);
 
         /**
          * source aggregate