Merge branch 'master' into zheilbron/hyracks_msr_demo

Conflicts:
	hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 075de03..06e79de 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -80,6 +80,8 @@
     public static final String RECOVERY_COUNT = "pregelix.recoveryCount";
     /** the checkpoint interval */
     public static final String CKP_INTERVAL = "pregelix.ckpinterval";
+    /** whether dynamic optimization is enabled */
+    public static final String DYNAMIC_OPTIMIZATION = "pregelix.dynamicopt";
     /** comma */
     public static final String COMMA_STR = ",";
 
@@ -260,6 +262,15 @@
     final public void setCheckpointingInterval(int ckpInterval) {
         getConfiguration().setInt(CKP_INTERVAL, ckpInterval);
     }
+    
+    /**
+     * Enable or disable dynamic optimization for this job
+     * 
+     * @param dynamicOpt true to enable dynamic optimization; false to disable it
+     */
+    final public void setEnableDynamicOptimization(boolean dynamicOpt){
+        getConfiguration().setBoolean(DYNAMIC_OPTIMIZATION, dynamicOpt);
+    }
 
     @Override
     public String toString() {
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index a5d9cd7..f44942f 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -671,6 +671,16 @@
     public static int getRecoveryCount(Configuration conf) {
         return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
     }
+    
+    /***
+     * Check whether dynamic optimization is enabled
+     * 
+     * @param conf Configuration
+     * @return the configured value; true if the option has not been set
+     */
+    public static boolean getEnableDynamicOptimization(Configuration conf){
+        return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, true);
+    }
 
     /***
      * Get the user-set checkpoint interval
diff --git a/pregelix-core/pom.xml b/pregelix-core/pom.xml
index 3d1699f..9ae263c 100644
--- a/pregelix-core/pom.xml
+++ b/pregelix-core/pom.xml
@@ -358,5 +358,12 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-client</artifactId>
+			<version>0.2.10-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index d6a6f3d..b3a90e9 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -21,6 +21,7 @@
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -42,21 +43,24 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.stats.Counters;
+import edu.uci.ics.hyracks.client.stats.impl.ClientCounterContext;
 import edu.uci.ics.pregelix.api.job.ICheckpointHook;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.base.IDriver;
 import edu.uci.ics.pregelix.core.jobgen.JobGen;
-import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
+import edu.uci.ics.pregelix.core.jobgen.JobGenFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.DynamicOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
 import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 
 @SuppressWarnings("rawtypes")
 public class Driver implements IDriver {
+    public static final String[] COUNTERS = { Counters.NUM_PROCESSOR, Counters.SYSTEM_LOAD, Counters.MEMORY_USAGE,
+            Counters.DISK_READ, Counters.DISK_WRITE, Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE };
     private static final Log LOG = LogFactory.getLog(Driver.class);
     private IHyracksClientConnection hcc;
     private Class exampleClass;
@@ -93,6 +97,8 @@
             PregelixJob currentJob = jobs.get(0);
             PregelixJob lastJob = currentJob;
             addHadoopConfiguration(currentJob, ipAddress, port, true);
+            ClientCounterContext counterContext = new ClientCounterContext(ipAddress, 16001,
+                    Arrays.asList(ClusterConfig.getNCNames()));
             JobGen jobGen = null;
 
             /** prepare job -- deploy jars */
@@ -105,6 +111,7 @@
             int retryCount = 0;
             int maxRetryCount = 3;
             jobGen = selectJobGen(planChoice, currentJob);
+            IOptimizer dynamicOptimzier = new DynamicOptimizer();
 
             do {
                 try {
@@ -131,7 +138,10 @@
                             jobGen.reset(currentJob);
                         }
 
-                        /** run loop-body jobs */
+                        /** run loop-body jobs with dynamic optimizer if it is enabled */
+                        if (BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration())) {
+                            jobGen = dynamicOptimzier.optimize(counterContext, jobGen, i);
+                        }
                         runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
                                 ckpHook, failed);
                         runClearState(deploymentId, jobGen);
@@ -156,6 +166,13 @@
                 }
             } while (failed && retryCount < maxRetryCount);
             LOG.info("job finished");
+            StringBuffer counterBuffer = new StringBuffer();
+            counterBuffer.append("performance counters\n");
+            for (String counter : COUNTERS) {
+                counterBuffer.append("\t" + counter + ": " + counterContext.getCounter(counter, false).get() + "\n");
+            }
+            LOG.info(counterBuffer.toString());
+            counterContext.stop();
         } catch (Exception e) {
             throw new HyracksException(e);
         }
@@ -180,24 +197,7 @@
     }
 
     private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
-        JobGen jobGen;
-        switch (planChoice) {
-            case INNER_JOIN:
-                jobGen = new JobGenInnerJoin(currentJob);
-                break;
-            case OUTER_JOIN:
-                jobGen = new JobGenOuterJoin(currentJob);
-                break;
-            case OUTER_JOIN_SORT:
-                jobGen = new JobGenOuterJoinSort(currentJob);
-                break;
-            case OUTER_JOIN_SINGLE_SORT:
-                jobGen = new JobGenOuterJoinSingleSort(currentJob);
-                break;
-            default:
-                jobGen = new JobGenInnerJoin(currentJob);
-        }
-        return jobGen;
+        return JobGenFactory.createJobGen(planChoice, currentJob);
     }
 
     private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index f703fcc..c1f6aae 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -31,12 +31,14 @@
 import java.util.UUID;
 import java.util.logging.Logger;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -93,6 +95,7 @@
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.ReflectionUtils;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
 import edu.uci.ics.pregelix.core.base.IJobGen;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
@@ -110,11 +113,14 @@
 import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
 import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.runtime.function.ExtractLiveVertexIdFunctionFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RecoveryRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
@@ -131,7 +137,7 @@
     protected PregelixJob pregelixJob;
     protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
     protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
-    protected String jobId = UUID.randomUUID().toString();
+    protected String jobId = UUID.randomUUID().toString();
     protected int frameSize = ClusterConfig.getFrameSize();
     protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
 
@@ -147,6 +153,13 @@
     public JobGen(PregelixJob job) {
         init(job);
     }
+    
+    public JobGen(PregelixJob job, String jobId) {
+        if(jobId!=null){
+            this.jobId = jobId;
+        }
+        init(job);
+    }
 
     private void init(PregelixJob job) {
         conf = job.getConfiguration();
@@ -779,4 +792,88 @@
     /** generate clean-up job */
     public abstract JobSpecification[] generateCleanup() throws HyracksException;
 
+    /**
+     * Switch the plan to a desired one
+     * 
+     * @param iteration
+     *            the latest completed iteration number
+     * @param plan
+     *            the plan choice (unused by this implementation, which always returns an inner-join jobgen)
+     * @return the list of job specifications for preparing the plan switch, and the new jobgen
+     */
+    public Pair<List<JobSpecification>, JobGen> switchPlan(int iteration, Plan plan) throws HyracksException {
+        /**
+         * bulk-load a live vertex btree
+         */
+        List<JobSpecification> list = new ArrayList<JobSpecification>();
+        list.add(bulkLoadLiveVertexBTree(iteration));
+        JobGen jobGen = new JobGenInnerJoin(pregelixJob, jobId);
+        return Pair.of(list, jobGen);
+    }
+
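+    /**
+     * Build a jobspec that scans the primary index and bulk-loads the live vertex btree
+     * 
+     * @param iteration the iteration number used to choose the comparator factories and the forward-scan flag
+     * @return the job specification
+     * @throws HyracksException
+     */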
+    private JobSpecification bulkLoadLiveVertexBTree(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct btree search and function call update operator
+         */
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
+                MsgList.class.getName());
+        TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
+                getIndexDataflowHelperFactory(), inputRdFactory, 1, new ExtractLiveVertexIdFunctionFactory(),
+                preHookFactory, null, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct bulk-load index operator
+         */
+        IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+        int[] fieldPermutation = new int[] { 0, 1 };
+        int[] keyFields = new int[] { 0 };
+        IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+        indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+                WritableComparator.get(vertexIdClass).getClass());
+        TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+                fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+        /** connect job spec */
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, btreeBulkLoad, 0);
+        spec.addRoot(btreeBulkLoad);
+
+        return spec;
+    }
+
 }
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
new file mode 100644
index 0000000..ed580de
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.jobgen;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+
+public class JobGenFactory {
+
+    /**
+     * Create the job generator for the chosen plan.
+     *
+     * @return a new job generator constructed from {@code currentJob}
+     */
+    public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob) {
+        switch (planChoice) {
+            case OUTER_JOIN:
+                return new JobGenOuterJoin(currentJob);
+            case OUTER_JOIN_SORT:
+                return new JobGenOuterJoinSort(currentJob);
+            case OUTER_JOIN_SINGLE_SORT:
+                return new JobGenOuterJoinSingleSort(currentJob);
+            case INNER_JOIN:
+                return new JobGenInnerJoin(currentJob);
+            default:
+                // any other plan falls back to the inner-join generator
+                return new JobGenInnerJoin(currentJob);
+        }
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 1bad401..7bdb069 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -96,6 +96,10 @@
     public JobGenInnerJoin(PregelixJob job) {
         super(job);
     }
+    
+    public JobGenInnerJoin(PregelixJob job, String jobId) {
+        super(job, jobId);
+    }
 
     protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index d01c069..68e6706 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -72,6 +72,10 @@
     public JobGenOuterJoin(PregelixJob job) {
         super(job);
     }
+    
+    public JobGenOuterJoin(PregelixJob job, String jobId) {
+        super(job, jobId);
+    }
 
     @Override
     protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 4480b97..3e4b213 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -72,6 +72,10 @@
     public JobGenOuterJoinSingleSort(PregelixJob job) {
         super(job);
     }
+    
+    public JobGenOuterJoinSingleSort(PregelixJob job, String jobId) {
+        super(job, jobId);
+    }
 
     @Override
     protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 89fbdcd..5c1a4b8 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -256,6 +256,10 @@
         return locations;
     }
 
+    public static String[] getNCNames() {
+        return NCs;
+    }
+
     public static void addToBlackListNodes(Collection<String> nodes) {
         blackListNodes.addAll(nodes);
     }
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
new file mode 100644
index 0000000..01fc81b
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+
+public class DynamicOptimizer implements IOptimizer {
+
+    @Override
+    public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration) {
+        return jobGen;
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
new file mode 100644
index 0000000..b5913c4
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+
+public interface IOptimizer {
+
+    public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration);
+    
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
new file mode 100644
index 0000000..ae9463f
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+@SuppressWarnings("rawtypes")
+public class ExtractLiveVertexIdFunctionFactory implements IUpdateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IUpdateFunction createFunction() {
+        return new IUpdateFunction() {
+            // builds the two-field output tuple: (vertex id, message list)
+            private final ArrayTupleBuilder alive = new ArrayTupleBuilder(2);
+
+            // for writing out the tuples of live (non-halted) vertices
+            private IFrameWriter writerAlive;
+            private FrameTupleAppender appenderAlive;
+            private ByteBuffer bufferAlive;
+
+            private MsgList dummyMessageList = new MsgList();
+            private Vertex vertex;
+
+            @Override
+            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+                    throws HyracksDataException {
+                this.writerAlive = writers[0];
+                this.bufferAlive = ctx.allocateFrame();
+                this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderAlive.reset(bufferAlive, true);
+            }
+
+            // Emits one tuple per non-halted vertex; a halted vertex produces no output.
+            @Override
+            public void process(Object[] tuple) throws HyracksDataException {
+                try {
+                    // tuple layout: [0] = vertex id, [1] = vertex
+                    vertex = (Vertex) tuple[1];
+                    if (!vertex.isHalted()) {
+                        // reset once, then write (vertex id, placeholder message list)
+                        alive.reset();
+                        DataOutput outputAlive = alive.getDataOutput();
+                        vertex.getVertexId().write(outputAlive);
+                        alive.addFieldEndOffset();
+                        dummyMessageList.write(outputAlive);
+                        alive.addFieldEndOffset();
+                        FrameTupleUtils.flushTuple(appenderAlive, alive, writerAlive);
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+            }
+
+            @Override
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+                    throws HyracksDataException {
+
+            }
+        };
+    }
+}