support heterogenous cluster
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
index a527f87..c39cba7 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
@@ -149,7 +149,7 @@
         for (String counterName : RESET_COUNTERS) {
             updateCounter(slave, jo, counterName);
         }
-        
+
         for (String counterName : AGG_COUNTERS) {
             updateCounter(slave, jo, counterName);
         }
@@ -168,7 +168,9 @@
 
     private long extractCounterValue(Object counterObject) {
         long counterValue = 0;
-        if (counterObject instanceof JSONArray) {
+        if (counterObject == null) {
+            return counterValue;
+        } else if (counterObject instanceof JSONArray) {
             JSONArray jArray = (JSONArray) counterObject;
             Object[] values = jArray.toArray();
             for (Object value : values) {
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 2fcaf91..bef9aa9 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -725,15 +725,16 @@
     public static int getRecoveryCount(Configuration conf) {
         return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
     }
-    
+
     /***
      * Get enable dynamic optimization
      * 
-     * @param conf Configuration
+     * @param conf
+     *            Configuration
      * @return true if enabled; otherwise false
      */
-    public static boolean getEnableDynamicOptimization(Configuration conf){
-        return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, true);
+    public static boolean getEnableDynamicOptimization(Configuration conf) {
+        return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, false);
     }
 
     /***
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
index 1881429..7b004d8 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
@@ -14,6 +14,9 @@
  */
 package edu.uci.ics.pregelix.api.util;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
@@ -24,9 +27,11 @@
  */
 public class DefaultIterationCompleteReporterHook implements IIterationCompleteReporterHook {
 
+    private static final Log LOG = LogFactory.getLog(DefaultIterationCompleteReporterHook.class);
+
     @Override
     public void completeIteration(int superstep, PregelixJob job) {
-        System.out.println("iteration complete reporter for " + superstep + " job " + job);
+        LOG.debug("iteration complete reporter for " + superstep + " job " + job.getJobName());
     }
 
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index fdb4136..7bd5fb0 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -55,6 +55,7 @@
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.optimizer.DynamicOptimizer;
 import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.NoOpOptimizer;
 import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 
@@ -111,8 +112,11 @@
             boolean failed = false;
             int retryCount = 0;
             int maxRetryCount = 3;
-            jobGen = selectJobGen(planChoice, currentJob);
-            IOptimizer dynamicOptimzier = new DynamicOptimizer();
+
+            IOptimizer dynamicOptimizer = BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration()) == false ? new NoOpOptimizer()
+                    : new DynamicOptimizer(counterContext);
+            jobGen = selectJobGen(planChoice, currentJob, dynamicOptimizer);
+            jobGen = dynamicOptimizer.optimize(jobGen, 0);
 
             do {
                 try {
@@ -140,9 +144,7 @@
                         }
 
                         /** run loop-body jobs with dynamic optimizer if it is enabled */
-                        if (BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration())) {
-                            jobGen = dynamicOptimzier.optimize(counterContext, jobGen, i);
-                        }
+                        jobGen = dynamicOptimizer.optimize(jobGen, i);
                         runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
                                 ckpHook, failed);
                         runClearState(deploymentId, jobGen);
@@ -197,8 +199,8 @@
                         .equals(currentInputPaths[0])));
     }
 
-    private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
-        return JobGenFactory.createJobGen(planChoice, currentJob);
+    private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob, IOptimizer optimizer) {
+        return JobGenFactory.createJobGen(planChoice, currentJob, optimizer);
     }
 
     private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index c1f6aae..163e476 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -100,6 +101,7 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -140,6 +142,7 @@
     protected String jobId = UUID.randomUUID().toString();;
     protected int frameSize = ClusterConfig.getFrameSize();
     protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+    protected IOptimizer optimizer;
 
     private static final Map<String, String> MERGE_POLICY_PROPERTIES;
     static {
@@ -150,18 +153,19 @@
     protected static final String SECONDARY_INDEX_ODD = "secondary1";
     protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
-    public JobGen(PregelixJob job) {
-        init(job);
-    }
-    
-    public JobGen(PregelixJob job, String jobId) {
-        if(jobId!=null){
-            this.jobId = jobId;
-        }
-        init(job);
+    public JobGen(PregelixJob job, IOptimizer optimizer) {
+        init(job, optimizer);
     }
 
-    private void init(PregelixJob job) {
+    public JobGen(PregelixJob job, String jobId, IOptimizer optimizer) {
+        if (jobId != null) {
+            this.jobId = jobId;
+        }
+        init(job, optimizer);
+    }
+
+    private void init(PregelixJob job, IOptimizer optimizer) {
+        this.optimizer = optimizer;
         conf = job.getConfiguration();
         pregelixJob = job;
         initJobConfiguration();
@@ -178,7 +182,7 @@
     }
 
     public void reset(PregelixJob job) {
-        init(job);
+        init(job, this.optimizer);
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -232,12 +236,12 @@
 
         int[] keyFields = new int[1];
         keyFields[0] = 0;
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
                 keyFields, getIndexDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
                 NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, btreeCreate);
+        setLocationConstraint(spec, btreeCreate);
         spec.setFrameSize(frameSize);
         return spec;
     }
@@ -258,7 +262,7 @@
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
         JobSpecification spec = new JobSpecification();
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         /**
          * the graph file scan operator and use count constraint first, will use
@@ -277,7 +281,7 @@
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
                 readSchedule, confFactory);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct sort operator
@@ -289,7 +293,7 @@
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
                 nkmFactory, comparatorFactories, recordDescriptor);
-        ClusterConfig.setLocationConstraint(spec, sorter);
+        setLocationConstraint(spec, sorter);
 
         /**
          * construct write file operator
@@ -336,7 +340,7 @@
         RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
         ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct btree search operator
@@ -346,14 +350,14 @@
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
         BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
                 null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct write file operator
@@ -463,7 +467,7 @@
     public JobSpecification generateClearState() throws HyracksException {
         JobSpecification spec = new JobSpecification();
         ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
-        ClusterConfig.setLocationConstraint(spec, clearState);
+        setLocationConstraint(spec, clearState);
         spec.addRoot(clearState);
         return spec;
     }
@@ -477,11 +481,11 @@
     protected JobSpecification dropIndex(String indexName) throws HyracksException {
         JobSpecification spec = new JobSpecification();
 
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, indexName);
         IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
                 lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
 
-        ClusterConfig.setLocationConstraint(spec, drop);
+        setLocationConstraint(spec, drop);
         spec.addRoot(drop);
         spec.setFrameSize(frameSize);
         return spec;
@@ -515,7 +519,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         JobSpecification spec = new JobSpecification();
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         /**
          * the graph file scan operator and use count constraint first, will use
@@ -537,7 +541,7 @@
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
                 readSchedule, confFactory);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct sort operator
@@ -549,7 +553,7 @@
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
                 nkmFactory, comparatorFactories, recordDescriptor);
-        ClusterConfig.setLocationConstraint(spec, sorter);
+        setLocationConstraint(spec, sorter);
 
         /**
          * construct tree bulk load operator
@@ -564,7 +568,7 @@
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
                 sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
                 getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        setLocationConstraint(spec, btreeBulkLoad);
 
         /**
          * connect operator descriptors
@@ -596,7 +600,7 @@
         RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
         ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct btree search operator
@@ -606,7 +610,7 @@
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -615,7 +619,7 @@
         BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
                 null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         ExternalSortOperatorDescriptor sort = null;
         if (!ckpointing) {
@@ -625,7 +629,7 @@
             sortCmpFactories[0] = JobGenUtil.getFinalBinaryComparatorFactory(vertexIdClass);
             sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields, nkmFactory, sortCmpFactories,
                     recordDescriptor);
-            ClusterConfig.setLocationConstraint(spec, scanner);
+            setLocationConstraint(spec, scanner);
         }
 
         /**
@@ -636,7 +640,7 @@
                 conf, vertexIdClass.getName(), vertexClass.getName());
         VertexFileWriteOperatorDescriptor writer = new VertexFileWriteOperatorDescriptor(spec, confFactory,
                 inputRdFactory, preHookFactory);
-        ClusterConfig.setLocationConstraint(spec, writer);
+        setLocationConstraint(spec, writer);
 
         /**
          * connect operator descriptors
@@ -681,14 +685,14 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
                 false, jobId, lastSuccessfulIteration + 1);
-        ClusterConfig.setLocationConstraint(spec, materializeRead);
+        setLocationConstraint(spec, materializeRead);
 
         String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
         PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
@@ -700,7 +704,7 @@
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
                 conf, vertexIdClass.getName(), MsgList.class.getName());
         HDFSFileWriteOperatorDescriptor hdfsWriter = new HDFSFileWriteOperatorDescriptor(spec, tmpJob, inputRdFactory);
-        ClusterConfig.setLocationConstraint(spec, hdfsWriter);
+        setLocationConstraint(spec, hdfsWriter);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, materializeRead, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, hdfsWriter, 0);
@@ -743,7 +747,7 @@
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
                 readSchedule, new KeyValueParserFactory());
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /** construct the sort operator to sort message states */
         int[] keyFields = new int[] { 0 };
@@ -752,24 +756,24 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration, vertexIdClass);
         ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, sort);
+        setLocationConstraint(spec, sort);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec,
                 recordDescriptor, jobId, lastCheckpointedIteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration, new ConfigurationFactory(
                         pregelixJob.getConfiguration())));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * connect operator descriptors
@@ -807,7 +811,7 @@
          */
         List<JobSpecification> list = new ArrayList<JobSpecification>();
         list.add(bulkLoadLiveVertexBTree(iteration));
-        JobGen jobGen = new JobGenInnerJoin(pregelixJob, jobId);
+        JobGen jobGen = new JobGenInnerJoin(pregelixJob, jobId, optimizer);
         return Pair.of(list, jobGen);
     }
 
@@ -827,12 +831,12 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct btree search and function call update operator
          */
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
@@ -852,12 +856,12 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 1, new ExtractLiveVertexIdFunctionFactory(),
                 preHookFactory, null, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct bulk-load index operator
          */
-        IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+        IFileSplitProvider secondaryFileSplitProvider = getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
         int[] fieldPermutation = new int[] { 0, 1 };
         int[] keyFields = new int[] { 0 };
         IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
@@ -866,7 +870,7 @@
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
                 fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        setLocationConstraint(spec, btreeBulkLoad);
 
         /** connect job spec */
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
@@ -876,4 +880,25 @@
         return spec;
     }
 
+    /**
+     * set the location constraint for operators
+     * 
+     * @param spec
+     * @param operator
+     */
+    public void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator) {
+        optimizer.setOptimizedLocationConstraints(spec, operator);
+    }
+    
+    /**
+     * get the file split provider
+     * 
+     * @param jobId
+     * @param indexName
+     * @return the IFileSplitProvider instance
+     */
+    public IFileSplitProvider getFileSplitProvider(String jobId, String indexName){
+        return optimizer.getOptimizedFileSplitProvider(jobId, indexName);
+    }
+
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
index ed580de..cbc9c81 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
@@ -17,26 +17,27 @@
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 
 public class JobGenFactory {
 
-    public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob) {
+    public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob, IOptimizer optimizer) {
         JobGen jobGen = null;
         switch (planChoice) {
             case INNER_JOIN:
-                jobGen = new JobGenInnerJoin(currentJob);
+                jobGen = new JobGenInnerJoin(currentJob, optimizer);
                 break;
             case OUTER_JOIN:
-                jobGen = new JobGenOuterJoin(currentJob);
+                jobGen = new JobGenOuterJoin(currentJob, optimizer);
                 break;
             case OUTER_JOIN_SORT:
-                jobGen = new JobGenOuterJoinSort(currentJob);
+                jobGen = new JobGenOuterJoinSort(currentJob, optimizer);
                 break;
             case OUTER_JOIN_SINGLE_SORT:
-                jobGen = new JobGenOuterJoinSingleSort(currentJob);
+                jobGen = new JobGenOuterJoinSingleSort(currentJob, optimizer);
                 break;
             default:
-                jobGen = new JobGenInnerJoin(currentJob);
+                jobGen = new JobGenInnerJoin(currentJob, optimizer);
         }
         return jobGen;
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 7bdb069..f838fb8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -63,6 +63,7 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -93,12 +94,12 @@
 public class JobGenInnerJoin extends JobGen {
     private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
 
-    public JobGenInnerJoin(PregelixJob job) {
-        super(job);
+    public JobGenInnerJoin(PregelixJob job, IOptimizer optimizer) {
+        super(job, optimizer);
     }
     
-    public JobGenInnerJoin(PregelixJob job, String jobId) {
-        super(job, jobId);
+    public JobGenInnerJoin(PregelixJob job, String jobId, IOptimizer optimizer) {
+        super(job, jobId, optimizer);
     }
 
     protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
@@ -113,18 +114,18 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct drop index operator
          */
-        IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider secondaryFileSplitProvider = getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         /**
          * construct btree search and function call update operator
@@ -159,7 +160,7 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 6, new StartComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * termination state write operator
@@ -188,7 +189,7 @@
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
                 fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        setLocationConstraint(spec, btreeBulkLoad);
 
         /**
          * construct local sort operator
@@ -199,7 +200,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -208,7 +209,7 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
@@ -217,25 +218,25 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /**
          * do pre- & post- super step
          */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * add the insert operator to insert vertexes
@@ -244,7 +245,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -254,15 +255,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
@@ -330,7 +331,7 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct pre-superstep
@@ -338,20 +339,20 @@
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
                 true, jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materializeRead);
+        setLocationConstraint(spec, materializeRead);
 
         /**
          * construct the index-set-union operator
          */
         String readFile = iteration % 2 == 0 ? SECONDARY_INDEX_ODD : SECONDARY_INDEX_EVEN;
-        IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+        IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -359,12 +360,12 @@
         IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
                 comparatorFactories, true, keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true);
-        ClusterConfig.setLocationConstraint(spec, setUnion);
+        setLocationConstraint(spec, setUnion);
 
         /**
          * construct index-join-function-update operator
          */
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
                 VLongWritable.class.getName());
         RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
@@ -379,7 +380,7 @@
                 JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, join);
+        setLocationConstraint(spec, join);
 
         /**
          * construct bulk-load index operator
@@ -389,12 +390,12 @@
         indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
                 WritableComparator.get(vertexIdClass).getClass());
         String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
-        IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+        IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
                 indexCmpFactories, fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR,
                 getIndexDataflowHelperFactory());
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        setLocationConstraint(spec, btreeBulkLoad);
 
         /**
          * construct local sort operator
@@ -405,7 +406,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -414,7 +415,7 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
@@ -423,23 +424,23 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * termination state write operator
@@ -464,7 +465,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -474,15 +475,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
@@ -596,7 +597,7 @@
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
                 readSchedule, new KeyValueParserFactory());
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /** construct the sort operator to sort message states */
         int[] keyFields = new int[] { 0 };
@@ -605,7 +606,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration, vertexIdClass);
         ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, sort);
+        setLocationConstraint(spec, sort);
 
         /**
          * construct bulk-load index operator
@@ -618,12 +619,12 @@
         indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration + 1, WritableComparator
                 .get(vertexIdClass).getClass());
         String writeFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
-        IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+        IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
                 indexCmpFactories, fieldPermutation, new int[] { 0 }, DEFAULT_BTREE_FILL_FACTOR,
                 getIndexDataflowHelperFactory());
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        setLocationConstraint(spec, btreeBulkLoad);
 
         /**
          * connect operator descriptors
@@ -648,7 +649,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
         Class<? extends Writable> msgListClass = MsgList.class;
         String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
-        IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+        IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
         JobSpecification spec = new JobSpecification();
         /**
          * construct empty tuple operator
@@ -663,7 +664,7 @@
         RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
         ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct btree search operator
@@ -681,7 +682,7 @@
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
                 comparatorFactories, null, null, null, true, true, getIndexDataflowHelperFactory(), false,
                 NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct write file operator
@@ -689,7 +690,7 @@
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
                 conf, vertexIdClass.getName(), MsgList.class.getName());
         HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
-        ClusterConfig.setLocationConstraint(spec, writer);
+        setLocationConstraint(spec, writer);
 
         /**
          * connect operator descriptors
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 68e6706..39a56bf 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -41,7 +41,7 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,12 +69,12 @@
 
 public class JobGenOuterJoin extends JobGen {
 
-    public JobGenOuterJoin(PregelixJob job) {
-        super(job);
+    public JobGenOuterJoin(PregelixJob job, IOptimizer optimizer) {
+        super(job, optimizer);
     }
-    
-    public JobGenOuterJoin(PregelixJob job, String jobId) {
-        super(job, jobId);
+
+    public JobGenOuterJoin(PregelixJob job, String jobId, IOptimizer optimizer) {
+        super(job, jobId, optimizer);
     }
 
     @Override
@@ -90,12 +90,12 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct btree search function update operator
@@ -104,7 +104,7 @@
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);;
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -129,7 +129,7 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct local sort operator
@@ -140,7 +140,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -149,7 +149,7 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
@@ -160,22 +160,22 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink2);
+        setLocationConstraint(spec, emptySink2);
 
         /**
          * termination state write operator
@@ -202,7 +202,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -212,14 +212,14 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
@@ -286,7 +286,7 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct pre-superstep hook
@@ -294,19 +294,19 @@
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
                 true, jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materializeRead);
+        setLocationConstraint(spec, materializeRead);
 
         /**
          * construct index join function update operator
          */
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
@@ -329,7 +329,7 @@
                 getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, join);
+        setLocationConstraint(spec, join);
 
         /**
          * construct local sort operator
@@ -339,7 +339,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -348,7 +348,7 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
@@ -357,23 +357,23 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * termination state write operator
@@ -399,7 +399,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -409,15 +409,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 3e4b213..c10e6c2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -41,7 +41,7 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,12 +69,12 @@
 
 public class JobGenOuterJoinSingleSort extends JobGen {
 
-    public JobGenOuterJoinSingleSort(PregelixJob job) {
-        super(job);
+    public JobGenOuterJoinSingleSort(PregelixJob job, IOptimizer optimizer) {
+        super(job, optimizer);
     }
-    
-    public JobGenOuterJoinSingleSort(PregelixJob job, String jobId) {
-        super(job, jobId);
+
+    public JobGenOuterJoinSingleSort(PregelixJob job, String jobId, IOptimizer optimizer) {
+        super(job, jobId, optimizer);
     }
 
     @Override
@@ -90,12 +90,12 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct btree search operator
@@ -104,7 +104,7 @@
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -132,7 +132,7 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct global sort operator
@@ -144,7 +144,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, globalSort);
+        setLocationConstraint(spec, globalSort);
 
         /**
          * construct global group-by operator
@@ -155,22 +155,22 @@
                 conf, true, false);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink2);
+        setLocationConstraint(spec, emptySink2);
 
         /**
          * termination state write operator
@@ -196,7 +196,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -206,15 +206,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
@@ -277,7 +277,7 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct pre-superstep hook
@@ -285,19 +285,19 @@
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
                 true, jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materializeRead);
+        setLocationConstraint(spec, materializeRead);
 
         /**
          * construct index join function update operator
          */
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
@@ -320,7 +320,7 @@
                 getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, join);
+        setLocationConstraint(spec, join);
 
         /**
          * construct global sort operator
@@ -331,7 +331,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, globalSort);
+        setLocationConstraint(spec, globalSort);
 
         /**
          * construct global group-by operator
@@ -340,23 +340,23 @@
                 conf, true, false);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * termination state write operator
@@ -379,7 +379,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -389,15 +389,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 7ca771c..953d82c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -41,7 +41,7 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,8 +69,8 @@
 
 public class JobGenOuterJoinSort extends JobGen {
 
-    public JobGenOuterJoinSort(PregelixJob job) {
-        super(job);
+    public JobGenOuterJoinSort(PregelixJob job, IOptimizer optimizer) {
+        super(job, optimizer);
     }
 
     @Override
@@ -86,12 +86,12 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct btree search function update operator
@@ -100,7 +100,7 @@
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -125,7 +125,7 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct local sort operator
@@ -137,7 +137,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -146,14 +146,14 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global sort operator
          */
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, globalSort);
+        setLocationConstraint(spec, globalSort);
 
         /**
          * construct global group-by operator
@@ -164,22 +164,22 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink2);
+        setLocationConstraint(spec, emptySink2);
 
         /**
          * termination state write operator
@@ -206,7 +206,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -216,15 +216,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
@@ -287,7 +287,7 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct pre-superstep hook
@@ -295,19 +295,19 @@
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
                 true, jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materializeRead);
+        setLocationConstraint(spec, materializeRead);
 
         /**
          * construct index join function update operator
          */
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
@@ -330,7 +330,7 @@
                 getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, join);
+        setLocationConstraint(spec, join);
 
         /**
          * construct local sort operator
@@ -341,7 +341,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -350,14 +350,14 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global sort operator
          */
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, globalSort);
+        setLocationConstraint(spec, globalSort);
 
         /**
          * construct global group-by operator
@@ -366,23 +366,23 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * termination state write operator
@@ -408,7 +408,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -418,15 +418,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 5c1a4b8..244d624 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -84,7 +84,7 @@
     }
 
     /**
-     * get file split provider
+     * get file split provider, for test only
      * 
      * @param jobId
      * @return
@@ -175,26 +175,6 @@
      * @param operator
      * @throws HyracksDataException
      */
-    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
-            throws HyracksException {
-        int count = 0;
-        String[] locations = new String[NCs.length * stores.length];
-        for (String nc : NCs) {
-            for (int i = 0; i < stores.length; i++) {
-                locations[count] = nc;
-                count++;
-            }
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
-    }
-
-    /**
-     * set location constraint
-     * 
-     * @param spec
-     * @param operator
-     * @throws HyracksDataException
-     */
     public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {
         int count = NCs.length * stores.length;
         PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
@@ -255,11 +235,35 @@
         }
         return locations;
     }
+    
+    /**
+     * set the default location constraint
+     * 
+     * @param spec
+     * @param operator
+     * @throws HyracksDataException
+     */
+    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
+            throws HyracksException {
+        int count = 0;
+        String[] locations = new String[NCs.length * stores.length];
+        for (String nc : NCs) {
+            for (int i = 0; i < stores.length; i++) {
+                locations[count] = nc;
+                count++;
+            }
+        }
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+    }
 
     public static String[] getNCNames() {
         return NCs;
     }
 
+    public static String[] getStores() {
+        return stores;
+    }
+
     public static void addToBlackListNodes(Collection<String> nodes) {
         blackListNodes.addAll(nodes);
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
index 01fc81b..064ca42 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
@@ -15,14 +15,105 @@
 
 package edu.uci.ics.pregelix.core.optimizer;
 
-import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.IntWritable;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.stats.Counters;
+import edu.uci.ics.hyracks.client.stats.IClusterCounterContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 
 public class DynamicOptimizer implements IOptimizer {
 
+    private IClusterCounterContext counterContext;
+    private Map<String, IntWritable> machineToDegreeOfParallelism = new HashMap<String, IntWritable>();
+    private int dop = 0;
+
+    public DynamicOptimizer(IClusterCounterContext counterContext) {
+        this.counterContext = counterContext;
+    }
+
     @Override
-    public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration) {
-        return jobGen;
+    public JobGen optimize(JobGen jobGen, int iteration) {
+        try {
+            initializeLoadPerMachine();
+            return jobGen;
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator) {
+        try {
+            String[] constraints = new String[dop];
+            int index = 0;
+            for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+                String loc = entry.getKey();
+                IntWritable count = machineToDegreeOfParallelism.get(loc);
+                for (int j = 0; j < count.get(); j++) {
+                    constraints[index++] = loc;
+                }
+            }
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, constraints);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName) {
+        FileSplit[] fileSplits = new FileSplit[dop];
+        String[] stores = ClusterConfig.getStores();
+        int splitIndex = 0;
+        for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+            String ncName = entry.getKey();
+            IntWritable count = machineToDegreeOfParallelism.get(ncName);
+            for (int j = 0; j < count.get(); j++) {
+                //cycles stores, each machine has the number of stores = the number of cores
+                int storeCursor = j % stores.length;
+                String st = stores[storeCursor];
+                FileSplit split = new FileSplit(ncName, st + File.separator + ncName + "-data" + File.separator + jobId
+                        + File.separator + indexName + (j / stores.length));
+                fileSplits[splitIndex++] = split;
+            }
+        }
+        return new ConstantFileSplitProvider(fileSplits);
+    }
+
+    /**
+     * initialize the load-per-machine map
+     * 
+     * @return the degree of parallelism
+     * @throws HyracksException
+     */
+    private int initializeLoadPerMachine() throws HyracksException {
+        machineToDegreeOfParallelism.clear();
+        String[] locationConstraints = ClusterConfig.getLocationConstraint();
+        for (String loc : locationConstraints) {
+            machineToDegreeOfParallelism.put(loc, new IntWritable(0));
+        }
+        dop = 0;
+        for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+            String loc = entry.getKey();
+            //reserve one core for heartbeat
+            int load = (int) counterContext.getCounter(Counters.NUM_PROCESSOR, false).get() - 1;
+            IntWritable count = machineToDegreeOfParallelism.get(loc);
+            count.set(load);
+            dop += load;
+        }
+        return dop;
     }
 
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
index b5913c4..c8856aa 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
@@ -15,11 +15,17 @@
 
 package edu.uci.ics.pregelix.core.optimizer;
 
-import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.pregelix.core.jobgen.JobGen;
 
 public interface IOptimizer {
 
-    public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration);
-    
+    public JobGen optimize(JobGen jobGen, int iteration);
+
+    public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator);
+
+    public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName);
+
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java
new file mode 100644
index 0000000..cd0ca37
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+
+public class NoOpOptimizer implements IOptimizer {
+
+    @Override
+    public JobGen optimize(JobGen jobGen, int iteration) {
+        return jobGen;
+    }
+
+    @Override
+    public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator) {
+        try {
+            ClusterConfig.setLocationConstraint(spec, operator);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName) {
+        try {
+           return ClusterConfig.getFileSplitProvider(jobId, indexName);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index b4e17b6..5855fd3 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -35,6 +35,8 @@
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.NoOpOptimizer;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
 import edu.uci.ics.pregelix.example.PageRankVertex;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimulatedPageRankVertexInputFormat;
@@ -83,7 +85,9 @@
         FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
         FileUtils.cleanDirectory(new File(EXPECT_RESULT_DIR));
         FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
-        giraphTestJobGen = new JobGenOuterJoin(job);
+
+        IOptimizer dynamicOptimizer = new NoOpOptimizer();
+        giraphTestJobGen = new JobGenOuterJoin(job, dynamicOptimizer);
     }
 
     private void cleanupStores() throws IOException {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index fcfeb95..0a5e891 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -25,7 +25,6 @@
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
 import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
-import edu.uci.ics.pregelix.api.util.HadoopCountersGlobalAggregateHook;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
 import edu.uci.ics.pregelix.example.EarlyTerminationVertex;
@@ -54,8 +53,6 @@
 import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator;
 import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex;
 import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex.TriangleCountingVertexOutputFormat;
-import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingWithAggregateHadoopCountersVertex.TriangleHadoopCountersAggregator;
-import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingWithAggregateHadoopCountersVertex;
 
 public class JobGenerator {
     private static String outputBase = "src/test/resources/jobs/";
@@ -82,6 +79,7 @@
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
         job.setCheckpointHook(ConservativeCheckpointHook.class);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -98,6 +96,7 @@
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
         job.setCheckpointHook(ConservativeCheckpointHook.class);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -126,6 +125,7 @@
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
         job.setCheckpointHook(ConservativeCheckpointHook.class);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -140,6 +140,7 @@
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -155,6 +156,7 @@
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -228,6 +230,7 @@
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -240,19 +243,7 @@
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
-        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-    }
-    
-    private static void generateTriangleCountingWithHadoopCountersJob(String jobName, String outputPath) throws IOException {
-        PregelixJob job = new PregelixJob(jobName);
-        job.setVertexClass(TriangleCountingWithAggregateHadoopCountersVertex.class);
-        job.addGlobalAggregatorClass(TriangleCountingAggregator.class);
-        job.setCounterAggregatorClass(TriangleCountingWithAggregateHadoopCountersVertex.TriangleHadoopCountersAggregator.class);
-        job.setVertexInputFormatClass(TextTriangleCountingInputFormat.class);
-        job.setVertexOutputFormatClass(TriangleCountingVertexOutputFormat.class);
-        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
-        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
-        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
+        job.setDynamicVertexValueSize(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -391,7 +382,6 @@
 
     private static void genTriangleCounting() throws IOException {
         generateTriangleCountingJob("Triangle Counting", outputBase + "TriangleCounting.xml");
-//        generateTriangleCountingWithHadoopCountersJob("Triangle Counting", outputBase + "TriangleCountingWithHadoopCounters.xml");
     }
 
     private static void genMaximalClique() throws IOException {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java
new file mode 100644
index 0000000..10f5829
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.util;
+
+@SuppressWarnings("rawtypes")
+public class Record implements Comparable {
+
+    private String recordText;
+
+    public Record(String text) {
+        recordText = text;
+    }
+
+    @Override
+    public int hashCode() {
+        return recordText.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return recordText;
+    }
+
+    @Override
+    public int compareTo(Object o) {
+        if (!(o instanceof Record)) {
+            throw new IllegalStateException("uncomparable items");
+        }
+        Record record = (Record) o;
+        boolean equal = equalStrings(recordText, record.recordText);
+        if (equal) {
+            return 0;
+        } else {
+            return recordText.compareTo(record.recordText);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof Record)) {
+            return false;
+        }
+        Record record = (Record) o;
+        return equalStrings(recordText, record.recordText);
+    }
+
+    private boolean equalStrings(String s1, String s2) {
+        String[] rowsOne = s1.split("\n");
+        String[] rowsTwo = s2.split("\n");
+
+        if (rowsOne.length != rowsTwo.length)
+            return false;
+
+        for (int i = 0; i < rowsOne.length; i++) {
+            String row1 = rowsOne[i];
+            String row2 = rowsTwo[i];
+
+            if (row1.equals(row2))
+                continue;
+
+            boolean spaceOrTab = false;
+            spaceOrTab = row1.contains(" ");
+            String[] fields1 = spaceOrTab ? row1.split(" ") : row1.split("\t");
+            String[] fields2 = spaceOrTab ? row2.split(" ") : row2.split("\t");
+
+            for (int j = 0; j < fields1.length; j++) {
+                if (j >= fields2.length) {
+                    return false;
+                }
+                if (fields1[j].equals(fields2[j])) {
+                    continue;
+                } else if (fields1[j].indexOf('.') < 0) {
+                    return false;
+                } else {
+                    Double double1 = Double.parseDouble(fields1[j]);
+                    Double double2 = Double.parseDouble(fields2[j]);
+                    float float1 = (float) double1.doubleValue();
+                    float float2 = (float) double2.doubleValue();
+
+                    if (Math.abs(float1 - float2) < 1.0e-7)
+                        continue;
+                    else {
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
index fe07cf5..e6347ed 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
@@ -16,83 +16,82 @@
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.junit.Assert;
 
 public class TestUtils {
 
+    private static final String PREFIX = "part";
+
     public static void compareWithResultDir(File expectedFileDir, File actualFileDir) throws Exception {
-        String[] fileNames = expectedFileDir.list();
-        for (String fileName : fileNames) {
-            compareWithResult(new File(expectedFileDir, fileName), new File(actualFileDir, fileName));
-        }
+        Collection<Record> expectedRecords = loadRecords(expectedFileDir);
+        Collection<Record> actualRecords = loadRecords(actualFileDir);
+        boolean equal = collectionEqual(expectedRecords, actualRecords);
+        Assert.assertTrue(equal);
     }
 
-    public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
-        BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
-        BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
-        String lineExpected, lineActual;
-        int num = 1;
-        try {
-            while ((lineExpected = readerExpected.readLine()) != null) {
-                lineActual = readerActual.readLine();
-                if (lineActual == null) {
-                    throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
+    public static boolean collectionEqual(Collection<Record> c1, Collection<Record> c2) {
+        for (Record r1 : c1) {
+            boolean exists = false;
+            for (Record r2 : c2) {
+                if (r1.equals(r2)) {
+                    exists = true;
+                    break;
                 }
-                if (!equalStrings(lineExpected, lineActual)) {
-                    throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
-                            + lineActual);
-                }
-                ++num;
             }
-            lineActual = readerActual.readLine();
-            if (lineActual != null) {
-                throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineActual);
+            if (!exists) {
+                return false;
             }
-        } finally {
-            readerExpected.close();
-            readerActual.close();
         }
-    }
-
-    private static boolean equalStrings(String s1, String s2) {
-        String[] rowsOne = s1.split("\n");
-        String[] rowsTwo = s2.split("\n");
-
-        if (rowsOne.length != rowsTwo.length)
-            return false;
-
-        for (int i = 0; i < rowsOne.length; i++) {
-            String row1 = rowsOne[i];
-            String row2 = rowsTwo[i];
-
-            if (row1.equals(row2))
-                continue;
-
-            boolean spaceOrTab = false;
-            spaceOrTab = row1.contains(" ");
-            String[] fields1 = spaceOrTab ? row1.split(" ") : row1.split("\t");
-            String[] fields2 = spaceOrTab ? row2.split(" ") : row2.split("\t");
-
-            for (int j = 0; j < fields1.length; j++) {
-                if (fields1[j].equals(fields2[j])) {
-                    continue;
-                } else if (fields1[j].indexOf('.') < 0) {
-                    return false;
-                } else {
-                    Double double1 = Double.parseDouble(fields1[j]);
-                    Double double2 = Double.parseDouble(fields2[j]);
-                    float float1 = (float) double1.doubleValue();
-                    float float2 = (float) double2.doubleValue();
-
-                    if (Math.abs(float1 - float2) < 1.0e-7)
-                        continue;
-                    else {
-                        return false;
-                    }
+        for (Record r2 : c2) {
+            boolean exists = false;
+            for (Record r1 : c1) {
+                if (r2.equals(r1)) {
+                    exists = true;
+                    break;
                 }
             }
+            if (!exists) {
+                return false;
+            }
         }
         return true;
     }
 
+    public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
+        Collection<Record> expectedRecords = new ArrayList<Record>();
+        Collection<Record> actualRecords = new ArrayList<Record>();
+        populateResultFile(expectedRecords, expectedFile);
+        populateResultFile(actualRecords, actualFile);
+        boolean equal = expectedRecords.equals(actualRecords);
+        Assert.assertTrue(equal);
+    }
+
+    private static Collection<Record> loadRecords(File dir) throws Exception {
+        String[] fileNames = dir.list();
+        Collection<Record> records = new ArrayList<Record>();
+        for (String fileName : fileNames) {
+            if (fileName.startsWith(PREFIX)) {
+                File file = new File(dir, fileName);
+                populateResultFile(records, file);
+            }
+        }
+        return records;
+    }
+
+    private static void populateResultFile(Collection<Record> records, File file) throws FileNotFoundException,
+            IOException {
+        BufferedReader reader = new BufferedReader(new FileReader(file));
+        String line = null;
+        while ((line = reader.readLine()) != null) {
+            records.add(new Record(line));
+        }
+        reader.close();
+    }
+
 }
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index 591446c..3091c83 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -80,6 +80,7 @@
 <property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
 <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 32c2a1a..b6af65c 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -29,7 +29,6 @@
 <property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleMinCombiner</value></property>
 <property><name>mapred.output.compress</name><value>false</value></property>
 <property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -86,6 +85,7 @@
 <property><name>fs.checkpoint.period</name><value>3600</value></property>
 <property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
 <property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>fs.s3.maxRetries</name><value>4</value></property>
 <property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
 <property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
index d06068d..d908da8 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
@@ -124,7 +124,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
index 01d85a5..d5ec8f1 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
@@ -124,7 +124,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
index 072ea9e..b4c42e6 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
@@ -124,7 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
index 3ae367d..6cf075b 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
@@ -124,7 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
index 6cb617f..49e2e6f 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
@@ -125,7 +125,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
index 76d6e87..8316c64 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
@@ -125,7 +125,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
index 1f52250..a894ccd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
@@ -124,7 +124,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
index 9d2c9e1..a9f8925 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
@@ -1,146 +1,145 @@
 <?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>pregelix.updateIntensive</name><value>true</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>pregelix.framesize</name><value>2048</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>20</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/result</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.input.dir</name><value>file:/webmap</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
 <property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>pregelix.updateIntensive</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
 </configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index f1c27ca..d29b2da 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -125,7 +125,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index dfb4e71..6fe04fb 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -30,7 +30,6 @@
 <property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
 <property><name>mapred.output.compress</name><value>false</value></property>
 <property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -86,6 +85,7 @@
 <property><name>fs.checkpoint.period</name><value>3600</value></property>
 <property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
 <property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>fs.s3.maxRetries</name><value>4</value></property>
 <property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
 <property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 49a6e20..d0f9759 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -30,7 +30,6 @@
 <property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
 <property><name>mapred.output.compress</name><value>false</value></property>
 <property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -87,6 +86,7 @@
 <property><name>fs.checkpoint.period</name><value>3600</value></property>
 <property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
 <property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>fs.s3.maxRetries</name><value>4</value></property>
 <property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
 <property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
index 789ea32..0173390 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
@@ -80,6 +80,7 @@
 <property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
 <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index 796b1d1..a7a38e0 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -80,6 +80,7 @@
 <property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
 <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
index 8834ead..225429a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
@@ -1,146 +1,145 @@
 <?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Reachibility</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>ReachibilityVertex.destId</name><value>10</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
 <property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>ReachibilityVertex.destId</name><value>10</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
 </configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
index 234dbf9..bd9da92 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
@@ -1,146 +1,145 @@
 <?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Reachibility</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>ReachibilityVertex.destId</name><value>25</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
 <property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>ReachibilityVertex.destId</name><value>25</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
 </configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index b1c57c5..9acd7bc 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -126,7 +126,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index f1d2dc6..6c25575 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -126,7 +126,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
index 951ac6f..80cea20 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
@@ -123,9 +123,10 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.trianglecounting.TextTriangleCountingInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>topology.script.number.args</name><value>100</value></property>
 <property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>