Merge branch 'master' of https://code.google.com/p/hyracks
diff --git a/hyracks/hyracks-client/pom.xml b/hyracks/hyracks-client/pom.xml
index 99ee616..6069d6d 100644
--- a/hyracks/hyracks-client/pom.xml
+++ b/hyracks/hyracks-client/pom.xml
@@ -1,60 +1,114 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>hyracks-client</artifactId>
-  <name>hyracks-client</name>
-  <parent>
-    <groupId>edu.uci.ics.hyracks</groupId>
-    <artifactId>hyracks</artifactId>
-    <version>0.2.10-SNAPSHOT</version>
-  </parent>
+<!-- Copyright 2009-2013 by The Regents of the University of California
+	Licensed under the Apache License, Version 2.0 (the "License");
+	you may not use this file except in compliance with the License.
+	you may obtain a copy of the License from
+	    http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed under the
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+	either express or implied. See the License for the specific language governing permissions
+	and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>hyracks-client</artifactId>
+	<name>hyracks-client</name>
+	<parent>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<artifactId>hyracks</artifactId>
+		<version>0.2.10-SNAPSHOT</version>
+	</parent>
 
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>2.0.2</version>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-  <dependencies>
- <dependency>
-    <groupId>edu.uci.ics.hyracks</groupId>
-    <artifactId>hyracks-api</artifactId>
-    <version>0.2.10-SNAPSHOT</version>
- </dependency>
- <dependency>
-    <groupId>edu.uci.ics.hyracks</groupId>
-    <artifactId>hyracks-net</artifactId>
-    <version>0.2.10-SNAPSHOT</version>
- </dependency>
- <dependency>
-    <groupId>edu.uci.ics.hyracks</groupId>
-    <artifactId>hyracks-comm</artifactId>
-    <version>0.2.10-SNAPSHOT</version>
- </dependency>
-  <dependency>
-    <groupId>edu.uci.ics.hyracks</groupId>
-    <artifactId>hyracks-dataflow-common</artifactId>
-    <version>0.2.10-SNAPSHOT</version>
- </dependency>
-  </dependencies>
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.7</source>
+					<target>1.7</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.7.2</version>
+				<configuration>
+					<forkMode>pertest</forkMode>
+					<argLine>-enableassertions -Xmx2047m -XX:MaxPermSize=300m
+						-Dfile.encoding=UTF-8
+						-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+					<includes>
+						<include>**/*TestSuite.java</include>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
+				<configuration>
+					<filesets>
+						<fileset>
+							<directory>.</directory>
+							<includes>
+								<include>teststore*</include>
+								<include>edu*</include>
+								<include>actual*</include>
+								<include>build*</include>
+								<include>expect*</include>
+								<include>ClusterController*</include>
+								<include>edu.uci.*</include>
+								<include>dev*</include>
+							</includes>
+						</fileset>
+					</filesets>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+	<dependencies>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-api</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-net</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-comm</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-common</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-cc</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-nc</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>1.1</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.8.1</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
 </project>
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/AggregateCounter.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/AggregateCounter.java
new file mode 100644
index 0000000..450004d
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/AggregateCounter.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.client.stats;
+
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+
+/** Running-average counter: get() is the integer mean of every value passed to set() since reset(). */
+public class AggregateCounter extends Counter {
+
+    private long sum = 0; // total of the recorded samples
+    private long numOfItems = 0; // number of recorded samples
+
+    public AggregateCounter(String name) {
+        super(name);
+    }
+
+    /**
+     * Records one sample; synchronized because ClientCounterContext's updater calls set() while clients call get().
+     *
+     * @return the average as it was BEFORE this sample was recorded
+     */
+    @Override
+    public synchronized long set(long value) {
+        long previousAverage = average();
+        sum += value;
+        numOfItems++;
+        return previousAverage;
+    }
+
+    /** @return the integer average of the recorded samples, or 0 if there are none */
+    @Override
+    public synchronized long get() {
+        return average();
+    }
+
+    /** Discards all recorded samples. */
+    public synchronized void reset() {
+        sum = 0;
+        numOfItems = 0;
+    }
+
+    private long average() { // callers hold this object's monitor
+        return numOfItems == 0 ? 0 : sum / numOfItems;
+    }
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java
new file mode 100644
index 0000000..da30e20
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.client.stats;
+
+public class Counters { // names of the statistics that ClientCounterContext looks up in a node's JSON profile
+    public static final String NUM_PROCESSOR = "num-processors"; // presumably the node's processor count - TODO confirm
+    /** Presumably the node's recent system load averages; the producer is not visible here - TODO confirm. */
+    public static final String SYSTEM_LOAD = "system-load-averages";
+    /** Presumably the node's recent used-heap sizes - TODO confirm. */
+    public static final String MEMORY_USAGE = "heap-used-sizes";
+    /** Presumably the network payload bytes read by the node - TODO confirm. */
+    public static final String NETWORK_IO_READ = "net-payload-bytes-read";
+    /** Presumably the network payload bytes written by the node - TODO confirm. */
+    public static final String NETWORK_IO_WRITE = "net-payload-bytes-written";
+    /** Key under which NodeControllerState publishes its diskReads history. */
+    public static final String DISK_READ = "disk-reads";
+    /** Key under which NodeControllerState publishes its diskWrites history. */
+    public static final String DISK_WRITE = "disk-writes";
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/IClusterCounterContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/IClusterCounterContext.java
new file mode 100644
index 0000000..356ef34
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/IClusterCounterContext.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.client.stats;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+
+public interface IClusterCounterContext extends ICounterContext {
+    /** Looks up the counter counterName of the single machine machineName; ClientCounterContext ignores create and throws IllegalStateException for an unknown counter. */
+    public ICounter getCounter(String machineName, String counterName, boolean create);
+    
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
new file mode 100644
index 0000000..46487eb
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.client.stats.impl;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.client.stats.AggregateCounter;
+import edu.uci.ics.hyracks.client.stats.Counters;
+import edu.uci.ics.hyracks.client.stats.IClusterCounterContext;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+
+/**
+ * @author yingyib
+ */
+public class ClientCounterContext implements IClusterCounterContext {
+    private static String[] RESET_COUNTERS = { Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE,
+            Counters.MEMORY_USAGE, Counters.DISK_READ, Counters.DISK_WRITE, Counters.NUM_PROCESSOR };
+    private static String[] AGG_COUNTERS = { Counters.SYSTEM_LOAD };
+    private static int UPDATE_INTERVAL = 10000;
+
+    private final Map<String, Counter> counterMap = new HashMap<String, Counter>();
+    private final String baseURL;
+    private final List<String> slaveMachines = new ArrayList<String>();
+    private boolean stopped = false;
+    private Thread updateThread = new UpdateThread();
+
+    public ClientCounterContext(String hostName, int restPort, Collection<String> slaveMachines) {
+        this.baseURL = "http://" + hostName + ":" + restPort + "/rest/nodes/";
+        this.slaveMachines.addAll(slaveMachines);
+        for (String restCounterName : RESET_COUNTERS) {
+            counterMap.put(restCounterName, new AggregateCounter(restCounterName));
+        }
+        for (String aggCounterName : AGG_COUNTERS) {
+            counterMap.put(aggCounterName, new AggregateCounter(aggCounterName));
+        }
+        for (String slave : slaveMachines) {
+            for (String restCounterName : RESET_COUNTERS) {
+                counterMap.put(slave + "$" + restCounterName, new AggregateCounter(restCounterName));
+            }
+            for (String aggCounterName : AGG_COUNTERS) {
+                counterMap.put(slave + "$" + aggCounterName, new AggregateCounter(aggCounterName));
+            }
+        }
+        requestCounters();
+        updateThread.start();
+    }
+
+    /**
+     * Reset the counters
+     */
+    public void reset() {
+        for (String aggCounterName : RESET_COUNTERS) {
+            AggregateCounter aggCounter = (AggregateCounter) counterMap.get(aggCounterName);
+            aggCounter.reset();
+        }
+        for (String slave : slaveMachines) {
+            for (String aggCounterName : RESET_COUNTERS) {
+                AggregateCounter aggCounter = (AggregateCounter) counterMap.get(slave + "$" + aggCounterName);
+                aggCounter.reset();
+            }
+        }
+    }
+
+    public void resetAll() {
+        reset();
+        for (String aggCounterName : AGG_COUNTERS) {
+            AggregateCounter aggCounter = (AggregateCounter) counterMap.get(aggCounterName);
+            aggCounter.reset();
+        }
+        for (String slave : slaveMachines) {
+            for (String aggCounterName : AGG_COUNTERS) {
+                AggregateCounter aggCounter = (AggregateCounter) counterMap.get(slave + "$" + aggCounterName);
+                aggCounter.reset();
+            }
+        }
+    }
+
+    @Override
+    public synchronized ICounter getCounter(String name, boolean create) {
+        Counter counter = counterMap.get(name);
+        if (counter == null) {
+            throw new IllegalStateException("request an unknown counter: " + name + "!");
+        }
+        return counter;
+    }
+
+    @Override
+    public ICounter getCounter(String machineName, String counterName, boolean create) {
+        Counter counter = counterMap.get(machineName + "$" + counterName);
+        if (counter == null) {
+            throw new IllegalStateException("request an unknown counter: " + counterName + " on slave machine "
+                    + machineName + "!");
+        }
+        return counter;
+    }
+
+    /**
+     * request to each slave machine for all the counters
+     */
+    private synchronized void requestCounters() {
+        try {
+            reset();
+            for (String slave : slaveMachines) {
+                String slaveProfile = requestProfile(slave);
+                JSONParser parser = new JSONParser();
+                JSONObject jo = (JSONObject) parser.parse(slaveProfile);
+                Object counterObject = jo.get("result");
+                if (counterObject instanceof JSONObject) {
+                    updateCounterMapWithJSONArray(slave, (JSONObject) counterObject);
+                }
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Update counters
+     * 
+     * @param jo
+     *            the Profile JSON object
+     */
+    private void updateCounterMapWithJSONArray(String slave, JSONObject jo) {
+        for (String counterName : RESET_COUNTERS) {
+            updateCounter(slave, jo, counterName);
+        }
+        
+        for (String counterName : AGG_COUNTERS) {
+            updateCounter(slave, jo, counterName);
+        }
+    }
+
+    private void updateCounter(String slave, JSONObject jo, String counterName) {
+        Object counterObject = jo.get(counterName);
+        long counterValue = extractCounterValue(counterObject);
+        // global counter
+        ICounter counter = getCounter(counterName, true);
+        counter.set(counterValue);
+        //local counters
+        ICounter localCounter = getCounter(slave, counterName, true);
+        localCounter.set(counterValue);
+    }
+
+    private long extractCounterValue(Object counterObject) {
+        long counterValue = 0;
+        if (counterObject instanceof JSONArray) {
+            JSONArray jArray = (JSONArray) counterObject;
+            Object[] values = jArray.toArray();
+            for (Object value : values) {
+                if (value instanceof Double) {
+                    Double dValue = (Double) value;
+                    counterValue += dValue.doubleValue();
+                } else if (value instanceof Long) {
+                    Long lValue = (Long) value;
+                    counterValue += lValue.longValue();
+                }
+            }
+            counterValue /= values.length;
+        } else {
+            Long val = (Long) counterObject;
+            counterValue = val.longValue();
+        }
+        return counterValue;
+    }
+
+    /**
+     * Request a counter from the slave machine
+     * 
+     * @param slaveMachine
+     * @return the JSON string from the slave machine
+     */
+    private String requestProfile(String slaveMachine) {
+        try {
+            String url = baseURL + slaveMachine;
+            URL obj = new URL(url);
+            HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+            con.setRequestMethod("GET");
+            int responseCode = con.getResponseCode();
+            if (responseCode != 200) {
+                throw new IllegalStateException("The http response code is wrong: " + responseCode);
+            }
+            BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
+            String inputLine;
+            StringBuffer response = new StringBuffer();
+            while ((inputLine = in.readLine()) != null) {
+                response.append(inputLine);
+            }
+            in.close();
+            return response.toString();
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Stop the counter profiler
+     */
+    public void stop() {
+        synchronized (updateThread) {
+            stopped = true;
+            updateThread.notifyAll();
+        }
+        try {
+            updateThread.join();
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * The thread keep updating counters
+     */
+    private class UpdateThread extends Thread {
+
+        @Override
+        public synchronized void run() {
+            try {
+                while (true) {
+                    if (stopped) {
+                        break;
+                    }
+                    requestCounters();
+                    wait(UPDATE_INTERVAL);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                notifyAll();
+            }
+        }
+
+    }
+
+}
diff --git a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
new file mode 100644
index 0000000..bbf212f
--- /dev/null
+++ b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.client.stats;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.client.stats.impl.ClientCounterContext;
+
+public class ClientCounterContextTest {
+
+    /**
+     * Smoke test: starts the local test cluster, lets the context poll it and prints every
+     * counter. Nothing is asserted; the test fails only if an exception is thrown.
+     */
+    @Test
+    public void test() throws Exception {
+        HyracksUtils.init();
+        ClientCounterContext ccContext = null;
+        try {
+            String[] ncs = new String[] { "nc1", "nc2" };
+            ccContext = new ClientCounterContext("localhost", 16001, Arrays.asList(ncs));
+            ccContext.resetAll();
+            Thread.sleep(20000); // the updater polls every 10 s, so this covers at least one more round
+            String[] counters = { Counters.MEMORY_USAGE, Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE,
+                    Counters.SYSTEM_LOAD, Counters.NUM_PROCESSOR, Counters.DISK_READ, Counters.DISK_WRITE };
+            for (String counterName : counters) {
+                ICounter counter = ccContext.getCounter(counterName, false);
+                System.out.println(counterName + ": " + counter.get());
+            }
+            for (String slave : ncs) {
+                for (String counterName : counters) {
+                    ICounter counter = ccContext.getCounter(slave, counterName, false);
+                    System.out.println(slave + " " + counterName + ": " + counter.get());
+                }
+            }
+        } finally {
+            // always tear down, otherwise a failure leaves the updater thread and the cluster running
+            try {
+                if (ccContext != null) {
+                    ccContext.stop();
+                }
+            } finally {
+                HyracksUtils.deinit();
+            }
+        }
+    }
+}
diff --git a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java
new file mode 100644
index 0000000..a58aa9c
--- /dev/null
+++ b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/HyracksUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.client.stats;
+
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+/** Test helper that runs one cluster controller and two node controllers inside the current JVM. */
+public class HyracksUtils {
+
+    public static final String NC1_ID = "nc1";
+    public static final String NC2_ID = "nc2";
+
+    public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
+    public static final int TEST_HYRACKS_CC_PORT = 1099;
+    public static final int TEST_HYRACKS_CC_CLIENT_PORT = 2099;
+    public static final String CC_HOST = "localhost";
+
+    public static final int FRAME_SIZE = 65536;
+
+    private static ClusterControllerService cc;
+    private static NodeControllerService nc1;
+    private static NodeControllerService nc2;
+    private static IHyracksClientConnection hcc;
+
+    /** Starts the cluster controller, then both node controllers, then opens the client connection. */
+    public static void init() throws Exception {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.clientNetIpAddress = CC_HOST;
+        ccConfig.clusterNetIpAddress = CC_HOST;
+        ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
+        ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
+        ccConfig.defaultMaxJobAttempts = 0;
+        ccConfig.jobHistorySize = 0;
+        ccConfig.profileDumpPeriod = -1;
+        ccConfig.heartbeatPeriod = 50;
+        cc = new ClusterControllerService(ccConfig);
+        cc.start();
+        nc1 = newNodeController(NC1_ID); // assigned before start() so deinit() can stop a half-started node
+        nc1.start();
+        nc2 = newNodeController(NC2_ID);
+        nc2.start();
+        hcc = new HyracksConnection(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
+    }
+
+    /** Creates, without starting it, a node controller configured with localhost:TEST_HYRACKS_CC_PORT as its CC. */
+    private static NodeControllerService newNodeController(String nodeId) throws Exception {
+        NCConfig ncConfig = new NCConfig();
+        ncConfig.ccHost = "localhost";
+        ncConfig.clusterNetIPAddress = "localhost";
+        ncConfig.ccPort = TEST_HYRACKS_CC_PORT;
+        ncConfig.dataIPAddress = "127.0.0.1";
+        ncConfig.datasetIPAddress = "127.0.0.1";
+        ncConfig.nodeId = nodeId;
+        return new NodeControllerService(ncConfig);
+    }
+
+    /** Stops the services in reverse start order, skipping any that init() never created. */
+    public static void deinit() throws Exception {
+        if (nc2 != null) {
+            nc2.stop();
+        }
+        if (nc1 != null) {
+            nc1.stop();
+        }
+        if (cc != null) {
+            cc.stop();
+        }
+    }
+
+    /** Starts the job with the PROFILE_RUNTIME flag and waits for its completion; appName is not used. */
+    public static void runJob(JobSpecification spec, String appName) throws Exception {
+        spec.setFrameSize(FRAME_SIZE);
+        JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        hcc.waitForCompletion(jobId);
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index 85ce104..63fc26b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -125,6 +125,10 @@
 
     private final long[] ipcMessageBytesReceived;
 
+    private final long[] diskReads;
+
+    private final long[] diskWrites;
+
     private int rrdPtr;
 
     private int lastHeartbeatDuration;
@@ -184,6 +188,9 @@
         ipcMessagesReceived = new long[RRD_SIZE];
         ipcMessageBytesReceived = new long[RRD_SIZE];
 
+        diskReads = new long[RRD_SIZE];
+        diskWrites = new long[RRD_SIZE];
+
         rrdPtr = 0;
     }
 
@@ -219,6 +226,8 @@
             ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
             ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
             ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
+            diskReads[rrdPtr] = hbData.diskReads;
+            diskWrites[rrdPtr] = hbData.diskWrites;
             rrdPtr = (rrdPtr + 1) % RRD_SIZE;
         }
     }
@@ -303,6 +312,8 @@
         o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
         o.put("ipc-messages-received", ipcMessagesReceived);
         o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
+        o.put("disk-reads", diskReads);
+        o.put("disk-writes", diskWrites);
 
         return o;
     }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index 0724a01..2407c10 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -181,36 +181,44 @@
      * @throws HyracksException
      */
     private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException {
-        try {
-            List<URL> downloadedFileURLs = new ArrayList<URL>();
-            File dir = new File(deploymentDir);
-            if (!dir.exists()) {
-                FileUtils.forceMkdir(dir);
-            }
-            for (URL url : urls) {
-                String urlString = url.toString();
-                int slashIndex = urlString.lastIndexOf('/');
-                String fileName = urlString.substring(slashIndex + 1).split("&")[1];
-                String filePath = deploymentDir + File.separator + fileName;
-                File targetFile = new File(filePath);
-                if (isNC) {
-                    HttpClient hc = new DefaultHttpClient();
-                    HttpGet get = new HttpGet(url.toString());
-                    HttpResponse response = hc.execute(get);
-                    InputStream is = response.getEntity().getContent();
-                    OutputStream os = new FileOutputStream(targetFile);
-                    try {
-                        IOUtils.copyLarge(is, os);
-                    } finally {
-                        os.close();
-                        is.close();
-                    }
+        // Download every URL into deploymentDir; the whole batch is retried up to 10 times
+        int retryCount = 10;
+        Exception trace = null; // last failure, rethrown once all attempts are used up
+        for (int tried = 0; tried < retryCount; tried++) {
+            try {
+                List<URL> downloadedFileURLs = new ArrayList<URL>();
+                File dir = new File(deploymentDir);
+                if (!dir.exists()) {
+                    FileUtils.forceMkdir(dir);
                 }
-                downloadedFileURLs.add(targetFile.toURI().toURL());
+                for (URL url : urls) {
+                    String urlString = url.toString();
+                    String fileName = urlString.substring(urlString.lastIndexOf('/') + 1).split("&")[1];
+                    File targetFile = new File(deploymentDir + File.separator + fileName);
+                    if (isNC) {
+                        HttpClient hc = new DefaultHttpClient();
+                        InputStream is = null;
+                        try {
+                            is = hc.execute(new HttpGet(urlString)).getEntity().getContent();
+                            // the file is opened inside the try so that `is` is released even if this fails
+                            OutputStream os = new FileOutputStream(targetFile);
+                            try {
+                                IOUtils.copyLarge(is, os);
+                            } finally {
+                                os.close();
+                            }
+                        } finally {
+                            IOUtils.closeQuietly(is);
+                            hc.getConnectionManager().shutdown(); // frees the sockets of this one-shot client
+                        }
+                    }
+                    downloadedFileURLs.add(targetFile.toURI().toURL());
+                }
+                return downloadedFileURLs;
+            } catch (Exception e) {
+                trace = e;
             }
-            return downloadedFileURLs;
-        } catch (Exception e) {
-            throw new HyracksException(e);
         }
+        throw new HyracksException(trace);
     }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index e46449b..e999913 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -45,4 +45,6 @@
     public long ipcMessageBytesSent;
     public long ipcMessagesReceived;
     public long ipcMessageBytesReceived;
+    public long diskReads;
+    public long diskWrites;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 245d988..1cee330 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -66,6 +66,8 @@
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
 import edu.uci.ics.hyracks.control.nc.dataset.DatasetPartitionManager;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.profiling.IIOCounter;
+import edu.uci.ics.hyracks.control.nc.io.profiling.IOCounterFactory;
 import edu.uci.ics.hyracks.control.nc.net.DatasetNetworkManager;
 import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
@@ -145,6 +147,8 @@
     private final MemoryManager memoryManager;
 
     private boolean shuttedDown = false;
+    
+    private IIOCounter ioCounter;
 
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
@@ -173,6 +177,7 @@
         registrationPending = true;
         getNodeControllerInfosAcceptor = new MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
         memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
+        ioCounter = new IOCounterFactory().getIOCounter();
     }
 
     public IHyracksRootContext getRootContext() {
@@ -429,6 +434,9 @@
             hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
             hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
 
+            hbData.diskReads = ioCounter.getReads();
+            hbData.diskWrites = ioCounter.getWrites();
+            
             try {
                 cc.nodeHeartbeat(id, hbData);
             } catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IIOCounter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IIOCounter.java
new file mode 100644
index 0000000..6774262
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IIOCounter.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.io.profiling;
+
+/**
+ * A source of cumulative disk I/O counters.
+ */
+public interface IIOCounter {
+
+    /**
+     * Report the block reads counted so far.
+     * 
+     * @return the number of block reads since the very beginning
+     */
+    long getReads();
+
+    /**
+     * Report the block writes counted so far.
+     * 
+     * @return the number of block writes since the very beginning
+     */
+    long getWrites();
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterDefault.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterDefault.java
new file mode 100644
index 0000000..f53f79c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterDefault.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.io.profiling;
+
+/**
+ * An {@link IIOCounter} that performs no measurement: both counters are always zero.
+ */
+public class IOCounterDefault implements IIOCounter {
+
+    /**
+     * @return always 0
+     */
+    @Override
+    public long getReads() {
+        return 0L;
+    }
+
+    /**
+     * @return always 0
+     */
+    @Override
+    public long getWrites() {
+        return 0L;
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterFactory.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterFactory.java
new file mode 100644
index 0000000..06ccfaf
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.io.profiling;
+
+/**
+ * Chooses an {@link IIOCounter} implementation from the "os.name" system property.
+ */
+public class IOCounterFactory {
+
+    /**
+     * Get the IOCounter for the specific underlying OS
+     * 
+     * @return an IOCounterLinux when the OS name contains "nix", "nux" or "aix", an IOCounterOSX when it
+     *         contains "mac", and an IOCounterDefault otherwise (including when the property is not set)
+     */
+    public IIOCounter getIOCounter() {
+        String rawName = System.getProperty("os.name");
+        // Lower-case with a fixed locale: under e.g. a Turkish default locale "AIX" would become
+        // "a\u0131x" and no longer match the plain-ASCII markers below.
+        String osName = rawName == null ? "" : rawName.toLowerCase(java.util.Locale.ROOT);
+        if (osName.contains("nix") || osName.contains("nux") || osName.contains("aix")) {
+            return new IOCounterLinux();
+        } else if (osName.contains("mac")) {
+            return new IOCounterOSX();
+        } else {
+            return new IOCounterDefault();
+        }
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java
new file mode 100644
index 0000000..1e8baa1
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.io.profiling;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+/**
+ * An {@link IIOCounter} that samples disk I/O counters on Linux.
+ * <p>
+ * The primary source is the report printed by the {@link #COMMAND} program: the values in the requested
+ * column are summed over all lines that follow the header containing "Blk_read". When that source cannot
+ * be used (the command cannot be started, the header is missing, or a value cannot be parsed), the
+ * counter falls back to the /proc/self/io file of this process and divides the byte counts found there by
+ * {@link #PAGE_SIZE}. If both sources fail, 0 is reported.
+ * <p>
+ * NOTE(review): the two sources use different units and scopes (all devices vs. this process only), and
+ * only the primary source is reported relative to the values sampled at construction time.
+ */
+public class IOCounterLinux implements IIOCounter {
+    /** The command whose report is the primary counter source. */
+    public static final String COMMAND = "iostat";
+    /**
+     * Kept for backward compatibility only and no longer executed: when run as a child process, "self"
+     * resolves to the spawned cat process rather than to this JVM, so the fallback reads the file directly.
+     */
+    public static final String COMMAND2 = "cat /proc/self/io";
+    /** Divisor applied to the byte counts of the fallback source. */
+    public static final int PAGE_SIZE = 4096;
+
+    /** The fallback source; read in-process so that "self" is this JVM. */
+    private static final String PROC_SELF_IO = "/proc/self/io";
+    /** Text that identifies the device-table header line in the iostat report. */
+    private static final String IOSTAT_HEADER_MARKER = "Blk_read";
+
+    private final long baseReads;
+    private final long baseWrites;
+
+    public IOCounterLinux() {
+        // While these calls run, the base fields still hold their default value 0, so the values
+        // sampled here are the raw readings that later calls subtract.
+        baseReads = getReads();
+        baseWrites = getWrites();
+    }
+
+    /**
+     * @return the reads from the primary source minus the construction-time base; or, when the primary
+     *         source is unusable, row 4 of the fallback source divided by PAGE_SIZE; or 0 if both fail
+     */
+    @Override
+    public long getReads() {
+        try {
+            long reads = extractColumn(4);
+            return reads - baseReads;
+        } catch (IOException e) {
+            try {
+                long reads = extractRow(4);
+                return reads / PAGE_SIZE;
+            } catch (IOException e2) {
+                return 0;
+            }
+        }
+    }
+
+    /**
+     * @return the writes from the primary source minus the construction-time base; or, when the primary
+     *         source is unusable, (row 5 - row 6) of the fallback source divided by PAGE_SIZE; or 0 if
+     *         both fail
+     */
+    @Override
+    public long getWrites() {
+        try {
+            long writes = extractColumn(5);
+            return writes - baseWrites;
+        } catch (IOException e) {
+            try {
+                long writes = extractRow(5);
+                long cancelledWrites = extractRow(6);
+                return (writes - cancelledWrites) / PAGE_SIZE;
+            } catch (IOException e2) {
+                return 0;
+            }
+        }
+    }
+
+    /**
+     * Run {@link #COMMAND} and sum one column over every line that follows the header line.
+     * 
+     * @param columnIndex
+     *            zero-based index of the whitespace-separated column to sum
+     * @return the sum of the column; lines with too few columns contribute nothing
+     * @throws IOException
+     *             if the command cannot be run, its report has no header line, or a value is not a number
+     */
+    private long extractColumn(int columnIndex) throws IOException {
+        Process process = Runtime.getRuntime().exec(COMMAND);
+        BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+        try {
+            boolean headerSeen = false;
+            long ios = 0;
+            String line;
+            while ((line = reader.readLine()) != null) {
+                if (line.contains(IOSTAT_HEADER_MARKER)) {
+                    headerSeen = true;
+                    continue;
+                }
+                if (headerSeen) {
+                    ios += tokenAt(line, columnIndex);
+                }
+            }
+            if (!headerSeen) {
+                // An unrecognized report layout must not be mistaken for "no I/O at all";
+                // signalling failure lets the callers use the fallback source instead.
+                throw new IOException("no \"" + IOSTAT_HEADER_MARKER + "\" header in the output of " + COMMAND);
+            }
+            return ios;
+        } finally {
+            // Release the pipe and the child process even when reading or parsing fails.
+            try {
+                reader.close();
+            } finally {
+                process.destroy();
+            }
+        }
+    }
+
+    /**
+     * Read the second whitespace-separated token of one row of the fallback source.
+     * 
+     * @param rowIndex
+     *            zero-based index of the row to read
+     * @return the parsed value, or 0 if the row does not exist or has fewer than two tokens
+     * @throws IOException
+     *             if the file cannot be read or the value is not a number
+     */
+    private long extractRow(int rowIndex) throws IOException {
+        BufferedReader reader = new BufferedReader(new java.io.FileReader(PROC_SELF_IO));
+        try {
+            String line;
+            for (int i = 0; (line = reader.readLine()) != null; i++) {
+                if (i == rowIndex) {
+                    return tokenAt(line, 1);
+                }
+            }
+            return 0;
+        } finally {
+            reader.close();
+        }
+    }
+
+    /**
+     * Parse the whitespace-separated token at a given position of a line.
+     * 
+     * @return the parsed value, or 0 if the line has no token at that position
+     * @throws IOException
+     *             if the token is not a valid long, so that callers handle it like any other read failure
+     */
+    private static long tokenAt(String line, int index) throws IOException {
+        StringTokenizer tokenizer = new StringTokenizer(line);
+        for (int i = 0; tokenizer.hasMoreTokens(); i++) {
+            String token = tokenizer.nextToken();
+            if (i == index) {
+                try {
+                    return Long.parseLong(token);
+                } catch (NumberFormatException e) {
+                    throw new IOException("unparsable counter \"" + token + "\" in line: " + line, e);
+                }
+            }
+        }
+        return 0;
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterOSX.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterOSX.java
new file mode 100644
index 0000000..5f56ca8
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterOSX.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.io.profiling;
+
+/**
+ * The {@link IIOCounter} used on Mac OS X; it takes no measurement and reports zero for both counters.
+ */
+public class IOCounterOSX implements IIOCounter {
+
+    /**
+     * @return always 0
+     */
+    @Override
+    public long getReads() {
+        return 0L;
+    }
+
+    /**
+     * @return always 0
+     */
+    @Override
+    public long getWrites() {
+        return 0L;
+    }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 075de03..06e79de 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -80,6 +80,8 @@
     public static final String RECOVERY_COUNT = "pregelix.recoveryCount";
     /** the checkpoint interval */
     public static final String CKP_INTERVAL = "pregelix.ckpinterval";
+    /** the flag that turns dynamic optimization on or off */
+    public static final String DYNAMIC_OPTIMIZATION = "pregelix.dynamicopt";
     /** comma */
     public static final String COMMA_STR = ",";
 
@@ -260,6 +262,15 @@
     final public void setCheckpointingInterval(int ckpInterval) {
         getConfiguration().setInt(CKP_INTERVAL, ckpInterval);
     }
+    
+    /**
+     * Turn dynamic optimization on or off for this job by storing the flag in the job
+     * configuration under the DYNAMIC_OPTIMIZATION key.
+     * 
+     * @param dynamicOpt
+     *            true to enable dynamic optimization; false to disable it
+     */
+    final public void setEnableDynamicOptimization(boolean dynamicOpt) {
+        getConfiguration().setBoolean(DYNAMIC_OPTIMIZATION, dynamicOpt);
+    }
 
     @Override
     public String toString() {
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index a5d9cd7..f44942f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -671,6 +671,16 @@
     public static int getRecoveryCount(Configuration conf) {
         return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
     }
+    
+    /***
+     * Check whether dynamic optimization is enabled
+     * 
+     * @param conf
+     *            the configuration to read the flag from
+     * @return the configured flag; true when the configuration does not set it
+     */
+    public static boolean getEnableDynamicOptimization(Configuration conf) {
+        return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, true);
+    }
 
     /***
      * Get the user-set checkpoint interval
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 3d1699f..9ae263c 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -358,5 +358,12 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-client</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index d6a6f3d..b3a90e9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -21,6 +21,7 @@
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -42,21 +43,24 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.stats.Counters;
+import edu.uci.ics.hyracks.client.stats.impl.ClientCounterContext;
 import edu.uci.ics.pregelix.api.job.ICheckpointHook;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.base.IDriver;
 import edu.uci.ics.pregelix.core.jobgen.JobGen;
-import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
+import edu.uci.ics.pregelix.core.jobgen.JobGenFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.DynamicOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 
 @SuppressWarnings("rawtypes")
 public class Driver implements IDriver {
+    public static final String[] COUNTERS = { Counters.NUM_PROCESSOR, Counters.SYSTEM_LOAD, Counters.MEMORY_USAGE,
+            Counters.DISK_READ, Counters.DISK_WRITE, Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE };
     private static final Log LOG = LogFactory.getLog(Driver.class);
     private IHyracksClientConnection hcc;
     private Class exampleClass;
@@ -93,6 +97,8 @@
             PregelixJob currentJob = jobs.get(0);
             PregelixJob lastJob = currentJob;
             addHadoopConfiguration(currentJob, ipAddress, port, true);
+            ClientCounterContext counterContext = new ClientCounterContext(ipAddress, 16001,
+                    Arrays.asList(ClusterConfig.getNCNames()));
             JobGen jobGen = null;
 
             /** prepare job -- deploy jars */
@@ -105,6 +111,7 @@
             int retryCount = 0;
             int maxRetryCount = 3;
             jobGen = selectJobGen(planChoice, currentJob);
+            IOptimizer dynamicOptimzier = new DynamicOptimizer();
 
             do {
                 try {
@@ -131,7 +138,10 @@
                             jobGen.reset(currentJob);
                         }
 
-                        /** run loop-body jobs */
+                        /** run loop-body jobs with dynamic optimizer if it is enabled */
+                        if (BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration())) {
+                            jobGen = dynamicOptimzier.optimize(counterContext, jobGen, i);
+                        }
                         runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
                                 ckpHook, failed);
                         runClearState(deploymentId, jobGen);
@@ -156,6 +166,13 @@
                 }
             } while (failed && retryCount < maxRetryCount);
             LOG.info("job finished");
+            StringBuffer counterBuffer = new StringBuffer();
+            counterBuffer.append("performance counters\n");
+            for (String counter : COUNTERS) {
+                counterBuffer.append("\t" + counter + ": " + counterContext.getCounter(counter, false).get() + "\n");
+            }
+            LOG.info(counterBuffer.toString());
+            counterContext.stop();
         } catch (Exception e) {
             throw new HyracksException(e);
         }
@@ -180,24 +197,7 @@
     }
 
     private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
-        JobGen jobGen;
-        switch (planChoice) {
-            case INNER_JOIN:
-                jobGen = new JobGenInnerJoin(currentJob);
-                break;
-            case OUTER_JOIN:
-                jobGen = new JobGenOuterJoin(currentJob);
-                break;
-            case OUTER_JOIN_SORT:
-                jobGen = new JobGenOuterJoinSort(currentJob);
-                break;
-            case OUTER_JOIN_SINGLE_SORT:
-                jobGen = new JobGenOuterJoinSingleSort(currentJob);
-                break;
-            default:
-                jobGen = new JobGenInnerJoin(currentJob);
-        }
-        return jobGen;
+        /** the mapping from plan choice to job generator is delegated to JobGenFactory */
+        return JobGenFactory.createJobGen(planChoice, currentJob);
     }
 
     private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index f703fcc..c1f6aae 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -31,12 +31,14 @@
 import java.util.UUID;
 import java.util.logging.Logger;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -93,6 +95,7 @@
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.ReflectionUtils;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
 import edu.uci.ics.pregelix.core.base.IJobGen;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
@@ -110,11 +113,14 @@
 import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
 import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.runtime.function.ExtractLiveVertexIdFunctionFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RecoveryRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
@@ -131,7 +137,7 @@
     protected PregelixJob pregelixJob;
     protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
     protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
-    protected String jobId = UUID.randomUUID().toString();
+    protected String jobId = UUID.randomUUID().toString();
     protected int frameSize = ClusterConfig.getFrameSize();
     protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
 
@@ -147,6 +153,13 @@
     public JobGen(PregelixJob job) {
         init(job);
     }
+    
+    /**
+     * Create a job generator that adopts a given job id.
+     * 
+     * @param job
+     *            the job passed on to init
+     * @param jobId
+     *            the job id to adopt; when null, the random id set by the field initializer is kept
+     */
+    public JobGen(PregelixJob job, String jobId) {
+        if (jobId != null) {
+            this.jobId = jobId;
+        }
+        init(job);
+    }
 
     private void init(PregelixJob job) {
         conf = job.getConfiguration();
@@ -779,4 +792,88 @@
     /** generate clean-up job */
     public abstract JobSpecification[] generateCleanup() throws HyracksException;
 
+    /**
+     * Switch the plan to a desired one. This implementation always prepares the switch with one job
+     * that bulk-loads the live vertex btree, and always returns a JobGenInnerJoin that shares this
+     * generator's job and job id.
+     * 
+     * @param iteration
+     *            , the latest completed iteration number
+     * @param plan
+     *            , plan choice; NOTE(review): not consulted by this implementation
+     * @return the list of jobspecification for preparing plan switch and the new jobgen
+     * @throws HyracksException
+     */
+    public Pair<List<JobSpecification>, JobGen> switchPlan(int iteration, Plan plan) throws HyracksException {
+        /** the only preparation job bulk-loads the live vertex btree */
+        List<JobSpecification> preparationJobs = new ArrayList<JobSpecification>();
+        preparationJobs.add(bulkLoadLiveVertexBTree(iteration));
+
+        /** the new generator reuses the current job id */
+        JobGen innerJoinJobGen = new JobGenInnerJoin(pregelixJob, jobId);
+        return Pair.of(preparationJobs, innerJoinJobGen);
+    }
+
+    /**
+     * Build a jobspec to bulkload the live vertex btree. The job chains three operators with one-to-one
+     * connectors: an empty tuple source, a search-and-function-update operator over the PRIMARY_INDEX
+     * file splits that is given an ExtractLiveVertexIdFunctionFactory, and a bulk re-load operator over
+     * the SECONDARY_INDEX_ODD file splits, which is the root of the job.
+     * 
+     * @param iteration
+     *            the iteration number handed to JobGenUtil when it picks the comparator factories and
+     *            the forward-scan flag
+     * @return the job specification
+     * @throws HyracksException
+     */
+    private JobSpecification bulkLoadLiveVertexBTree(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct btree search and function call update operator; it reads (vertex id, vertex) records
+         * from the primary index and declares (vertex id, MsgList) as its final record descriptor
+         */
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
+
+        /** the same two type traits are shared by the search operator and the bulk-load operator */
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
+                MsgList.class.getName());
+        TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
+                getIndexDataflowHelperFactory(), inputRdFactory, 1, new ExtractLiveVertexIdFunctionFactory(),
+                preHookFactory, null, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct bulk-load index operator; both input fields are loaded and field 0 is the key
+         */
+        IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+        int[] fieldPermutation = new int[] { 0, 1 };
+        int[] keyFields = new int[] { 0 };
+        IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+        /**
+         * NOTE(review): unlike the search operator's comparator above, this one is requested for
+         * iteration + 1 and from the WritableComparator's class rather than from vertexIdClass --
+         * presumably intentional, since the index is consumed by the next iteration; confirm
+         */
+        indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+                WritableComparator.get(vertexIdClass).getClass());
+        TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+                fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+        /** connect job spec: empty tuple source -> scanner -> bulk-load (root) */
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, btreeBulkLoad, 0);
+        spec.addRoot(btreeBulkLoad);
+
+        return spec;
+    }
+
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
new file mode 100644
index 0000000..ed580de
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.jobgen;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+
+/**
+ * Creates the JobGen implementation that corresponds to a plan choice.
+ */
+public class JobGenFactory {
+
+    /**
+     * Create a job generator for a plan choice.
+     * 
+     * @param planChoice
+     *            the plan to generate jobs for
+     * @param currentJob
+     *            the job handed to the created generator
+     * @return the generator for the plan; a JobGenInnerJoin for INNER_JOIN and for any plan not listed
+     */
+    public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob) {
+        switch (planChoice) {
+            case OUTER_JOIN:
+                return new JobGenOuterJoin(currentJob);
+            case OUTER_JOIN_SORT:
+                return new JobGenOuterJoinSort(currentJob);
+            case OUTER_JOIN_SINGLE_SORT:
+                return new JobGenOuterJoinSingleSort(currentJob);
+            case INNER_JOIN:
+            default:
+                return new JobGenInnerJoin(currentJob);
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 1bad401..7bdb069 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -96,6 +96,10 @@
     public JobGenInnerJoin(PregelixJob job) {
         super(job);
     }
+    
+    /**
+     * Create an inner-join job generator with a given job id; both arguments are passed unchanged
+     * to the superclass constructor.
+     * 
+     * @param job
+     *            the job to generate job specifications for
+     * @param jobId
+     *            the job id handed to the superclass
+     */
+    public JobGenInnerJoin(PregelixJob job, String jobId) {
+        super(job, jobId);
+    }
 
     protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index d01c069..68e6706 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -72,6 +72,10 @@
     public JobGenOuterJoin(PregelixJob job) {
         super(job);
     }
+    
+    /**
+     * Create an outer-join job generator with a given job id; both arguments are passed unchanged
+     * to the superclass constructor.
+     * 
+     * @param job
+     *            the job to generate job specifications for
+     * @param jobId
+     *            the job id handed to the superclass
+     */
+    public JobGenOuterJoin(PregelixJob job, String jobId) {
+        super(job, jobId);
+    }
 
     @Override
     protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 4480b97..3e4b213 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -72,6 +72,17 @@
     public JobGenOuterJoinSingleSort(PregelixJob job) {
         super(job);
     }
+    
+    /**
+     * Creates a {@code JobGenOuterJoinSingleSort} for the given job and job id.
+     * Both arguments are handed unchanged to the superclass constructor.
+     *
+     * @param job   the Pregelix job passed to the superclass
+     * @param jobId the job id passed to the superclass (NOTE(review): its use is defined there -- confirm)
+     */
+    public JobGenOuterJoinSingleSort(PregelixJob job, String jobId) {
+        super(job, jobId);
+    }
 
     @Override
     protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 89fbdcd..5c1a4b8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -256,6 +256,20 @@
         return locations;
     }
 
+    /**
+     * Returns the static {@code NCs} array.
+     * <p>
+     * The array reference itself is returned, not a copy, so a caller that writes
+     * into the result also changes what later calls see.
+     * NOTE(review): presumably the node controller names from the cluster
+     * configuration -- confirm where {@code NCs} is populated and whether it can be null.
+     *
+     * @return the {@code NCs} array as-is
+     */
+    public static String[] getNCNames() {
+        return NCs;
+    }
+
     public static void addToBlackListNodes(Collection<String> nodes) {
         blackListNodes.addAll(nodes);
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
new file mode 100644
index 0000000..01fc81b
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+
+/**
+ * An {@link IOptimizer} whose {@code optimize} method currently hands back the
+ * job generator it was given, without modifying or replacing it.
+ */
+public class DynamicOptimizer implements IOptimizer {
+
+    /**
+     * Returns {@code jobGen} as-is.
+     *
+     * @param counterContext not read by this implementation
+     * @param jobGen the job generator to return
+     * @param iteration not read by this implementation
+     * @return the same {@code jobGen} instance that was passed in
+     */
+    @Override
+    public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration) {
+        return jobGen;
+    }
+
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
new file mode 100644
index 0000000..b5913c4
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+
+/**
+ * Contract for components that, given a job generator, return the job generator to use.
+ * <p>
+ * NOTE(review): presumably invoked between iterations so an implementation can
+ * react to runtime counters -- confirm against the call site.
+ */
+public interface IOptimizer {
+
+    /**
+     * Produces a {@link JobGen} from the supplied one.
+     *
+     * @param counterContext counter context made available to the implementation
+     * @param jobGen the job generator handed to the optimizer
+     * @param iteration an iteration number (NOTE(review): base and meaning are set by the caller -- confirm)
+     * @return the job generator chosen by the implementation; it may be {@code jobGen} itself
+     */
+    public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration);
+    
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
new file mode 100644
index 0000000..ae9463f
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+/**
+ * Factory for an {@link IUpdateFunction} that, for every vertex that has not
+ * halted, writes a two-field tuple (vertex id, message list) to the first
+ * output writer it is opened with.
+ */
+@SuppressWarnings("rawtypes")
+public class ExtractLiveVertexIdFunctionFactory implements IUpdateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a function instance that owns its own tuple builder and output frame.
+     *
+     * @return a new update function emitting the ids of vertices that have not halted
+     */
+    @Override
+    public IUpdateFunction createFunction() {
+        return new IUpdateFunction() {
+            // builds one output tuple: field 0 = vertex id, field 1 = message list
+            private final ArrayTupleBuilder alive = new ArrayTupleBuilder(2);
+
+            // output side: target writer, frame appender and the frame it appends into
+            private IFrameWriter writerAlive;
+            private FrameTupleAppender appenderAlive;
+            private ByteBuffer bufferAlive;
+
+            // written as the second field of every tuple; nothing is ever added to it here
+            private final MsgList dummyMessageList = new MsgList();
+
+            /**
+             * Binds the first writer as the output and resets the appender onto a newly allocated frame.
+             *
+             * @param ctx supplies the frame and the frame size
+             * @param rd not read by this implementation
+             * @param writers output writers; only {@code writers[0]} is used
+             */
+            @Override
+            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+                    throws HyracksDataException {
+                this.writerAlive = writers[0];
+                this.bufferAlive = ctx.allocateFrame();
+                this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderAlive.reset(bufferAlive, true);
+            }
+
+            /**
+             * Emits (vertex id, message list) for the vertex in {@code tuple[1]} unless it has halted.
+             *
+             * @param tuple input fields; index 1 is cast to {@link Vertex}
+             * @throws HyracksDataException wrapping any {@link IOException} raised in the body
+             */
+            @Override
+            public void process(Object[] tuple) throws HyracksDataException {
+                try {
+                    // tuple layout: vertex Id, vertex
+                    Vertex vertex = (Vertex) tuple[1];
+                    if (!vertex.isHalted()) {
+                        // single reset right before the builder is filled
+                        alive.reset();
+                        DataOutput outputAlive = alive.getDataOutput();
+                        vertex.getVertexId().write(outputAlive);
+                        alive.addFieldEndOffset();
+                        dummyMessageList.write(outputAlive);
+                        alive.addFieldEndOffset();
+                        FrameTupleUtils.flushTuple(appenderAlive, alive, writerAlive);
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            /**
+             * Passes the appender and writer to {@code FrameTupleUtils.flushTuplesFinal}
+             * (presumably flushing tuples still buffered in the frame -- confirm in that helper).
+             */
+            @Override
+            public void close() throws HyracksDataException {
+                FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+            }
+
+            /** No-op in this implementation: none of the arguments are read. */
+            @Override
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+                    throws HyracksDataException {
+                // nothing to do
+            }
+        };
+    }
+}