Merge branch 'master' into yingyi/fullstack_fix
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
index 46487eb..c39cba7 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
@@ -149,7 +149,7 @@
         for (String counterName : RESET_COUNTERS) {
             updateCounter(slave, jo, counterName);
         }
-        
+
         for (String counterName : AGG_COUNTERS) {
             updateCounter(slave, jo, counterName);
         }
@@ -168,7 +168,9 @@
 
     private long extractCounterValue(Object counterObject) {
         long counterValue = 0;
-        if (counterObject instanceof JSONArray) {
+        if (counterObject == null) {
+            return counterValue;
+        } else if (counterObject instanceof JSONArray) {
             JSONArray jArray = (JSONArray) counterObject;
             Object[] values = jArray.toArray();
             for (Object value : values) {
@@ -256,4 +258,4 @@
 
     }
 
-}
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 5ea6413..c1473dc 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -31,7 +31,7 @@
  * 2. a final phase which aggregates all partially aggregated states
  * 
  * @param <I extends Writable> vertex identifier type
- * @param <E extends Writable> vertex value type
+ * @param <V extends Writable> vertex value type
  * @param <E extends Writable> edge type
  * @param <M extends Writable> message type
  * @param <P extends Writable>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index a42b411..8135479 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -234,19 +234,44 @@
     /**
      * Vote to halt. Once all vertex vote to halt and no more messages, a
      * Pregelix job will terminate.
+     * 
+     * The state of the current vertex value is saved.
      */
     public final void voteToHalt() {
         halt = true;
         updated = true;
     }
+
+    /**
+     * Vote to halt. Once all vertices vote to halt and no more messages remain, a
+     * Pregelix job will terminate.
+     * 
+     * @param update whether or not to save the vertex value
+     */
+    public final void voteToHalt(boolean update) {
+        halt = true;
+        updated = update;
+    }
 
     /**
      * Activate a halted vertex such that it is alive again.
+     * 
+     * The state of the current vertex value is saved.
      */
     public final void activate() {
         halt = false;
         updated = true;
     }
+
+    /**
+     * Activate a halted vertex such that it is alive again.
+     * 
+     * @param update whether or not to save the vertex value
+     */
+    public final void activate(boolean update) {
+        halt = false;
+        updated = update;
+    }
 
     /**
      * @return the vertex is halted (true) or not (false)
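For illustration only, a minimal sketch of how a compute() body might use the new overloads; the surrounding Vertex subclass, the DoubleWritable-style message type, and the summing logic are assumptions for the example, not part of this patch.

    // Hypothetical fragment inside a user's compute(Iterator<M> msgIterator) implementation.
    if (!msgIterator.hasNext()) {
        // Nothing changed this superstep: halt without re-persisting the vertex value.
        voteToHalt(false);
    } else {
        double sum = 0;
        while (msgIterator.hasNext()) {
            sum += msgIterator.next().get();
        }
        setVertexValue(new DoubleWritable(sum));
        // Halt and save the new value; equivalent to voteToHalt(true).
        voteToHalt();
    }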
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java
new file mode 100644
index 0000000..5254b8c
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.job;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Interface for an object whose {@link #completeIteration(int, PregelixJob)} method is called at the end
+ * of each Pregelix job iteration.
+ * 
+ * This interface can be used to extend/replace the simple reporting in Pregelix or to
+ * implement aggregation across iterations of a job (rather than having the values
+ * reset after each iteration).
+ * One object is created for each job.
+ * 
+ * @author jake.biesinger
+ */
+public interface IIterationCompleteReporterHook {
+
+    public void completeIteration(int superstep, PregelixJob job) throws HyracksDataException;
+
+}
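As a hedged illustration of the contract above, a user-supplied hook could look like the sketch below; the package and class name are hypothetical. It would be registered on the job via the setIterationCompleteReporterHook setter added later in this patch.

    package com.example.pregelix.hooks;  // hypothetical user package

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
    import edu.uci.ics.pregelix.api.job.PregelixJob;

    /** Hypothetical hook that reports wall-clock progress once per completed superstep. */
    public class SuperstepTimingHook implements IIterationCompleteReporterHook {
        private long lastCompletedAt = System.currentTimeMillis();

        @Override
        public void completeIteration(int superstep, PregelixJob job) throws HyracksDataException {
            long now = System.currentTimeMillis();
            System.out.println(job.getJobName() + ": superstep " + superstep + " took "
                    + (now - lastCompletedAt) + " ms");
            lastCompletedAt = now;
        }
    }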
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 06e79de..f2c9c84 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.pregelix.api.job;
 
 import java.io.IOException;
+import java.lang.reflect.Modifier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
@@ -26,7 +27,9 @@
 import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.util.HadoopCountersGlobalAggregateHook;
 import edu.uci.ics.pregelix.api.util.GlobalCountAggregator;
+import edu.uci.ics.pregelix.api.util.HadoopCountersAggregator;
 
 /**
  * This class represents a Pregelix job.
@@ -82,8 +85,16 @@
     public static final String CKP_INTERVAL = "pregelix.ckpinterval";
     /** the dynamic optimization */
     public static final String DYNAMIC_OPTIMIZATION = "pregelix.dynamicopt";
+    /** the iteration complete reporter hook */
+    public static final String ITERATION_COMPLETE_CLASS = "pregelix.iterationCompleteReporter";
     /** comma */
     public static final String COMMA_STR = ",";
+    /** period */
+    public static final String PERIOD_STR = ".";
+    /** the names of the aggregator classes active for all vertex types */
+    public static final String[] DEFAULT_GLOBAL_AGGREGATOR_CLASSES = { GlobalCountAggregator.class.getName() };
+    /** The name of an optional class that aggregates all Vertexes into mapreduce.Counters */
+    public static final String COUNTERS_AGGREGATOR_CLASS = "pregelix.aggregatedCountersClass";
 
     /**
      * Construct a Pregelix job from an existing configuration
@@ -93,7 +104,6 @@
      */
     public PregelixJob(Configuration conf) throws IOException {
         super(conf);
-        this.addGlobalAggregatorClass(GlobalCountAggregator.class);
     }
 
     /**
@@ -105,7 +115,6 @@
      */
     public PregelixJob(String jobName) throws IOException {
         super(new Configuration(), jobName);
-        this.addGlobalAggregatorClass(GlobalCountAggregator.class);
     }
 
     /**
@@ -119,7 +128,6 @@
      */
     public PregelixJob(Configuration conf, String jobName) throws IOException {
         super(conf, jobName);
-        this.addGlobalAggregatorClass(GlobalCountAggregator.class);
     }
 
     /**
@@ -262,16 +270,35 @@
     final public void setCheckpointingInterval(int ckpInterval) {
         getConfiguration().setInt(CKP_INTERVAL, ckpInterval);
     }
-    
+
+    /**
+     * Users can provide an IIterationCompleteReporterHook implementation to perform actions
+     * at the end of each iteration
+     * 
+     * @param reporterClass
+     */
+    final public void setIterationCompleteReporterHook(Class<? extends IIterationCompleteReporterHook> reporterClass) {
+        getConfiguration().setClass(ITERATION_COMPLETE_CLASS, reporterClass, IIterationCompleteReporterHook.class);
+    }
+
     /**
      * Indicate if dynamic optimization is enabled
      * 
      * @param dynamicOpt
      */
-    final public void setEnableDynamicOptimization(boolean dynamicOpt){
+    final public void setEnableDynamicOptimization(boolean dynamicOpt) {
         getConfiguration().setBoolean(DYNAMIC_OPTIMIZATION, dynamicOpt);
     }
 
+    final public void setCounterAggregatorClass(Class<? extends HadoopCountersAggregator<?, ?, ?, ?, ?>> aggClass) {
+        if (Modifier.isAbstract(aggClass.getModifiers())) {
+            throw new IllegalArgumentException("Aggregate class must be a concrete class, not an abstract one! (was " + aggClass.getName() + ")");
+        }
+        getConfiguration().setClass(COUNTERS_AGGREGATOR_CLASS, aggClass, HadoopCountersAggregator.class);
+        addGlobalAggregatorClass(aggClass);
+        setIterationCompleteReporterHook(HadoopCountersGlobalAggregateHook.class);
+    }
+
     @Override
     public String toString() {
         return getJobName();
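For context, a hedged sketch of how a client would wire the new setter into job setup; the job name and the hook class are the hypothetical examples from this review, not part of the patch.

    PregelixJob job = new PregelixJob("iteration-hook-demo");
    // Vertex class, input/output formats, and paths would be configured here as usual.
    job.setIterationCompleteReporterHook(SuperstepTimingHook.class);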
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index f44942f..bef9aa9 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -15,14 +15,23 @@
 
 package edu.uci.ics.pregelix.api.util;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.MsgList;
@@ -33,6 +42,7 @@
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
 import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.job.ICheckpointHook;
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
 /**
@@ -40,6 +50,10 @@
  * them.
  */
 public class BspUtils {
+
+    public static final String TMP_DIR = "/tmp/";
+    private static final String COUNTERS_VALUE_ON_ITERATION = ".counters.valueOnIter.";
+    private static final String COUNTERS_LAST_ITERATION_COMPLETED = ".counters.lastIterCompleted";
 
     /**
      * Get the user's subclassed {@link VertexInputFormat}.
@@ -123,9 +137,17 @@
     public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> List<Class<? extends GlobalAggregator<I, V, E, M, P, F>>> getGlobalAggregatorClasses(
             Configuration conf) {
         String aggStrs = conf.get(PregelixJob.GLOBAL_AGGREGATOR_CLASS);
-        String[] classnames = aggStrs.split(PregelixJob.COMMA_STR);
+        String[] classnames;
+        if (aggStrs == null) {
+            classnames = new String[0];
+        } else {
+            classnames = aggStrs.split(PregelixJob.COMMA_STR);
+        }
         try {
             List<Class<? extends GlobalAggregator<I, V, E, M, P, F>>> classes = new ArrayList<Class<? extends GlobalAggregator<I, V, E, M, P, F>>>();
+            for (String defaultClass : PregelixJob.DEFAULT_GLOBAL_AGGREGATOR_CLASSES) {
+                classes.add((Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClassByName(defaultClass));
+            }
             for (int i = 0; i < classnames.length; i++) {
                 classes.add((Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClassByName(classnames[i]));
             }
@@ -563,6 +585,24 @@
     }
 
     /**
+     * Create the user's iteration complete reporter hook
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated iteration complete reporter hook
+     */
+    public static IIterationCompleteReporterHook createIterationCompleteHook(Configuration conf) {
+        Class<? extends IIterationCompleteReporterHook> itCompleteClass = getIterationCompleteReporterHookClass(conf);
+        try {
+            return itCompleteClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createIterationCompleteHook: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createIterationCompleteHook: Illegally accessed", e);
+        }
+    }
+
+    /**
      * Get the user's subclassed vertex partitioner class.
      * 
      * @param conf
@@ -587,6 +627,20 @@
     }
 
     /**
+     * Get the user's subclassed iteration complete reporter hook class.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return The user-defined iteration complete reporter hook class
+     */
+    @SuppressWarnings("unchecked")
+    public static <V extends IIterationCompleteReporterHook> Class<V> getIterationCompleteReporterHookClass(
+            Configuration conf) {
+        return (Class<V>) conf.getClass(PregelixJob.ITERATION_COMPLETE_CLASS,
+                DefaultIterationCompleteReporterHook.class, IIterationCompleteReporterHook.class);
+    }
+
+    /**
      * Get the job configuration parameter whether the vertex states will increase dynamically
      * 
      * @param conf
@@ -671,15 +725,16 @@
     public static int getRecoveryCount(Configuration conf) {
         return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
     }
-    
+
     /***
      * Get enable dynamic optimization
      * 
-     * @param conf Configuration
+     * @param conf
+     *            Configuration
      * @return true if enabled; otherwise false
      */
-    public static boolean getEnableDynamicOptimization(Configuration conf){
-        return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, true);
+    public static boolean getEnableDynamicOptimization(Configuration conf) {
+        return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, false);
     }
 
     /***
@@ -691,4 +746,115 @@
     public static int getCheckpointingInterval(Configuration conf) {
         return conf.getInt(PregelixJob.CKP_INTERVAL, -1);
     }
+
+    public static Writable readGlobalAggregateValue(Configuration conf, String jobId, String aggClassName)
+            throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = TMP_DIR + jobId + "agg";
+            Path path = new Path(pathStr);
+            FSDataInputStream input = dfs.open(path);
+            int numOfAggs = createFinalAggregateValues(conf).size();
+            for (int i = 0; i < numOfAggs; i++) {
+                String aggName = input.readUTF();
+                Writable agg = createFinalAggregateValue(conf, aggName);
+                if (aggName.equals(aggClassName)) {
+                    agg.readFields(input);
+                    input.close();
+                    return agg;
+                } else {
+                    agg.readFields(input);
+                }
+            }
+            throw new IllegalStateException("Cannot find the aggregate value for " + aggClassName);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static HashMap<String, Writable> readAllGlobalAggregateValues(Configuration conf, String jobId)
+            throws HyracksDataException {
+        String pathStr = TMP_DIR + jobId + "agg";
+        Path path = new Path(pathStr);
+        List<Writable> aggValues = createFinalAggregateValues(conf);
+        HashMap<String, Writable> finalAggs = new HashMap<>();
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            FSDataInputStream input = dfs.open(path);
+            for (int i = 0; i < aggValues.size(); i++) {
+                String aggName = input.readUTF();
+                aggValues.get(i).readFields(input);
+                finalAggs.put(aggName, aggValues.get(i));
+            }
+            input.close();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return finalAggs;
+    }
+
+    public static Counters getCounters(PregelixJob job) throws HyracksDataException {
+        Configuration conf = job.getConfiguration();
+        String jobId = getJobId(conf);
+        int lastIter = BspUtils.readCountersLastIteration(conf, jobId);
+        return BspUtils.readCounters(lastIter, conf, jobId);
+    }
+
+    static Counters readCounters(int superstep, Configuration conf, String jobId) throws HyracksDataException {
+        String pathStr = TMP_DIR + jobId + BspUtils.COUNTERS_VALUE_ON_ITERATION + superstep;
+        Path path = new Path(pathStr);
+        Counters savedCounters = new Counters();
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            FSDataInputStream input = dfs.open(path);
+            savedCounters.readFields(input);
+            input.close();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return savedCounters;
+    }
+
+    static void writeCounters(Counters toWrite, int superstep, Configuration conf, String jobId)
+            throws HyracksDataException {
+        String pathStr = TMP_DIR + jobId + BspUtils.COUNTERS_VALUE_ON_ITERATION + superstep;
+        Path path = new Path(pathStr);
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            FSDataOutputStream output = dfs.create(path, true);
+            toWrite.write(output);
+            output.close();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    static int readCountersLastIteration(Configuration conf, String jobId) throws HyracksDataException {
+        String pathStr = TMP_DIR + jobId + BspUtils.COUNTERS_LAST_ITERATION_COMPLETED;
+        Path path = new Path(pathStr);
+        IntWritable lastIter = new IntWritable();
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            FSDataInputStream input = dfs.open(path);
+            lastIter.readFields(input);
+            input.close();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return lastIter.get();
+    }
+
+    static void writeCountersLastIteration(int superstep, Configuration conf, String jobId) throws HyracksDataException {
+        String pathStr = TMP_DIR + jobId + BspUtils.COUNTERS_LAST_ITERATION_COMPLETED;
+        Path path = new Path(pathStr);
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            FSDataOutputStream output = dfs.create(path, true);
+            new IntWritable(superstep).write(output);
+            output.close();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
 }
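A hedged sketch of reading the persisted state back through the new helpers, assuming a job has already run with at least one global aggregator configured; only methods introduced or already present in BspUtils are used, and the fragment assumes the usual Hadoop imports (Configuration, Writable, Counters, HashMap).

    Configuration conf = job.getConfiguration();
    String jobId = BspUtils.getJobId(conf);

    // All final global-aggregate values, keyed by aggregator class name.
    HashMap<String, Writable> aggregates = BspUtils.readAllGlobalAggregateValues(conf, jobId);

    // Cumulative Hadoop counters as of the last completed iteration (requires the
    // counters aggregator/hook pair to have been active during the run).
    Counters counters = BspUtils.getCounters(job);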
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
index 4f5fef0..9721589 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
@@ -17,7 +17,7 @@
 import edu.uci.ics.pregelix.api.job.ICheckpointHook;
 
 /**
- * A conservative checkpoint hook which does checkpoint every 5 supersteps
+ * A conservative checkpoint hook which checkpoints every 2 supersteps
  * 
  * @author yingyib
  */
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
new file mode 100644
index 0000000..7b004d8
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+/**
+ * The default iteration complete reporter hook does nothing beyond logging a debug message
+ * 
+ * @author wbiesing
+ */
+public class DefaultIterationCompleteReporterHook implements IIterationCompleteReporterHook {
+
+    private static final Log LOG = LogFactory.getLog(DefaultIterationCompleteReporterHook.class);
+
+    @Override
+    public void completeIteration(int superstep, PregelixJob job) {
+        LOG.debug("finished iteration " + superstep + " for job " + job.getJobName());
+    }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java
new file mode 100644
index 0000000..b0814d9
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Counters;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * A global aggregator that produces a Hadoop mapreduce.Counters object
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HadoopCountersAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable>
+        extends GlobalAggregator<I, V, E, M, Counters, Counters> {
+    private ResettableCounters counters = new ResettableCounters();
+
+    public Counters getCounters() {
+        return counters;
+    }
+
+    @Override
+    public void init() {
+        counters.reset();
+    }
+
+    @Override
+    public void step(Counters partialResult) {
+        counters.incrAllCounters(partialResult);
+    }
+
+    @Override
+    public Counters finishPartial() {
+        return counters;
+    }
+
+    @Override
+    public Counters finishFinal() {
+        return counters;
+    }
+
+    /**
+     * mapreduce.Counters object that is resettable via .reset()
+     */
+    public static class ResettableCounters extends Counters {
+        private static final DataInputStream zeroStream = new DataInputStream(new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return 0;
+            }
+        });
+
+        /**
+         * Reset this Counters object
+         * 
+         * The reset is done by simulating a readFields() from a stream of 0's,
+         * indicating a serialized length of 0 groups. The Counters' cache is not changed. 
+         */
+        public void reset() {
+            try {
+                this.readFields(zeroStream);
+            } catch (IOException e) {
+                throw new RuntimeException("Unexpected failure when trying to reset Counters object!", e);
+            }
+        }
+    }
+}
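A hedged sketch of a concrete subclass (class name, counter group, and counter names are illustrative): assuming GlobalAggregator's per-vertex step(Vertex) callback is the only method left abstract, the subclass just increments counters per vertex, and the Counters merging is handled by the base class above.

    package com.example.pregelix.hooks;  // hypothetical user package

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;

    import edu.uci.ics.pregelix.api.graph.Vertex;
    import edu.uci.ics.pregelix.api.io.WritableSizable;
    import edu.uci.ics.pregelix.api.util.HadoopCountersAggregator;

    /** Hypothetical aggregator that counts visited vertices into a Hadoop counter. */
    @SuppressWarnings("rawtypes")
    public class VertexCountingAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable>
            extends HadoopCountersAggregator<I, V, E, M, P> {

        @Override
        public void step(Vertex<I, V, E, M> v) {
            // Group and counter names are illustrative.
            getCounters().findCounter("pregelix-demo", "vertices-visited").increment(1);
        }
    }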
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersGlobalAggregateHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersGlobalAggregateHook.java
new file mode 100644
index 0000000..35ff22c
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersGlobalAggregateHook.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+/**
+ * Hook that aggregates a mapreduce.Counters object across all
+ * iterations of a job, saving to HDFS
+ * 
+ * @author wbiesing
+ */
+public class HadoopCountersGlobalAggregateHook implements IIterationCompleteReporterHook {
+
+    @Override
+    public void completeIteration(int superstep, PregelixJob job) throws HyracksDataException {
+        Configuration conf = job.getConfiguration();
+        String jobId = BspUtils.getJobId(conf);
+        Class<?> aggClass = conf.getClass(PregelixJob.COUNTERS_AGGREGATOR_CLASS, null);
+        if (aggClass == null)
+            throw new HyracksDataException(
+                    "A subclass of HadoopCountersAggregator must be active for HadoopCountersGlobalAggregateHook to operate!");
+        Counters curIterCounters;
+        try {
+            curIterCounters = (Counters) BspUtils.readGlobalAggregateValue(conf, jobId, aggClass.getName());
+        } catch (IllegalStateException e) {
+            throw new HyracksDataException(
+                    "A subclass of HadoopCountersAggregator must be active for HadoopCountersGlobalAggregateHook to operate!", e);
+        }
+        if (superstep > 1) {
+            Counters prevCounters = BspUtils.readCounters(superstep - 1, conf, jobId); // the counters from the previous iterations, all aggregated together
+            curIterCounters.incrAllCounters(prevCounters); // add my counters to previous ones
+        }
+        BspUtils.writeCounters(curIterCounters, superstep, conf, jobId);
+        BspUtils.writeCountersLastIteration(superstep, conf, jobId);
+    }
+}
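Putting the pieces together, a hedged end-to-end sketch (names are illustrative, and the aggregator class is the hypothetical one sketched above): registering a Counters aggregator on the job installs this hook, the hook persists running totals to HDFS each superstep, and the totals can be read back after the run.

    PregelixJob job = new PregelixJob("counters-demo");
    job.setCounterAggregatorClass(VertexCountingAggregator.class);
    // Vertex class, input/output formats, and paths would be configured here, then the job run.

    // After the run: counters aggregated across all completed supersteps.
    Counters totals = BspUtils.getCounters(job);
    long visited = totals.findCounter("pregelix-demo", "vertices-visited").getValue();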
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index b3a90e9..a71ea3d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -46,6 +46,7 @@
 import edu.uci.ics.hyracks.client.stats.Counters;
 import edu.uci.ics.hyracks.client.stats.impl.ClientCounterContext;
 import edu.uci.ics.pregelix.api.job.ICheckpointHook;
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.base.IDriver;
@@ -54,6 +55,7 @@
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.optimizer.DynamicOptimizer;
 import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.NoOpOptimizer;
 import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 
@@ -97,7 +99,7 @@
             PregelixJob currentJob = jobs.get(0);
             PregelixJob lastJob = currentJob;
             addHadoopConfiguration(currentJob, ipAddress, port, true);
-            ClientCounterContext counterContext = new ClientCounterContext(ipAddress, 16001,
+            ClientCounterContext counterContext = new ClientCounterContext(ipAddress, ClusterConfig.getCCHTTPort(),
                     Arrays.asList(ClusterConfig.getNCNames()));
             JobGen jobGen = null;
 
@@ -110,8 +112,11 @@
             boolean failed = false;
             int retryCount = 0;
             int maxRetryCount = 3;
-            jobGen = selectJobGen(planChoice, currentJob);
-            IOptimizer dynamicOptimzier = new DynamicOptimizer();
+
+            IOptimizer dynamicOptimizer = BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration()) == false ? new NoOpOptimizer()
+                    : new DynamicOptimizer(counterContext);
+            jobGen = selectJobGen(planChoice, currentJob, dynamicOptimizer);
+            jobGen = dynamicOptimizer.optimize(jobGen, 0);
 
             do {
                 try {
@@ -139,9 +144,7 @@
                         }
 
                         /** run loop-body jobs with dynamic optimizer if it is enabled */
-                        if (BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration())) {
-                            jobGen = dynamicOptimzier.optimize(counterContext, jobGen, i);
-                        }
+                        jobGen = dynamicOptimizer.optimize(jobGen, i);
                         runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
                                 ckpHook, failed);
                         runClearState(deploymentId, jobGen);
@@ -196,8 +199,8 @@
                         .equals(currentInputPaths[0])));
     }
 
-    private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
-        return JobGenFactory.createJobGen(planChoice, currentJob);
+    private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob, IOptimizer optimizer) {
+        return JobGenFactory.createJobGen(planChoice, currentJob, optimizer);
     }
 
     private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
@@ -277,6 +280,9 @@
                 loadData(job, jobGen, deploymentId);
             }
         }
+        // TODO how long should the hook persist? One per job?  Or one per recovery attempt?
+        // since the hook shouldn't be stateful, we do one per recovery attempt
+        IIterationCompleteReporterHook itCompleteHook = BspUtils.createIterationCompleteHook(job.getConfiguration());
         int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
         int ckpInterval = BspUtils.getCheckpointingInterval(job.getConfiguration());
         boolean terminate = false;
@@ -294,6 +300,7 @@
                 snapshotJobIndex.set(currentJobIndex);
                 snapshotSuperstep.set(i);
             }
+            itCompleteHook.completeIteration(i, job);
             i++;
         } while (!terminate);
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index c1f6aae..163e476 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -100,6 +101,7 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -140,6 +142,7 @@
     protected String jobId = UUID.randomUUID().toString();;
     protected int frameSize = ClusterConfig.getFrameSize();
     protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+    protected IOptimizer optimizer;
 
     private static final Map<String, String> MERGE_POLICY_PROPERTIES;
     static {
@@ -150,18 +153,19 @@
     protected static final String SECONDARY_INDEX_ODD = "secondary1";
     protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
-    public JobGen(PregelixJob job) {
-        init(job);
-    }
-    
-    public JobGen(PregelixJob job, String jobId) {
-        if(jobId!=null){
-            this.jobId = jobId;
-        }
-        init(job);
+    public JobGen(PregelixJob job, IOptimizer optimizer) {
+        init(job, optimizer);
     }
 
-    private void init(PregelixJob job) {
+    public JobGen(PregelixJob job, String jobId, IOptimizer optimizer) {
+        if (jobId != null) {
+            this.jobId = jobId;
+        }
+        init(job, optimizer);
+    }
+
+    private void init(PregelixJob job, IOptimizer optimizer) {
+        this.optimizer = optimizer;
         conf = job.getConfiguration();
         pregelixJob = job;
         initJobConfiguration();
@@ -178,7 +182,7 @@
     }
 
     public void reset(PregelixJob job) {
-        init(job);
+        init(job, this.optimizer);
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -232,12 +236,12 @@
 
         int[] keyFields = new int[1];
         keyFields[0] = 0;
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
                 keyFields, getIndexDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
                 NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, btreeCreate);
+        setLocationConstraint(spec, btreeCreate);
         spec.setFrameSize(frameSize);
         return spec;
     }
@@ -258,7 +262,7 @@
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
         JobSpecification spec = new JobSpecification();
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         /**
          * the graph file scan operator and use count constraint first, will use
@@ -277,7 +281,7 @@
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
                 readSchedule, confFactory);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct sort operator
@@ -289,7 +293,7 @@
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
                 nkmFactory, comparatorFactories, recordDescriptor);
-        ClusterConfig.setLocationConstraint(spec, sorter);
+        setLocationConstraint(spec, sorter);
 
         /**
          * construct write file operator
@@ -336,7 +340,7 @@
         RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
         ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct btree search operator
@@ -346,14 +350,14 @@
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
         BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
                 null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct write file operator
@@ -463,7 +467,7 @@
     public JobSpecification generateClearState() throws HyracksException {
         JobSpecification spec = new JobSpecification();
         ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
-        ClusterConfig.setLocationConstraint(spec, clearState);
+        setLocationConstraint(spec, clearState);
         spec.addRoot(clearState);
         return spec;
     }
@@ -477,11 +481,11 @@
     protected JobSpecification dropIndex(String indexName) throws HyracksException {
         JobSpecification spec = new JobSpecification();
 
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, indexName);
         IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
                 lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
 
-        ClusterConfig.setLocationConstraint(spec, drop);
+        setLocationConstraint(spec, drop);
         spec.addRoot(drop);
         spec.setFrameSize(frameSize);
         return spec;
@@ -515,7 +519,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         JobSpecification spec = new JobSpecification();
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         /**
          * the graph file scan operator and use count constraint first, will use
@@ -537,7 +541,7 @@
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
                 readSchedule, confFactory);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct sort operator
@@ -549,7 +553,7 @@
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
                 nkmFactory, comparatorFactories, recordDescriptor);
-        ClusterConfig.setLocationConstraint(spec, sorter);
+        setLocationConstraint(spec, sorter);
 
         /**
          * construct tree bulk load operator
@@ -564,7 +568,7 @@
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
                 sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
                 getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        setLocationConstraint(spec, btreeBulkLoad);
 
         /**
          * connect operator descriptors
@@ -596,7 +600,7 @@
         RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
         ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct btree search operator
@@ -606,7 +610,7 @@
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);;
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -615,7 +619,7 @@
         BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
                 null, null, true, true, getIndexDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         ExternalSortOperatorDescriptor sort = null;
         if (!ckpointing) {
@@ -625,7 +629,7 @@
             sortCmpFactories[0] = JobGenUtil.getFinalBinaryComparatorFactory(vertexIdClass);
             sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields, nkmFactory, sortCmpFactories,
                     recordDescriptor);
-            ClusterConfig.setLocationConstraint(spec, scanner);
+            setLocationConstraint(spec, scanner);
         }
 
         /**
@@ -636,7 +640,7 @@
                 conf, vertexIdClass.getName(), vertexClass.getName());
         VertexFileWriteOperatorDescriptor writer = new VertexFileWriteOperatorDescriptor(spec, confFactory,
                 inputRdFactory, preHookFactory);
-        ClusterConfig.setLocationConstraint(spec, writer);
+        setLocationConstraint(spec, writer);
 
         /**
          * connect operator descriptors
@@ -681,14 +685,14 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
                 false, jobId, lastSuccessfulIteration + 1);
-        ClusterConfig.setLocationConstraint(spec, materializeRead);
+        setLocationConstraint(spec, materializeRead);
 
         String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
         PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
@@ -700,7 +704,7 @@
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
                 conf, vertexIdClass.getName(), MsgList.class.getName());
         HDFSFileWriteOperatorDescriptor hdfsWriter = new HDFSFileWriteOperatorDescriptor(spec, tmpJob, inputRdFactory);
-        ClusterConfig.setLocationConstraint(spec, hdfsWriter);
+        setLocationConstraint(spec, hdfsWriter);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, materializeRead, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, hdfsWriter, 0);
@@ -743,7 +747,7 @@
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
                 readSchedule, new KeyValueParserFactory());
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /** construct the sort operator to sort message states */
         int[] keyFields = new int[] { 0 };
@@ -752,24 +756,24 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration, vertexIdClass);
         ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, sort);
+        setLocationConstraint(spec, sort);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec,
                 recordDescriptor, jobId, lastCheckpointedIteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration, new ConfigurationFactory(
                         pregelixJob.getConfiguration())));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * connect operator descriptors
@@ -807,7 +811,7 @@
          */
         List<JobSpecification> list = new ArrayList<JobSpecification>();
         list.add(bulkLoadLiveVertexBTree(iteration));
-        JobGen jobGen = new JobGenInnerJoin(pregelixJob, jobId);
+        JobGen jobGen = new JobGenInnerJoin(pregelixJob, jobId, optimizer);
         return Pair.of(list, jobGen);
     }
 
@@ -827,12 +831,12 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct btree search and function call update operator
          */
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
@@ -852,12 +856,12 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 1, new ExtractLiveVertexIdFunctionFactory(),
                 preHookFactory, null, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct bulk-load index operator
          */
-        IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+        IFileSplitProvider secondaryFileSplitProvider = getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
         int[] fieldPermutation = new int[] { 0, 1 };
         int[] keyFields = new int[] { 0 };
         IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
@@ -866,7 +870,7 @@
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
                 fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        setLocationConstraint(spec, btreeBulkLoad);
 
         /** connect job spec */
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
@@ -876,4 +880,25 @@
         return spec;
     }
 
+    /**
+     * set the location constraint for operators
+     * 
+     * @param spec
+     * @param operator
+     */
+    public void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator) {
+        optimizer.setOptimizedLocationConstraints(spec, operator);
+    }
+
+    /**
+     * get the file split provider
+     * 
+     * @param jobId
+     * @param indexName
+     * @return the IFileSplitProvider instance
+     */
+    public IFileSplitProvider getFileSplitProvider(String jobId, String indexName) {
+        return optimizer.getOptimizedFileSplitProvider(jobId, indexName);
+    }
+
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
index ed580de..cbc9c81 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
@@ -17,26 +17,27 @@
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 
 public class JobGenFactory {
 
-    public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob) {
+    public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob, IOptimizer optimizer) {
         JobGen jobGen = null;
         switch (planChoice) {
             case INNER_JOIN:
-                jobGen = new JobGenInnerJoin(currentJob);
+                jobGen = new JobGenInnerJoin(currentJob, optimizer);
                 break;
             case OUTER_JOIN:
-                jobGen = new JobGenOuterJoin(currentJob);
+                jobGen = new JobGenOuterJoin(currentJob, optimizer);
                 break;
             case OUTER_JOIN_SORT:
-                jobGen = new JobGenOuterJoinSort(currentJob);
+                jobGen = new JobGenOuterJoinSort(currentJob, optimizer);
                 break;
             case OUTER_JOIN_SINGLE_SORT:
-                jobGen = new JobGenOuterJoinSingleSort(currentJob);
+                jobGen = new JobGenOuterJoinSingleSort(currentJob, optimizer);
                 break;
             default:
-                jobGen = new JobGenInnerJoin(currentJob);
+                jobGen = new JobGenInnerJoin(currentJob, optimizer);
         }
         return jobGen;
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 7bdb069..f838fb8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -63,6 +63,7 @@
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -93,12 +94,12 @@
 public class JobGenInnerJoin extends JobGen {
     private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
 
-    public JobGenInnerJoin(PregelixJob job) {
-        super(job);
+    public JobGenInnerJoin(PregelixJob job, IOptimizer optimizer) {
+        super(job, optimizer);
     }
     
-    public JobGenInnerJoin(PregelixJob job, String jobId) {
-        super(job, jobId);
+    public JobGenInnerJoin(PregelixJob job, String jobId, IOptimizer optimizer) {
+        super(job, jobId, optimizer);
     }
 
     protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
@@ -113,18 +114,18 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct drop index operator
          */
-        IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider secondaryFileSplitProvider = getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         /**
          * construct btree search and function call update operator
@@ -159,7 +160,7 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 6, new StartComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * termination state write operator
@@ -188,7 +189,7 @@
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
                 fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        setLocationConstraint(spec, btreeBulkLoad);
 
         /**
          * construct local sort operator
@@ -199,7 +200,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -208,7 +209,7 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
@@ -217,25 +218,25 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /**
          * do pre- & post- super step
          */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * add the insert operator to insert vertexes
@@ -244,7 +245,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -254,15 +255,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
@@ -330,7 +331,7 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct pre-superstep
@@ -338,20 +339,20 @@
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
                 true, jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materializeRead);
+        setLocationConstraint(spec, materializeRead);
 
         /**
          * construct the index-set-union operator
          */
         String readFile = iteration % 2 == 0 ? SECONDARY_INDEX_ODD : SECONDARY_INDEX_EVEN;
-        IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+        IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -359,12 +360,12 @@
         IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
                 comparatorFactories, true, keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true);
-        ClusterConfig.setLocationConstraint(spec, setUnion);
+        setLocationConstraint(spec, setUnion);
 
         /**
          * construct index-join-function-update operator
          */
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
                 VLongWritable.class.getName());
         RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
@@ -379,7 +380,7 @@
                 JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, join);
+        setLocationConstraint(spec, join);
 
         /**
          * construct bulk-load index operator
@@ -389,12 +390,12 @@
         indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
                 WritableComparator.get(vertexIdClass).getClass());
         String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
-        IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+        IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
                 indexCmpFactories, fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR,
                 getIndexDataflowHelperFactory());
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        setLocationConstraint(spec, btreeBulkLoad);
 
         /**
          * construct local sort operator
@@ -405,7 +406,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -414,7 +415,7 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
@@ -423,23 +424,23 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * termination state write operator
@@ -464,7 +465,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -474,15 +475,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
@@ -596,7 +597,7 @@
         String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
                 readSchedule, new KeyValueParserFactory());
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /** construct the sort operator to sort message states */
         int[] keyFields = new int[] { 0 };
@@ -605,7 +606,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration, vertexIdClass);
         ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, sort);
+        setLocationConstraint(spec, sort);
 
         /**
          * construct bulk-load index operator
@@ -618,12 +619,12 @@
         indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration + 1, WritableComparator
                 .get(vertexIdClass).getClass());
         String writeFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
-        IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+        IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
                 indexCmpFactories, fieldPermutation, new int[] { 0 }, DEFAULT_BTREE_FILL_FACTOR,
                 getIndexDataflowHelperFactory());
-        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+        setLocationConstraint(spec, btreeBulkLoad);
 
         /**
          * connect operator descriptors
@@ -648,7 +649,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
         Class<? extends Writable> msgListClass = MsgList.class;
         String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
-        IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+        IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
         JobSpecification spec = new JobSpecification();
         /**
          * construct empty tuple operator
@@ -663,7 +664,7 @@
         RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
         ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct btree search operator
@@ -681,7 +682,7 @@
                 storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
                 comparatorFactories, null, null, null, true, true, getIndexDataflowHelperFactory(), false,
                 NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct write file operator
@@ -689,7 +690,7 @@
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
                 conf, vertexIdClass.getName(), MsgList.class.getName());
         HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
-        ClusterConfig.setLocationConstraint(spec, writer);
+        setLocationConstraint(spec, writer);
 
         /**
          * connect operator descriptors
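
Throughout JobGenInnerJoin (and the other generators below), the static calls ClusterConfig.setLocationConstraint(...) and ClusterConfig.getFileSplitProvider(...) are replaced by protected helpers inherited from JobGen, which lets the injected optimizer decide operator placement. The JobGen side of this refactoring is not part of this diff; a minimal sketch, assuming the helpers simply fall back to the old static ClusterConfig behavior when the optimizer has nothing better to offer, could look like this:

    // Sketch only: assumed shape of the new helpers in the JobGen base class.
    protected void setLocationConstraint(JobSpecification spec, IOperatorDescriptor op)
            throws HyracksException {
        // Default placement; an optimizer-aware implementation would consult the optimizer here.
        ClusterConfig.setLocationConstraint(spec, op);
    }

    protected IFileSplitProvider getFileSplitProvider(String jobId, String indexName)
            throws HyracksException {
        // Default file splits; an optimizer-aware implementation would consult the optimizer here.
        return ClusterConfig.getFileSplitProvider(jobId, indexName);
    }
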
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 68e6706..39a56bf 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -41,7 +41,7 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,12 +69,12 @@
 
 public class JobGenOuterJoin extends JobGen {
 
-    public JobGenOuterJoin(PregelixJob job) {
-        super(job);
+    public JobGenOuterJoin(PregelixJob job, IOptimizer optimizer) {
+        super(job, optimizer);
     }
-    
-    public JobGenOuterJoin(PregelixJob job, String jobId) {
-        super(job, jobId);
+
+    public JobGenOuterJoin(PregelixJob job, String jobId, IOptimizer optimizer) {
+        super(job, jobId, optimizer);
     }
 
     @Override
@@ -90,12 +90,12 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct btree search function update operator
@@ -104,7 +104,7 @@
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);;
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -129,7 +129,7 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct local sort operator
@@ -140,7 +140,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -149,7 +149,7 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
@@ -160,22 +160,22 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink2);
+        setLocationConstraint(spec, emptySink2);
 
         /**
          * termination state write operator
@@ -202,7 +202,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -212,14 +212,14 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
@@ -286,7 +286,7 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct pre-superstep hook
@@ -294,19 +294,19 @@
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
                 true, jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materializeRead);
+        setLocationConstraint(spec, materializeRead);
 
         /**
          * construct index join function update operator
          */
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
@@ -329,7 +329,7 @@
                 getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, join);
+        setLocationConstraint(spec, join);
 
         /**
          * construct local sort operator
@@ -339,7 +339,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -348,7 +348,7 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global group-by operator
@@ -357,23 +357,23 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * termination state write operator
@@ -399,7 +399,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -409,15 +409,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 3e4b213..c10e6c2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -41,7 +41,7 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,12 +69,12 @@
 
 public class JobGenOuterJoinSingleSort extends JobGen {
 
-    public JobGenOuterJoinSingleSort(PregelixJob job) {
-        super(job);
+    public JobGenOuterJoinSingleSort(PregelixJob job, IOptimizer optimizer) {
+        super(job, optimizer);
     }
-    
-    public JobGenOuterJoinSingleSort(PregelixJob job, String jobId) {
-        super(job, jobId);
+
+    public JobGenOuterJoinSingleSort(PregelixJob job, String jobId, IOptimizer optimizer) {
+        super(job, jobId, optimizer);
     }
 
     @Override
@@ -90,12 +90,12 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct btree search operator
@@ -104,7 +104,7 @@
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -132,7 +132,7 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct global sort operator
@@ -144,7 +144,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, globalSort);
+        setLocationConstraint(spec, globalSort);
 
         /**
          * construct global group-by operator
@@ -155,22 +155,22 @@
                 conf, true, false);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink2);
+        setLocationConstraint(spec, emptySink2);
 
         /**
          * termination state write operator
@@ -196,7 +196,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -206,15 +206,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
@@ -277,7 +277,7 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct pre-superstep hook
@@ -285,19 +285,19 @@
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
                 true, jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materializeRead);
+        setLocationConstraint(spec, materializeRead);
 
         /**
          * construct index join function update operator
          */
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
@@ -320,7 +320,7 @@
                 getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, join);
+        setLocationConstraint(spec, join);
 
         /**
          * construct global sort operator
@@ -331,7 +331,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, globalSort);
+        setLocationConstraint(spec, globalSort);
 
         /**
          * construct global group-by operator
@@ -340,23 +340,23 @@
                 conf, true, false);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * termination state write operator
@@ -379,7 +379,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -389,15 +389,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 7ca771c..953d82c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -41,7 +41,7 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
@@ -69,8 +69,8 @@
 
 public class JobGenOuterJoinSort extends JobGen {
 
-    public JobGenOuterJoinSort(PregelixJob job) {
-        super(job);
+    public JobGenOuterJoinSort(PregelixJob job, IOptimizer optimizer) {
+        super(job, optimizer);
     }
 
     @Override
@@ -86,12 +86,12 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct btree search function update operator
@@ -100,7 +100,7 @@
                 vertexIdClass.getName(), vertexClass.getName());
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
 
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
@@ -125,7 +125,7 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 getIndexDataflowHelperFactory(), inputRdFactory, 5, new StartComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, scanner);
+        setLocationConstraint(spec, scanner);
 
         /**
          * construct local sort operator
@@ -137,7 +137,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -146,14 +146,14 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global sort operator
          */
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, globalSort);
+        setLocationConstraint(spec, globalSort);
 
         /**
          * construct global group-by operator
@@ -164,22 +164,22 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink2);
+        setLocationConstraint(spec, emptySink2);
 
         /**
          * termination state write operator
@@ -206,7 +206,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -216,15 +216,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
         /** connect all operators **/
@@ -287,7 +287,7 @@
          * construct empty tuple operator
          */
         EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+        setLocationConstraint(spec, emptyTupleSource);
 
         /**
          * construct pre-superstep hook
@@ -295,19 +295,19 @@
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PreSuperStepRuntimeHookFactory(jobId, confFactory));
-        ClusterConfig.setLocationConstraint(spec, preSuperStep);
+        setLocationConstraint(spec, preSuperStep);
 
         /**
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
                 true, jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materializeRead);
+        setLocationConstraint(spec, materializeRead);
 
         /**
          * construct index join function update operator
          */
-        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
@@ -330,7 +330,7 @@
                 getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate, rdInsert, rdDelete);
-        ClusterConfig.setLocationConstraint(spec, join);
+        setLocationConstraint(spec, join);
 
         /**
          * construct local sort operator
@@ -341,7 +341,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, localSort);
+        setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
@@ -350,14 +350,14 @@
                 false, false);
         ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
-        ClusterConfig.setLocationConstraint(spec, localGby);
+        setLocationConstraint(spec, localGby);
 
         /**
          * construct global sort operator
          */
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
-        ClusterConfig.setLocationConstraint(spec, globalSort);
+        setLocationConstraint(spec, globalSort);
 
         /**
          * construct global group-by operator
@@ -366,23 +366,23 @@
                 conf, true, true);
         ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
-        ClusterConfig.setLocationConstraint(spec, globalGby);
+        setLocationConstraint(spec, globalGby);
 
         /**
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
                 jobId, iteration);
-        ClusterConfig.setLocationConstraint(spec, materialize);
+        setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
                 new PostSuperStepRuntimeHookFactory(jobId));
-        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+        setLocationConstraint(spec, postSuperStep);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink);
+        setLocationConstraint(spec, emptySink);
 
         /**
          * termination state write operator
@@ -408,7 +408,7 @@
                 spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
                 null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, insertOp);
+        setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
@@ -418,15 +418,15 @@
                 spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
                 getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
-        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink3);
+        setLocationConstraint(spec, emptySink3);
 
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
-        ClusterConfig.setLocationConstraint(spec, emptySink4);
+        setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = getVertexPartitionComputerFactory();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 5c1a4b8..d1f8b65 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -57,6 +57,7 @@
     private static Scheduler hdfsScheduler;
     private static Set<String> blackListNodes = new HashSet<String>();
     private static IHyracksClientConnection hcc;
+    private static final int DEFAULT_CC_HTTP_PORT = 16001;
 
     /**
      * let tests set config path to be whatever
@@ -84,7 +85,7 @@
     }
 
     /**
-     * get file split provider
+     * get the file split provider (for tests only)
      * 
      * @param jobId
      * @return
@@ -126,6 +127,14 @@
         return Integer.parseInt(clusterProperties.getProperty("FRAME_SIZE"));
     }
 
+    public static int getCCHTTPort() {
+        try { // TODO should we really provide a default value?
+            return Integer.parseInt(clusterProperties.getProperty("CC_HTTPPORT"));
+        } catch (NumberFormatException e) {
+            return DEFAULT_CC_HTTP_PORT;
+        }
+    }
+
     /**
      * set location constraint
      * 
@@ -175,26 +184,6 @@
      * @param operator
      * @throws HyracksDataException
      */
-    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
-            throws HyracksException {
-        int count = 0;
-        String[] locations = new String[NCs.length * stores.length];
-        for (String nc : NCs) {
-            for (int i = 0; i < stores.length; i++) {
-                locations[count] = nc;
-                count++;
-            }
-        }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
-    }
-
-    /**
-     * set location constraint
-     * 
-     * @param spec
-     * @param operator
-     * @throws HyracksDataException
-     */
     public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {
         int count = NCs.length * stores.length;
         PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
@@ -255,11 +244,35 @@
         }
         return locations;
     }
+
+    /**
+     * set the default location constraint
+     * 
+     * @param spec
+     * @param operator
+     * @throws HyracksDataException
+     */
+    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
+            throws HyracksException {
+        int count = 0;
+        String[] locations = new String[NCs.length * stores.length];
+        for (String nc : NCs) {
+            for (int i = 0; i < stores.length; i++) {
+                locations[count] = nc;
+                count++;
+            }
+        }
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+    }
 
     public static String[] getNCNames() {
         return NCs;
     }
 
+    public static String[] getStores() {
+        return stores;
+    }
+
     public static void addToBlackListNodes(Collection<String> nodes) {
         blackListNodes.addAll(nodes);
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
index 01fc81b..064ca42 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
@@ -15,14 +15,105 @@
 
 package edu.uci.ics.pregelix.core.optimizer;
 
-import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.IntWritable;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.stats.Counters;
+import edu.uci.ics.hyracks.client.stats.IClusterCounterContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 
 public class DynamicOptimizer implements IOptimizer {
 
+    private IClusterCounterContext counterContext;
+    private Map<String, IntWritable> machineToDegreeOfParallelism = new HashMap<String, IntWritable>();
+    private int dop = 0;
+
+    public DynamicOptimizer(IClusterCounterContext counterContext) {
+        this.counterContext = counterContext;
+    }
+
     @Override
-    public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration) {
-        return jobGen;
+    public JobGen optimize(JobGen jobGen, int iteration) {
+        try {
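+            // recompute the per-machine degree of parallelism used by the constraint/split methods below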
+            initializeLoadPerMachine();
+            return jobGen;
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator) {
+        try {
+            String[] constraints = new String[dop];
+            int index = 0;
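+            // emit each machine's name once per core it contributes, dop entries in total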
+            for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+                String loc = entry.getKey();
+                IntWritable count = machineToDegreeOfParallelism.get(loc);
+                for (int j = 0; j < count.get(); j++) {
+                    constraints[index++] = loc;
+                }
+            }
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, constraints);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName) {
+        FileSplit[] fileSplits = new FileSplit[dop];
+        String[] stores = ClusterConfig.getStores();
+        int splitIndex = 0;
+        for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+            String ncName = entry.getKey();
+            IntWritable count = machineToDegreeOfParallelism.get(ncName);
+            for (int j = 0; j < count.get(); j++) {
+                //cycle through the stores; each machine has as many stores as it has cores
+                int storeCursor = j % stores.length;
+                String st = stores[storeCursor];
+                FileSplit split = new FileSplit(ncName, st + File.separator + ncName + "-data" + File.separator + jobId
+                        + File.separator + indexName + (j / stores.length));
+                fileSplits[splitIndex++] = split;
+            }
+        }
+        return new ConstantFileSplitProvider(fileSplits);
+    }
+
+    /**
+     * initialize the load-per-machine map
+     * 
+     * @return the degree of parallelism
+     * @throws HyracksException
+     */
+    private int initializeLoadPerMachine() throws HyracksException {
+        machineToDegreeOfParallelism.clear();
+        String[] locationConstraints = ClusterConfig.getLocationConstraint();
+        for (String loc : locationConstraints) {
+            machineToDegreeOfParallelism.put(loc, new IntWritable(0));
+        }
+        dop = 0;
+        for (Entry<String, IntWritable> entry : machineToDegreeOfParallelism.entrySet()) {
+            String loc = entry.getKey();
+            //reserve one core for heartbeat
+            int load = (int) counterContext.getCounter(Counters.NUM_PROCESSOR, false).get() - 1;
+            IntWritable count = machineToDegreeOfParallelism.get(loc);
+            count.set(load);
+            dop += load;
+        }
+        return dop;
     }
 
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
index b5913c4..c8856aa 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
@@ -15,11 +15,17 @@
 
 package edu.uci.ics.pregelix.core.optimizer;
 
-import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.pregelix.core.jobgen.JobGen;
 
 public interface IOptimizer {
 
-    public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration);
-    
+    public JobGen optimize(JobGen jobGen, int iteration);
+
+    public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator);
+
+    public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName);
+
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java
new file mode 100644
index 0000000..cd0ca37
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/NoOpOptimizer.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+
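+/**
+ * A fallback optimizer that performs no dynamic optimization: the job is left unchanged and
+ * location constraints and file splits come from the static ClusterConfig settings.
+ */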
+public class NoOpOptimizer implements IOptimizer {
+
+    @Override
+    public JobGen optimize(JobGen jobGen, int iteration) {
+        return jobGen;
+    }
+
+    @Override
+    public void setOptimizedLocationConstraints(JobSpecification spec, IOperatorDescriptor operator) {
+        try {
+            ClusterConfig.setLocationConstraint(spec, operator);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public IFileSplitProvider getOptimizedFileSplitProvider(String jobId, String indexName) {
+        try {
+            return ClusterConfig.getFileSplitProvider(jobId, indexName);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 6de65ca..d834868 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.pregelix.dataflow.util;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -36,7 +37,7 @@
 import edu.uci.ics.pregelix.dataflow.context.TaskIterationID;
 
 public class IterationUtils {
-    public static final String TMP_DIR = "/tmp/";
+    public static final String TMP_DIR = BspUtils.TMP_DIR;
 
     public static void setIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition, int iteration,
             IStateObject state) {
@@ -143,28 +144,13 @@
     }
 
     public static Writable readGlobalAggregateValue(Configuration conf, String jobId, String aggClassName)
-            throws HyracksDataException {
-        try {
-            FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
-            Path path = new Path(pathStr);
-            FSDataInputStream input = dfs.open(path);
-            int numOfAggs = BspUtils.createFinalAggregateValues(conf).size();
-            for (int i = 0; i < numOfAggs; i++) {
-                String aggName = input.readUTF();
-                Writable agg = BspUtils.createFinalAggregateValue(conf, aggName);
-                if (aggName.equals(aggClassName)) {
-                    agg.readFields(input);
-                    input.close();
-                    return agg;
-                } else {
-                    agg.readFields(input);
-                }
-            }
-            throw new IllegalStateException("Cannot find the aggregate value for " + aggClassName);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
+            throws HyracksDataException {
+        return BspUtils.readGlobalAggregateValue(conf, jobId, aggClassName);
+    }
+
+    public static HashMap<String, Writable> readAllGlobalAggregateValues(Configuration conf, String jobId)
+            throws HyracksDataException {
+        return BspUtils.readAllGlobalAggregateValues(conf, jobId);
     }
 
 }
diff --git a/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties b/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
index f5d12a1..79f42ed 100644
--- a/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
+++ b/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
@@ -18,6 +18,9 @@
 #The CC port for Hyracks cluster management
 CC_CLUSTERPORT=1099
 
+#The CC port for REST communication
+CC_HTTPPORT=16001
+
 #The tmp directory for cc to install jars
 CCTMP_DIR=/tmp/t1
 
@@ -39,12 +42,15 @@
 #The frame size of the internal dataflow engine
 FRAME_SIZE=65536
 
+#The number of jobs whose logs are kept in-memory on the CC
+JOB_HISTORY_SIZE=0
+
 #CC JAVA_OPTS
-CCJAVA_OPTS="-Xmx1g -Djava.util.logging.config.file=logging.properties"
-# debug option: CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+CCJAVA_OPTS="-Djava.util.logging.config.file=logging.properties"
+# debug option: CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Djava.util.logging.config.file=logging.properties"
 # Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
 
 #NC JAVA_OPTS
-NCJAVA_OPTS="-Xmx1g -Djava.util.logging.config.file=logging.properties"
-# debug option: NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+NCJAVA_OPTS="-Djava.util.logging.config.file=logging.properties"
+# debug option: NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Djava.util.logging.config.file=logging.properties"
 # Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh b/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh
index d7a0ead..41026a6 100644
--- a/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/startcc.sh
@@ -49,17 +49,50 @@
 
 #Export JAVA_HOME and JAVA_OPTS
 export JAVA_HOME=$JAVA_HOME
-export JAVA_OPTS=$CCJAVA_OPTS
+
+#get the OS
+OS_NAME=`uname -a|awk '{print $1}'`
+LINUX_OS='Linux'
+
+if [ $OS_NAME = $LINUX_OS ];
+then
+        MEM_SIZE=`cat /proc/meminfo |grep MemTotal|awk '{print $2}'`
+        MEM_SIZE=$(($MEM_SIZE * 1000))
+else
+        MEM_SIZE=`sysctl -a | grep "hw.memsize ="|awk '{print $3}'`
+fi
+
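+#Give 3/4 of the detected physical memory (in bytes) to the CC JVM heap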
+MEM_SIZE=$(($MEM_SIZE * 3 / 4))
+
+#Set JAVA_OPTS
+export JAVA_OPTS=$CCJAVA_OPTS" -Xmx"$MEM_SIZE
 
 PREGELIX_HOME=`pwd`
 
 #Enter the temp dir
 cd $CCTMP_DIR
 
-if [ -f "conf/topology.xml"  ]; then
-#Launch hyracks cc script with topology
-${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -heartbeat-period 5000 -max-heartbeat-lapse-periods 4 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
-else
-#Launch hyracks cc script without toplogy
-${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -heartbeat-period 5000 -max-heartbeat-lapse-periods 4 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
+cmd=( "${PREGELIX_HOME}/bin/pregelixcc" )
+cmd+=( -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST
+       -heartbeat-period 5000 -max-heartbeat-lapse-periods 4 
+       -default-max-job-attempts 0 )
+
+if [ -n "$CC_CLIENTPORT" ]; then
+    cmd+=( -client-net-port $CC_CLIENTPORT )
 fi
+if [ -n "$CC_CLUSTERPORT" ]; then
+    cmd+=( -cluster-net-port $CC_CLUSTERPORT )
+fi
+if [ -n "$CC_HTTPPORT" ]; then
+    cmd+=( -http-port $CC_HTTPPORT )
+fi
+if [ -n "$JOB_HISTORY_SIZE" ]; then
+    cmd+=( -job-history-size $JOB_HISTORY_SIZE )
+fi
+if [ -f "${PREGELIX_HOME}/conf/topology.xml"  ]; then
+    cmd+=( -cluster-topology "${PREGELIX_HOME}/conf/topology.xml" )
+fi
+
+printf "\n\n\n********************************************\nStarting CC with command %s\n\n" "${cmd[*]}" >> "$CCLOGS_DIR/cc.log"
+#Start the pregelix CC
+"${cmd[@]}" >> "$CCLOGS_DIR/cc.log" 2>&1 &
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
index 6727bd5..8e742ea 100644
--- a/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
@@ -68,13 +68,33 @@
 #Get node ID
 NODEID=`hostname | cut -d '.' -f 1`
 
-#Set JAVA_OPTS
-export JAVA_OPTS=$NCJAVA_OPTS
-
 PREGELIX_HOME=`pwd`
 
 #Enter the temp dir
 cd $NCTMP_DIR
 
+#get the OS
+OS_NAME=`uname -a|awk '{print $1}'`
+LINUX_OS='Linux'
+
+if [ $OS_NAME = $LINUX_OS ];
+then
+	MEM_SIZE=`cat /proc/meminfo |grep MemTotal|awk '{print $2}'`
+	MEM_SIZE=$(($MEM_SIZE * 1000))
+else
+	MEM_SIZE=`sysctl -a | grep "hw.memsize ="|awk '{print $3}'`
+fi
+
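+#Give 3/4 of the detected physical memory (in bytes) to the NC JVM heap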
+MEM_SIZE=$(($MEM_SIZE * 3 / 4))
+
+#Set JAVA_OPTS
+export JAVA_OPTS=$NCJAVA_OPTS" -Xmx"$MEM_SIZE
+
 #Launch hyracks nc
-${PREGELIX_HOME}/bin/pregelixnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR  -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+cmd=( "${PREGELIX_HOME}/bin/pregelixnc" )
+cmd+=( -cc-host $CCHOST -cc-port $CC_CLUSTERPORT 
+	   -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR
+	   -node-id $NODEID -iodevices "${IO_DIRS}" );
+
+printf "\n\n\n********************************************\nStarting NC with command %s\n\n" "${cmd[*]}" >> "$NCLOGS_DIR/$NODEID.log"
+"${cmd[@]}" >> "$NCLOGS_DIR/$NODEID.log" 2>&1 &
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index 393c8c9..9fb0958 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -70,6 +70,9 @@
 
         @Option(name = "-ckp-interval", usage = "checkpointing interval -- for fault-tolerance", required = false)
         public int ckpInterval = -1;
+
+        @Option(name = "-dyn-opt", usage = "whether to enable dynamic optimization -- for better performance", required = false)
+        public String dynamicOptimization = "false";
     }
 
     public static void run(String[] args, PregelixJob job) throws Exception {
@@ -121,6 +124,7 @@
     }
 
     private static void setJobSpecificSettings(PregelixJob job, Options options) {
+        job.setEnableDynamicOptimization(Boolean.parseBoolean(options.dynamicOptimization));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
         job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
         job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, options.sourceId);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
index cf753bb..1fe7616 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
@@ -133,7 +133,7 @@
 
     }
 
-    private static long readTriangleCountingResult(Configuration conf) {
+    protected static long readTriangleCountingResult(Configuration conf) {
         try {
             VLongWritable count = (VLongWritable) IterationUtils.readGlobalAggregateValue(conf,
                     BspUtils.getJobId(conf), TriangleCountingAggregator.class.getName());
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingWithAggregateHadoopCountersVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingWithAggregateHadoopCountersVertex.java
new file mode 100644
index 0000000..7cf2840
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingWithAggregateHadoopCountersVertex.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.trianglecounting;
+
+import org.apache.hadoop.mapreduce.Counters;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.HadoopCountersAggregator;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * The triangle counting example -- counting the triangles in an undirected graph.
+ */
+public class TriangleCountingWithAggregateHadoopCountersVertex extends TriangleCountingVertex {
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(TriangleCountingWithAggregateHadoopCountersVertex.class.getSimpleName());
+        job.setVertexClass(TriangleCountingWithAggregateHadoopCountersVertex.class);
+        job.addGlobalAggregatorClass(TriangleCountingAggregator.class);
+        job.setCounterAggregatorClass(TriangleHadoopCountersAggregator.class);
+        job.setVertexInputFormatClass(TextTriangleCountingInputFormat.class);
+        job.setVertexOutputFormatClass(TriangleCountingVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        Client.run(args, job);
+        System.out.println("triangle count in last iteration: " + readTriangleCountingResult(job.getConfiguration()));
+        System.out.println("aggregate counter (including all iterations):\n" + BspUtils.getCounters(job));
+    }
+
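+    /**
+     * Aggregates Hadoop counters across supersteps: the total number of vertices visited and
+     * the sum of all vertex values.
+     */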
+    public static class TriangleHadoopCountersAggregator extends
+            HadoopCountersAggregator<VLongWritable, VLongWritable, VLongWritable, VLongWritable, Counters> {
+        private Counters counters;
+
+        @Override
+        public void init() {
+            counters = new Counters();
+        }
+
+        @Override
+        public void step(Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> v)
+                throws HyracksDataException {
+            counters.findCounter("TriangleCounting", "total-ids").increment(1);
+            counters.findCounter("TriangleCounting", "total-values").increment(v.getVertexValue().get());
+        }
+
+        @Override
+        public void step(Counters partialResult) {
+            counters.incrAllCounters(partialResult);
+        }
+
+        @Override
+        public Counters finishPartial() {
+            return counters;
+        }
+
+        @Override
+        public Counters finishFinal() {
+            return counters;
+        }
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index b4e17b6..5855fd3 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -35,6 +35,8 @@
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.NoOpOptimizer;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
 import edu.uci.ics.pregelix.example.PageRankVertex;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimulatedPageRankVertexInputFormat;
@@ -83,7 +85,9 @@
         FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
         FileUtils.cleanDirectory(new File(EXPECT_RESULT_DIR));
         FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
-        giraphTestJobGen = new JobGenOuterJoin(job);
+
+        IOptimizer dynamicOptimizer = new NoOpOptimizer();
+        giraphTestJobGen = new JobGenOuterJoin(job, dynamicOptimizer);
     }
 
     private void cleanupStores() throws IOException {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 6ccefd2..c7eff1e 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -79,6 +79,7 @@
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
         job.setCheckpointHook(ConservativeCheckpointHook.class);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -95,6 +96,7 @@
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
         job.setCheckpointHook(ConservativeCheckpointHook.class);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -109,6 +111,7 @@
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
         job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, 0);
+        job.setDynamicVertexValueSize(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -123,6 +126,7 @@
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
         job.setCheckpointHook(ConservativeCheckpointHook.class);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -137,6 +141,7 @@
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -152,6 +157,7 @@
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -211,6 +217,7 @@
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
         job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, 0);
+        job.setDynamicVertexValueSize(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -225,6 +232,7 @@
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.setEnableDynamicOptimization(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
@@ -237,6 +245,7 @@
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
+        job.setDynamicVertexValueSize(true);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java
new file mode 100644
index 0000000..10f5829
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/Record.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.util;
+
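+/**
+ * A single line of expected/actual test output; two records compare equal when their fields
+ * match, with a small tolerance for floating-point fields.
+ */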
+@SuppressWarnings("rawtypes")
+public class Record implements Comparable {
+
+    private String recordText;
+
+    public Record(String text) {
+        recordText = text;
+    }
+
+    @Override
+    public int hashCode() {
+        return recordText.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return recordText;
+    }
+
+    @Override
+    public int compareTo(Object o) {
+        if (!(o instanceof Record)) {
+            throw new IllegalStateException("incomparable items");
+        }
+        Record record = (Record) o;
+        boolean equal = equalStrings(recordText, record.recordText);
+        if (equal) {
+            return 0;
+        } else {
+            return recordText.compareTo(record.recordText);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof Record)) {
+            return false;
+        }
+        Record record = (Record) o;
+        return equalStrings(recordText, record.recordText);
+    }
+
+    private boolean equalStrings(String s1, String s2) {
+        String[] rowsOne = s1.split("\n");
+        String[] rowsTwo = s2.split("\n");
+
+        if (rowsOne.length != rowsTwo.length)
+            return false;
+
+        for (int i = 0; i < rowsOne.length; i++) {
+            String row1 = rowsOne[i];
+            String row2 = rowsTwo[i];
+
+            if (row1.equals(row2))
+                continue;
+
+            boolean spaceOrTab = false;
+            spaceOrTab = row1.contains(" ");
+            String[] fields1 = spaceOrTab ? row1.split(" ") : row1.split("\t");
+            String[] fields2 = spaceOrTab ? row2.split(" ") : row2.split("\t");
+
+            for (int j = 0; j < fields1.length; j++) {
+                if (j >= fields2.length) {
+                    return false;
+                }
+                if (fields1[j].equals(fields2[j])) {
+                    continue;
+                } else if (fields1[j].indexOf('.') < 0) {
+                    return false;
+                } else {
+                    Double double1 = Double.parseDouble(fields1[j]);
+                    Double double2 = Double.parseDouble(fields2[j]);
+                    float float1 = (float) double1.doubleValue();
+                    float float2 = (float) double2.doubleValue();
+
+                    if (Math.abs(float1 - float2) < 1.0e-7)
+                        continue;
+                    else {
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
index fe07cf5..e6347ed 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
@@ -16,83 +16,82 @@
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.junit.Assert;
 
 public class TestUtils {
 
+    private static final String PREFIX = "part";
+
     public static void compareWithResultDir(File expectedFileDir, File actualFileDir) throws Exception {
-        String[] fileNames = expectedFileDir.list();
-        for (String fileName : fileNames) {
-            compareWithResult(new File(expectedFileDir, fileName), new File(actualFileDir, fileName));
-        }
+        Collection<Record> expectedRecords = loadRecords(expectedFileDir);
+        Collection<Record> actualRecords = loadRecords(actualFileDir);
+        boolean equal = collectionEqual(expectedRecords, actualRecords);
+        Assert.assertTrue(equal);
     }
 
-    public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
-        BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
-        BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
-        String lineExpected, lineActual;
-        int num = 1;
-        try {
-            while ((lineExpected = readerExpected.readLine()) != null) {
-                lineActual = readerActual.readLine();
-                if (lineActual == null) {
-                    throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
+    public static boolean collectionEqual(Collection<Record> c1, Collection<Record> c2) {
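+        // equal iff every record in c1 has a matching record in c2 and vice versa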
+        for (Record r1 : c1) {
+            boolean exists = false;
+            for (Record r2 : c2) {
+                if (r1.equals(r2)) {
+                    exists = true;
+                    break;
                 }
-                if (!equalStrings(lineExpected, lineActual)) {
-                    throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
-                            + lineActual);
-                }
-                ++num;
             }
-            lineActual = readerActual.readLine();
-            if (lineActual != null) {
-                throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineActual);
+            if (!exists) {
+                return false;
             }
-        } finally {
-            readerExpected.close();
-            readerActual.close();
         }
-    }
-
-    private static boolean equalStrings(String s1, String s2) {
-        String[] rowsOne = s1.split("\n");
-        String[] rowsTwo = s2.split("\n");
-
-        if (rowsOne.length != rowsTwo.length)
-            return false;
-
-        for (int i = 0; i < rowsOne.length; i++) {
-            String row1 = rowsOne[i];
-            String row2 = rowsTwo[i];
-
-            if (row1.equals(row2))
-                continue;
-
-            boolean spaceOrTab = false;
-            spaceOrTab = row1.contains(" ");
-            String[] fields1 = spaceOrTab ? row1.split(" ") : row1.split("\t");
-            String[] fields2 = spaceOrTab ? row2.split(" ") : row2.split("\t");
-
-            for (int j = 0; j < fields1.length; j++) {
-                if (fields1[j].equals(fields2[j])) {
-                    continue;
-                } else if (fields1[j].indexOf('.') < 0) {
-                    return false;
-                } else {
-                    Double double1 = Double.parseDouble(fields1[j]);
-                    Double double2 = Double.parseDouble(fields2[j]);
-                    float float1 = (float) double1.doubleValue();
-                    float float2 = (float) double2.doubleValue();
-
-                    if (Math.abs(float1 - float2) < 1.0e-7)
-                        continue;
-                    else {
-                        return false;
-                    }
+        for (Record r2 : c2) {
+            boolean exists = false;
+            for (Record r1 : c1) {
+                if (r2.equals(r1)) {
+                    exists = true;
+                    break;
                 }
             }
+            if (!exists) {
+                return false;
+            }
         }
         return true;
     }
 
+    public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
+        Collection<Record> expectedRecords = new ArrayList<Record>();
+        Collection<Record> actualRecords = new ArrayList<Record>();
+        populateResultFile(expectedRecords, expectedFile);
+        populateResultFile(actualRecords, actualFile);
+        boolean equal = expectedRecords.equals(actualRecords);
+        Assert.assertTrue(equal);
+    }
+
+    private static Collection<Record> loadRecords(File dir) throws Exception {
+        String[] fileNames = dir.list();
+        Collection<Record> records = new ArrayList<Record>();
+        for (String fileName : fileNames) {
+            if (fileName.startsWith(PREFIX)) {
+                File file = new File(dir, fileName);
+                populateResultFile(records, file);
+            }
+        }
+        return records;
+    }
+
+    private static void populateResultFile(Collection<Record> records, File file) throws FileNotFoundException,
+            IOException {
+        BufferedReader reader = new BufferedReader(new FileReader(file));
+        String line = null;
+        while ((line = reader.readLine()) != null) {
+            records.add(new Record(line));
+        }
+        reader.close();
+    }
+
 }
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index 591446c..3091c83 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -80,6 +80,7 @@
 <property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
 <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 32c2a1a..b6af65c 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -29,7 +29,6 @@
 <property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleMinCombiner</value></property>
 <property><name>mapred.output.compress</name><value>false</value></property>
 <property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -86,6 +85,7 @@
 <property><name>fs.checkpoint.period</name><value>3600</value></property>
 <property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
 <property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>fs.s3.maxRetries</name><value>4</value></property>
 <property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
 <property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
index d06068d..d908da8 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
@@ -124,7 +124,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
index 01d85a5..d5ec8f1 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/GraphMutation.xml
@@ -124,7 +124,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
index 072ea9e..b4c42e6 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique.xml
@@ -124,7 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
index 3ae367d..6cf075b 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
@@ -124,7 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
index 6cb617f..49e2e6f 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
@@ -125,7 +125,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
index 76d6e87..8316c64 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
@@ -125,7 +125,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
index 1f52250..a894ccd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
@@ -124,7 +124,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
index 9d2c9e1..a9f8925 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
@@ -1,146 +1,145 @@
 <?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>pregelix.updateIntensive</name><value>true</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>pregelix.framesize</name><value>2048</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>20</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/result</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.input.dir</name><value>file:/webmap</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
 <property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>pregelix.updateIntensive</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
 </configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index f1c27ca..d29b2da 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -125,7 +125,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index dfb4e71..6fe04fb 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -30,7 +30,6 @@
 <property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
 <property><name>mapred.output.compress</name><value>false</value></property>
 <property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -86,6 +85,7 @@
 <property><name>fs.checkpoint.period</name><value>3600</value></property>
 <property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
 <property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>fs.s3.maxRetries</name><value>4</value></property>
 <property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
 <property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 49a6e20..d0f9759 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -30,7 +30,6 @@
 <property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
 <property><name>mapred.output.compress</name><value>false</value></property>
 <property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>mapred.reduce.max.attempts</name><value>4</value></property>
@@ -87,6 +86,7 @@
 <property><name>fs.checkpoint.period</name><value>3600</value></property>
 <property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
 <property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>fs.s3.maxRetries</name><value>4</value></property>
 <property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
 <property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
index 789ea32..0173390 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
@@ -80,6 +80,7 @@
 <property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
 <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index 796b1d1..a7a38e0 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -80,6 +80,7 @@
 <property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
 <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>pregelix.dynamicopt</name><value>true</value></property>
 <property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
 <property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
 <property><name>mapred.queue.names</name><value>default</value></property>
@@ -125,7 +126,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>pregelix.incStateLength</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
index 8834ead..225429a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
@@ -1,146 +1,145 @@
 <?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Reachibility</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>ReachibilityVertex.destId</name><value>10</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
 <property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>ReachibilityVertex.destId</name><value>10</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
 </configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
index 234dbf9..bd9da92 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplexNoConnectivity.xml
@@ -1,146 +1,145 @@
 <?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
 <property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
 <property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>Reachibility</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
 <property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
 <property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>ReachibilityVertex.destId</name><value>25</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
 <property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
 <property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
 <property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>ReachibilityVertex.destId</name><value>25</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
 </configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index b1c57c5..87c35f7 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -126,9 +126,9 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>topology.script.number.args</name><value>100</value></property>
 <property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index f1d2dc6..b757514 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -126,9 +126,9 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>topology.script.number.args</name><value>100</value></property>
 <property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
index 951ac6f..80cea20 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
@@ -123,9 +123,10 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.trianglecounting.TextTriangleCountingInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>class edu.uci.ics.pregelix.api.util.GlobalCountAggregator,class edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
 <property><name>topology.script.number.args</name><value>100</value></property>
 <property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
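(For readers tracing these hunks: the pregelix.* keys are ordinary Hadoop Configuration entries serialized into the test job XMLs. Below is a minimal, illustrative sketch of how such entries could be produced; the property keys and values are taken from the diff above, while the use of a bare Hadoop Configuration and the JobXmlSketch class name are assumptions made for a self-contained example, not the Pregelix job API the tests actually use.)

    // Illustrative sketch only: reproducing entries like the ones in these job XMLs
    // with a plain Hadoop Configuration. Property names/values are copied from the
    // diff above; everything else here is an assumption for illustration.
    import org.apache.hadoop.conf.Configuration;

    public class JobXmlSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration(false); // do not load default resources
            // Pregelix-specific entries seen in the test job XMLs
            conf.set("pregelix.vertexClass",
                    "edu.uci.ics.pregelix.example.ReachabilityVertex");
            conf.set("pregelix.combinerClass",
                    "edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner");
            conf.setBoolean("pregelix.incStateLength", true); // the flag added by these hunks
            conf.setLong("pregelix.numVertices", 23);
            // Standard Hadoop entries also present in the dumps
            conf.set("mapred.job.name", "Reachibility");
            conf.set("mapred.input.dir", "file:/webmapcomplex");
            // Serializing the configuration emits the same
            // <property><name>...</name><value>...</value></property> form as above.
            conf.writeXml(System.out);
        }
    }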