support job specific frame size in Pregelix

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2682 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index bba2229..8b6d1b6 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -64,6 +64,8 @@
     public static final String INCREASE_STATE_LENGTH = "pregelix.incStateLength";
     /** job id */
     public static final String JOB_ID = "pregelix.jobid";
+    /** frame size */
+    public static final String FRAME_SIZE = "pregelix.framesize";
 
     /**
      * Constructor that will instantiate the configuration
@@ -154,4 +156,14 @@
     final public void setDynamicVertexValueSize(boolean incStateLengthDynamically) {
         getConfiguration().setBoolean(INCREASE_STATE_LENGTH, incStateLengthDynamically);
     }
+
+    /**
+     * Set the frame size for a job
+     * 
+     * @param frameSize
+     *            the desired frame size
+     */
+    final public void setFrameSize(int frameSize) {
+        getConfiguration().setInt(FRAME_SIZE, frameSize);
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 6066dfb..ff9724d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -421,4 +421,15 @@
     public static boolean getDynamicVertexValueSize(Configuration conf) {
         return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
     }
+
+    /**
+     * Get the specified frame size
+     * 
+     * @param conf
+     *            the job configuration
+     * @return the specified frame size; -1 if it is not set by users
+     */
+    public static int getFrameSize(Configuration conf) {
+        return conf.getInt(PregelixJob.FRAME_SIZE, -1);
+    }
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index de29dbc..991d969 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -86,377 +86,462 @@
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
 
 public abstract class JobGen implements IJobGen {
-    private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
-    protected static final int MB = 1048576;
-    protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
-    protected static final int frameSize = ClusterConfig.getFrameSize();
-    protected static final int maxFrameSize = (int) (((long) 32 * MB) / frameSize);
-    protected static final int tableSize = 10485767;
-    protected static final String PRIMARY_INDEX = "primary";
-    protected final Configuration conf;
-    protected final PregelixJob giraphJob;
-    protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
-    protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
-    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+	private static final Logger LOGGER = Logger.getLogger(JobGen.class
+			.getName());
+	protected static final int MB = 1048576;
+	protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+	protected static final int tableSize = 10485767;
+	protected static final String PRIMARY_INDEX = "primary";
+	protected final Configuration conf;
+	protected final PregelixJob giraphJob;
+	protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+	protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+	protected String jobId = new UUID(System.currentTimeMillis(),
+			System.nanoTime()).toString();
+	protected int frameSize = ClusterConfig.getFrameSize();
+	protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
 
-    protected static final String SECONDARY_INDEX_ODD = "secondary1";
-    protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+	protected static final String SECONDARY_INDEX_ODD = "secondary1";
+	protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
-    public JobGen(PregelixJob job) {
-        this.conf = job.getConfiguration();
-        this.giraphJob = job;
-        this.initJobConfiguration();
-        job.setJobId(jobId);
-    }
+	public JobGen(PregelixJob job) {
+		this.conf = job.getConfiguration();
+		this.giraphJob = job;
+		this.initJobConfiguration();
+		job.setJobId(jobId);
+		
+		//set the frame size to be the one user specified if the user did specify.
+		int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
+		if (specifiedFrameSize > 0) {
+			frameSize = specifiedFrameSize;
+			maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+		}
+		if (maxFrameNumber <= 0) {
+			maxFrameNumber = 1;
+		}
+	}
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private void initJobConfiguration() {
-        Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
-        List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
-        Type vertexIndexType = parameterTypes.get(0);
-        Type vertexValueType = parameterTypes.get(1);
-        Type edgeValueType = parameterTypes.get(2);
-        Type messageValueType = parameterTypes.get(3);
-        conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
-        conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
-        conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
-        conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	private void initJobConfiguration() {
+		Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS,
+				Vertex.class);
+		List<Type> parameterTypes = ReflectionUtils.getTypeArguments(
+				Vertex.class, vertexClass);
+		Type vertexIndexType = parameterTypes.get(0);
+		Type vertexValueType = parameterTypes.get(1);
+		Type edgeValueType = parameterTypes.get(2);
+		Type messageValueType = parameterTypes.get(3);
+		conf.setClass(PregelixJob.VERTEX_INDEX_CLASS,
+				(Class<?>) vertexIndexType, WritableComparable.class);
+		conf.setClass(PregelixJob.VERTEX_VALUE_CLASS,
+				(Class<?>) vertexValueType, Writable.class);
+		conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType,
+				Writable.class);
+		conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS,
+				(Class<?>) messageValueType, Writable.class);
 
-        Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
-        if (!aggregatorClass.equals(GlobalAggregator.class)) {
-            List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
-            Type partialAggregateValueType = argTypes.get(4);
-            conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, (Class<?>) partialAggregateValueType,
-                    Writable.class);
-            Type finalAggregateValueType = argTypes.get(5);
-            conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
-        }
+		Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
+		if (!aggregatorClass.equals(GlobalAggregator.class)) {
+			List<Type> argTypes = ReflectionUtils.getTypeArguments(
+					GlobalAggregator.class, aggregatorClass);
+			Type partialAggregateValueType = argTypes.get(4);
+			conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS,
+					(Class<?>) partialAggregateValueType, Writable.class);
+			Type finalAggregateValueType = argTypes.get(5);
+			conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS,
+					(Class<?>) finalAggregateValueType, Writable.class);
+		}
 
-        Class combinerClass = BspUtils.getMessageCombinerClass(conf);
-        if (!combinerClass.equals(MessageCombiner.class)) {
-            List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
-            Type partialCombineValueType = argTypes.get(2);
-            conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, (Class<?>) partialCombineValueType, Writable.class);
-        }
-    }
+		Class combinerClass = BspUtils.getMessageCombinerClass(conf);
+		if (!combinerClass.equals(MessageCombiner.class)) {
+			List<Type> argTypes = ReflectionUtils.getTypeArguments(
+					MessageCombiner.class, combinerClass);
+			Type partialCombineValueType = argTypes.get(2);
+			conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS,
+					(Class<?>) partialCombineValueType, Writable.class);
+		}
+	}
 
-    public String getJobId() {
-        return jobId;
-    }
+	public String getJobId() {
+		return jobId;
+	}
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    public JobSpecification generateCreatingJob() throws HyracksException {
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        JobSpecification spec = new JobSpecification();
-        ITypeTraits[] typeTraits = new ITypeTraits[2];
-        typeTraits[0] = new TypeTraits(false);
-        typeTraits[1] = new TypeTraits(false);
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
-                .getClass());
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public JobSpecification generateCreatingJob() throws HyracksException {
+		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+				.getVertexIndexClass(conf);
+		JobSpecification spec = new JobSpecification();
+		ITypeTraits[] typeTraits = new ITypeTraits[2];
+		typeTraits[0] = new TypeTraits(false);
+		typeTraits[1] = new TypeTraits(false);
+		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+				WritableComparator.get(vertexIdClass).getClass());
 
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
-        TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, btreeCreate);
-        return spec;
-    }
+		IFileSplitProvider fileSplitProvider = ClusterConfig
+				.getFileSplitProvider(jobId, PRIMARY_INDEX);
+		TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(
+				spec, storageManagerInterface, treeRegistryProvider,
+				fileSplitProvider, typeTraits, comparatorFactories,
+				new BTreeDataflowHelperFactory(),
+				NoOpOperationCallbackProvider.INSTANCE);
+		ClusterConfig.setLocationConstraint(spec, btreeCreate);
+		return spec;
+	}
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    public JobSpecification generateLoadingJob() throws HyracksException {
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        JobSpecification spec = new JobSpecification();
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public JobSpecification generateLoadingJob() throws HyracksException {
+		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+				.getVertexIndexClass(conf);
+		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+		JobSpecification spec = new JobSpecification();
+		IFileSplitProvider fileSplitProvider = ClusterConfig
+				.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
-        /**
-         * the graph file scan operator and use count constraint first, will use
-         * absolute constraint later
-         */
-        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
-        List<InputSplit> splits = new ArrayList<InputSplit>();
-        try {
-            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
-            LOGGER.info("number of splits: " + splits.size());
-            for (InputSplit split : splits)
-                LOGGER.info(split.toString());
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
-                confFactory);
-        ClusterConfig.setLocationConstraint(spec, scanner, splits);
+		/**
+		 * the graph file scan operator and use count constraint first, will use
+		 * absolute constraint later
+		 */
+		VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+		List<InputSplit> splits = new ArrayList<InputSplit>();
+		try {
+			splits = inputFormat.getSplits(giraphJob,
+					fileSplitProvider.getFileSplits().length);
+			LOGGER.info("number of splits: " + splits.size());
+			for (InputSplit split : splits)
+				LOGGER.info(split.toString());
+		} catch (Exception e) {
+			throw new HyracksDataException(e);
+		}
+		RecordDescriptor recordDescriptor = DataflowUtils
+				.getRecordDescriptorFromKeyValueClasses(
+						vertexIdClass.getName(), vertexClass.getName());
+		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+		VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
+				spec, recordDescriptor, splits, confFactory);
+		ClusterConfig.setLocationConstraint(spec, scanner, splits);
 
-        /**
-         * construct sort operator
-         */
-        int[] sortFields = new int[1];
-        sortFields[0] = 0;
-        INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
-                .getAscINormalizedKeyComputerFactory(vertexIdClass);
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
-                .getClass());
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameSize, sortFields,
-                nkmFactory, comparatorFactories, recordDescriptor);
-        ClusterConfig.setLocationConstraint(spec, sorter);
+		/**
+		 * construct sort operator
+		 */
+		int[] sortFields = new int[1];
+		sortFields[0] = 0;
+		INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+				.getAscINormalizedKeyComputerFactory(vertexIdClass);
+		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+				WritableComparator.get(vertexIdClass).getClass());
+		ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+				spec, maxFrameNumber, sortFields, nkmFactory,
+				comparatorFactories, recordDescriptor);
+		ClusterConfig.setLocationConstraint(spec, sorter);
 
-        /**
-         * construct tree bulk load operator
-         */
-        int[] fieldPermutation = new int[2];
-        fieldPermutation[0] = 0;
-        fieldPermutation[1] = 1;
-        ITypeTraits[] typeTraits = new ITypeTraits[2];
-        typeTraits[0] = new TypeTraits(false);
-        typeTraits[1] = new TypeTraits(false);
-        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
-                NoOpOperationCallbackProvider.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+		/**
+		 * construct tree bulk load operator
+		 */
+		int[] fieldPermutation = new int[2];
+		fieldPermutation[0] = 0;
+		fieldPermutation[1] = 1;
+		ITypeTraits[] typeTraits = new ITypeTraits[2];
+		typeTraits[0] = new TypeTraits(false);
+		typeTraits[1] = new TypeTraits(false);
+		TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
+				spec, storageManagerInterface, treeRegistryProvider,
+				fileSplitProvider, typeTraits, comparatorFactories,
+				fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
+				new BTreeDataflowHelperFactory(),
+				NoOpOperationCallbackProvider.INSTANCE);
+		ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
-        /**
-         * connect operator descriptors
-         */
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
-        return spec;
-    }
+		/**
+		 * connect operator descriptors
+		 */
+		ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+				DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+		spec.connect(new MToNPartitioningConnectorDescriptor(spec,
+				hashPartitionComputerFactory), scanner, 0, sorter, 0);
+		spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
+				btreeBulkLoad, 0);
+		return spec;
+	}
 
-    @Override
-    public JobSpecification generateJob(int iteration) throws HyracksException {
-        if (iteration <= 0)
-            throw new IllegalStateException("iteration number cannot be less than 1");
-        if (iteration == 1)
-            return generateFirstIteration(iteration);
-        else
-            return generateNonFirstIteration(iteration);
-    }
+	@Override
+	public JobSpecification generateJob(int iteration) throws HyracksException {
+		if (iteration <= 0)
+			throw new IllegalStateException(
+					"iteration number cannot be less than 1");
+		if (iteration == 1)
+			return generateFirstIteration(iteration);
+		else
+			return generateNonFirstIteration(iteration);
+	}
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public JobSpecification scanSortPrintGraph(String nodeName, String path) throws HyracksException {
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
-        JobSpecification spec = new JobSpecification();
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public JobSpecification scanSortPrintGraph(String nodeName, String path)
+			throws HyracksException {
+		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+				.getVertexIndexClass(conf);
+		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+		int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
+		JobSpecification spec = new JobSpecification();
+		IFileSplitProvider fileSplitProvider = ClusterConfig
+				.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
-        /**
-         * the graph file scan operator and use count constraint first, will use
-         * absolute constraint later
-         */
-        VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
-        List<InputSplit> splits = new ArrayList<InputSplit>();
-        try {
-            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
-                confFactory);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner, splits.size());
+		/**
+		 * the graph file scan operator and use count constraint first, will use
+		 * absolute constraint later
+		 */
+		VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+		List<InputSplit> splits = new ArrayList<InputSplit>();
+		try {
+			splits = inputFormat.getSplits(giraphJob,
+					fileSplitProvider.getFileSplits().length);
+		} catch (Exception e) {
+			throw new HyracksDataException(e);
+		}
+		RecordDescriptor recordDescriptor = DataflowUtils
+				.getRecordDescriptorFromKeyValueClasses(
+						vertexIdClass.getName(), vertexClass.getName());
+		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+		VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
+				spec, recordDescriptor, splits, confFactory);
+		PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner,
+				splits.size());
 
-        /**
-         * construct sort operator
-         */
-        int[] sortFields = new int[1];
-        sortFields[0] = 0;
-        INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
-                .getAscINormalizedKeyComputerFactory(vertexIdClass);
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
-                .getClass());
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
-                nkmFactory, comparatorFactories, recordDescriptor);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter, splits.size());
+		/**
+		 * construct sort operator
+		 */
+		int[] sortFields = new int[1];
+		sortFields[0] = 0;
+		INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+				.getAscINormalizedKeyComputerFactory(vertexIdClass);
+		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+				WritableComparator.get(vertexIdClass).getClass());
+		ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+				spec, maxFrameLimit, sortFields, nkmFactory,
+				comparatorFactories, recordDescriptor);
+		PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter,
+				splits.size());
 
-        /**
-         * construct write file operator
-         */
-        FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
-        FileSplit[] results = new FileSplit[1];
-        results[0] = resultFile;
-        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
-        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
-                resultFileSplitProvider, preHookFactory, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+		/**
+		 * construct write file operator
+		 */
+		FileSplit resultFile = new FileSplit(nodeName, new FileReference(
+				new File(path)));
+		FileSplit[] results = new FileSplit[1];
+		results[0] = resultFile;
+		IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
+				results);
+		IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+		IRecordDescriptorFactory inputRdFactory = DataflowUtils
+				.getWritableRecordDescriptorFactoryFromWritableClasses(
+						vertexIdClass.getName(), vertexClass.getName());
+		FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(
+				spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
+				null);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
+				new String[] { "nc1" });
+		PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
-        /**
-         * connect operator descriptors
-         */
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
-                comparatorFactories), sorter, 0, writer, 0);
-        return spec;
-    }
+		/**
+		 * connect operator descriptors
+		 */
+		ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+				DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+		spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter,
+				0);
+		spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
+				hashPartitionComputerFactory, sortFields, comparatorFactories),
+				sorter, 0, writer, 0);
+		return spec;
+	}
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        JobSpecification spec = new JobSpecification();
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public JobSpecification scanIndexPrintGraph(String nodeName, String path)
+			throws HyracksException {
+		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+				.getVertexIndexClass(conf);
+		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+		JobSpecification spec = new JobSpecification();
 
-        /**
-         * construct empty tuple operator
-         */
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+		/**
+		 * construct empty tuple operator
+		 */
+		ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+		DataOutput dos = tb.getDataOutput();
+		tb.reset();
+		UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+		tb.addFieldEndOffset();
+		ISerializerDeserializer[] keyRecDescSers = {
+				UTF8StringSerializerDeserializer.INSTANCE,
+				UTF8StringSerializerDeserializer.INSTANCE };
+		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+		ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
+				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
+				tb.getSize());
+		ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
 
-        /**
-         * construct btree search operator
-         */
-        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
-                .getClass());
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
-        ITypeTraits[] typeTraits = new ITypeTraits[2];
-        typeTraits[0] = new TypeTraits(false);
-        typeTraits[1] = new TypeTraits(false);
-        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+		/**
+		 * construct btree search operator
+		 */
+		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+		RecordDescriptor recordDescriptor = DataflowUtils
+				.getRecordDescriptorFromKeyValueClasses(
+						vertexIdClass.getName(), vertexClass.getName());
+		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+				WritableComparator.get(vertexIdClass).getClass());
+		IFileSplitProvider fileSplitProvider = ClusterConfig
+				.getFileSplitProvider(jobId, PRIMARY_INDEX);
+		ITypeTraits[] typeTraits = new ITypeTraits[2];
+		typeTraits[0] = new TypeTraits(false);
+		typeTraits[1] = new TypeTraits(false);
+		BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
+				spec, recordDescriptor, storageManagerInterface,
+				treeRegistryProvider, fileSplitProvider, typeTraits,
+				comparatorFactories, null, null, true, true,
+				new BTreeDataflowHelperFactory(), false,
+				NoOpOperationCallbackProvider.INSTANCE);
+		ClusterConfig.setLocationConstraint(spec, scanner);
 
-        /**
-         * construct write file operator
-         */
-        FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
-        FileSplit[] results = new FileSplit[1];
-        results[0] = resultFile;
-        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
-        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
-                resultFileSplitProvider, preHookFactory, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+		/**
+		 * construct write file operator
+		 */
+		FileSplit resultFile = new FileSplit(nodeName, new FileReference(
+				new File(path)));
+		FileSplit[] results = new FileSplit[1];
+		results[0] = resultFile;
+		IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
+				results);
+		IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+		IRecordDescriptorFactory inputRdFactory = DataflowUtils
+				.getWritableRecordDescriptorFactoryFromWritableClasses(
+						vertexIdClass.getName(), vertexClass.getName());
+		FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(
+				spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
+				null);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
+				new String[] { "nc1" });
+		PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
-        /**
-         * connect operator descriptors
-         */
-        int[] sortFields = new int[1];
-        sortFields[0] = 0;
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
-                DatatypeHelper.createSerializerDeserializer(vertexIdClass));
-        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
-                comparatorFactories), scanner, 0, writer, 0);
-        spec.setFrameSize(frameSize);
-        return spec;
-    }
+		/**
+		 * connect operator descriptors
+		 */
+		int[] sortFields = new int[1];
+		sortFields[0] = 0;
+		ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+				DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+		spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
+				0, scanner, 0);
+		spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
+				hashPartitionComputerFactory, sortFields, comparatorFactories),
+				scanner, 0, writer, 0);
+		spec.setFrameSize(frameSize);
+		return spec;
+	}
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public JobSpecification scanIndexWriteGraph() throws HyracksException {
-        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
-        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
-        JobSpecification spec = new JobSpecification();
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public JobSpecification scanIndexWriteGraph() throws HyracksException {
+		Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+				.getVertexIndexClass(conf);
+		Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+		JobSpecification spec = new JobSpecification();
 
-        /**
-         * construct empty tuple operator
-         */
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-        ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+		/**
+		 * construct empty tuple operator
+		 */
+		ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+		DataOutput dos = tb.getDataOutput();
+		tb.reset();
+		UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+		tb.addFieldEndOffset();
+		ISerializerDeserializer[] keyRecDescSers = {
+				UTF8StringSerializerDeserializer.INSTANCE,
+				UTF8StringSerializerDeserializer.INSTANCE };
+		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+		ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
+				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
+				tb.getSize());
+		ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
 
-        /**
-         * construct btree search operator
-         */
-        IConfigurationFactory confFactory = new ConfigurationFactory(conf);
-        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
-                .getClass());
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+		/**
+		 * construct btree search operator
+		 */
+		IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+		RecordDescriptor recordDescriptor = DataflowUtils
+				.getRecordDescriptorFromKeyValueClasses(
+						vertexIdClass.getName(), vertexClass.getName());
+		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+		comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+				WritableComparator.get(vertexIdClass).getClass());
+		IFileSplitProvider fileSplitProvider = ClusterConfig
+				.getFileSplitProvider(jobId, PRIMARY_INDEX);
 
-        ITypeTraits[] typeTraits = new ITypeTraits[2];
-        typeTraits[0] = new TypeTraits(false);
-        typeTraits[1] = new TypeTraits(false);
-        BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+		ITypeTraits[] typeTraits = new ITypeTraits[2];
+		typeTraits[0] = new TypeTraits(false);
+		typeTraits[1] = new TypeTraits(false);
+		BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
+				spec, recordDescriptor, storageManagerInterface,
+				treeRegistryProvider, fileSplitProvider, typeTraits,
+				comparatorFactories, null, null, true, true,
+				new BTreeDataflowHelperFactory(), false,
+				NoOpOperationCallbackProvider.INSTANCE);
+		ClusterConfig.setLocationConstraint(spec, scanner);
 
-        /**
-         * construct write file operator
-         */
-        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
-                vertexIdClass.getName(), vertexClass.getName());
-        HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
-        ClusterConfig.setLocationConstraint(spec, writer);
+		/**
+		 * construct write file operator
+		 */
+		IRecordDescriptorFactory inputRdFactory = DataflowUtils
+				.getWritableRecordDescriptorFactoryFromWritableClasses(
+						vertexIdClass.getName(), vertexClass.getName());
+		HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(
+				spec, confFactory, inputRdFactory);
+		ClusterConfig.setLocationConstraint(spec, writer);
 
-        /**
-         * connect operator descriptors
-         */
-        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
-        return spec;
-    }
+		/**
+		 * connect operator descriptors
+		 */
+		spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
+				0, scanner, 0);
+		spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer,
+				0);
+		return spec;
+	}
 
-    /***
-     * drop the sindex
-     * 
-     * @return JobSpecification
-     * @throws HyracksException
-     */
-    protected JobSpecification dropIndex(String indexName) throws HyracksException {
-        JobSpecification spec = new JobSpecification();
+	/***
+	 * drop the sindex
+	 * 
+	 * @return JobSpecification
+	 * @throws HyracksException
+	 */
+	protected JobSpecification dropIndex(String indexName)
+			throws HyracksException {
+		JobSpecification spec = new JobSpecification();
 
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
-        TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
-                treeRegistryProvider, fileSplitProvider);
+		IFileSplitProvider fileSplitProvider = ClusterConfig
+				.getFileSplitProvider(jobId, indexName);
+		TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(
+				spec, storageManagerInterface, treeRegistryProvider,
+				fileSplitProvider);
 
-        ClusterConfig.setLocationConstraint(spec, drop);
-        spec.addRoot(drop);
-        return spec;
-    }
+		ClusterConfig.setLocationConstraint(spec, drop);
+		spec.addRoot(drop);
+		return spec;
+	}
 
-    /** generate non-first iteration job */
-    protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
+	/** generate non-first iteration job */
+	protected abstract JobSpecification generateNonFirstIteration(int iteration)
+			throws HyracksException;
 
-    /** generate first iteration job */
-    protected abstract JobSpecification generateFirstIteration(int iteration) throws HyracksException;
+	/** generate first iteration job */
+	protected abstract JobSpecification generateFirstIteration(int iteration)
+			throws HyracksException;
 
-    /** generate clean-up job */
-    public abstract JobSpecification[] generateCleanup() throws HyracksException;
+	/** generate clean-up job */
+	public abstract JobSpecification[] generateCleanup()
+			throws HyracksException;
 
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 00cdf07..04a6e42 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -173,7 +173,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
@@ -342,7 +342,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 3847aa7..35f5e61 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -139,7 +139,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
@@ -299,7 +299,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ec783a7..328d7d1 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -140,7 +140,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, globalSort);
 
@@ -290,7 +290,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, globalSort);
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index bb939e3..812e697 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -136,7 +136,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
@@ -152,7 +152,7 @@
         /**
          * construct global sort operator
          */
-        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, globalSort);
 
@@ -303,7 +303,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
@@ -319,7 +319,7 @@
         /**
          * construct global sort operator
          */
-        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, globalSort);