add job concatenation support
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
index bc6c0cf..c72f392 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.pregelix.core.base;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
@@ -29,6 +31,11 @@
 
     public void runJob(PregelixJob job, String ipAddress, int port) throws HyracksException;
 
+    public void runJobs(List<PregelixJob> jobs, String ipAddress, int port) throws HyracksException;
+
     public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
             throws HyracksException;
+
+    public void runJobs(List<PregelixJob> jobs, Plan planChoice, String ipAddress, int port, boolean profiling)
+            throws HyracksException;
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 5325397..5c8cb33 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -17,15 +17,19 @@
 
 import java.io.File;
 import java.io.FilenameFilter;
+import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -36,6 +40,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.base.IDriver;
 import edu.uci.ics.pregelix.core.jobgen.JobGen;
 import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
@@ -48,11 +53,9 @@
 @SuppressWarnings("rawtypes")
 public class Driver implements IDriver {
     private static final Log LOG = LogFactory.getLog(Driver.class);
-    private JobGen jobGen;
-    private boolean profiling;
-
     private IHyracksClientConnection hcc;
     private Class exampleClass;
+    private boolean profiling = false;
 
     public Driver(Class exampleClass) {
         this.exampleClass = exampleClass;
@@ -64,86 +67,57 @@
     }
 
     @Override
+    public void runJobs(List<PregelixJob> jobs, String ipAddress, int port) throws HyracksException {
+        runJobs(jobs, Plan.OUTER_JOIN, ipAddress, port, false);
+    }
+
+    @Override
     public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
             throws HyracksException {
+        runJobs(Collections.singletonList(job), planChoice, ipAddress, port, profiling);
+    }
+
+    @Override
+    public void runJobs(List<PregelixJob> jobs, Plan planChoice, String ipAddress, int port, boolean profiling)
+            throws HyracksException {
         try {
-            /** add hadoop configurations */
-            URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
-            if (hadoopCore != null) {
-                job.getConfiguration().addResource(hadoopCore);
+            if (jobs.size() <= 0) {
+                throw new HyracksException("Please submit at least one job for execution!");
             }
-            URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
-            if (hadoopMapRed != null) {
-                job.getConfiguration().addResource(hadoopMapRed);
-            }
-            URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
-            if (hadoopHdfs != null) {
-                job.getConfiguration().addResource(hadoopHdfs);
-            }
-            ClusterConfig.loadClusterConfig(ipAddress, port);
-
-            LOG.info("job started");
-            long start = System.currentTimeMillis();
-            long end = start;
-            long time = 0;
-
             this.profiling = profiling;
+            PregelixJob currentJob = jobs.get(0);
+            PregelixJob lastJob = currentJob;
+            JobGen jobGen = null;
 
-            switch (planChoice) {
-                case INNER_JOIN:
-                    jobGen = new JobGenInnerJoin(job);
-                    break;
-                case OUTER_JOIN:
-                    jobGen = new JobGenOuterJoin(job);
-                    break;
-                case OUTER_JOIN_SORT:
-                    jobGen = new JobGenOuterJoinSort(job);
-                    break;
-                case OUTER_JOIN_SINGLE_SORT:
-                    jobGen = new JobGenOuterJoinSingleSort(job);
-                    break;
-                default:
-                    jobGen = new JobGenInnerJoin(job);
+            /** prepare job -- deploy jars */
+            DeploymentId deploymentId = prepareJobs(ipAddress, port);
+            LOG.info("job started");
+
+            for (int i = 0; i < jobs.size(); i++) {
+                lastJob = currentJob;
+                currentJob = jobs.get(i);
+
+                /** add hadoop configurations */
+                addHadoopConfiguration(currentJob, ipAddress, port);
+
+                /** load the data */
+                if (i == 0 || compatible(lastJob, currentJob)) {
+                    if (i != 0) {
+                        finishJobs(jobGen, deploymentId);
+                    }
+                    jobGen = selectJobGen(planChoice, currentJob);
+                    loadData(currentJob, jobGen, deploymentId);
+                } else {
+                    jobGen.reset(currentJob);
+                }
+
+                /** run loop-body jobs */
+                runLoopBody(deploymentId, currentJob, jobGen);
+                runClearState(deploymentId, jobGen);
             }
 
-            if (hcc == null)
-                hcc = new HyracksConnection(ipAddress, port);
-
-            URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
-            List<File> jars = new ArrayList<File>();
-            URL[] urls = classLoader.getURLs();
-            for (URL url : urls)
-                if (url.toString().endsWith(".jar"))
-                    jars.add(new File(url.getPath()));
-            DeploymentId deploymentId = installApplication(jars);
-
-            start = System.currentTimeMillis();
-            FileSystem dfs = FileSystem.get(job.getConfiguration());
-            dfs.delete(FileOutputFormat.getOutputPath(job), true);
-            runCreate(deploymentId, jobGen);
-            runDataLoad(deploymentId, jobGen);
-            end = System.currentTimeMillis();
-            time = end - start;
-            LOG.info("data loading finished " + time + "ms");
-            int i = 1;
-            boolean terminate = false;
-            do {
-                start = System.currentTimeMillis();
-                runLoopBodyIteration(deploymentId, jobGen, i);
-                end = System.currentTimeMillis();
-                time = end - start;
-                LOG.info("iteration " + i + " finished " + time + "ms");
-                terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
-                        || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
-                i++;
-            } while (!terminate);
-
-            start = System.currentTimeMillis();
-            runHDFSWRite(deploymentId, jobGen);
-            runCleanup(deploymentId, jobGen);
-            end = System.currentTimeMillis();
-            time = end - start;
-            LOG.info("result writing finished " + time + "ms");
+            /** finish the jobs */
+            finishJobs(jobGen, deploymentId);
             hcc.unDeployBinary(deploymentId);
             LOG.info("job finished");
         } catch (Exception e) {
@@ -151,6 +125,122 @@
         }
     }
 
+    private boolean compatible(PregelixJob lastJob, PregelixJob currentJob) {
+        Class lastVertexIdClass = BspUtils.getVertexIndexClass(lastJob.getConfiguration());
+        Class lastVertexValueClass = BspUtils.getVertexValueClass(lastJob.getConfiguration());
+        Class lastEdgeValueClass = BspUtils.getEdgeValueClass(lastJob.getConfiguration());
+        Path lastOutputPath = FileOutputFormat.getOutputPath(lastJob);
+
+        Class currentVertexIdClass = BspUtils.getVertexIndexClass(currentJob.getConfiguration());
+        Class currentVertexValueClass = BspUtils.getVertexValueClass(currentJob.getConfiguration());
+        Class currentEdegeValueClass = BspUtils.getEdgeValueClass(currentJob.getConfiguration());
+        Path[] currentInputPaths = FileInputFormat.getInputPaths(currentJob);
+
+        return lastVertexIdClass.equals(currentVertexIdClass)
+                && lastVertexValueClass.equals(currentVertexValueClass)
+                && lastEdgeValueClass.equals(currentEdegeValueClass)
+                && (currentInputPaths.length == 0 || (currentInputPaths.length == 1 && lastOutputPath
+                        .equals(currentInputPaths[0])));
+    }
+
+    private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
+        JobGen jobGen;
+        switch (planChoice) {
+            case INNER_JOIN:
+                jobGen = new JobGenInnerJoin(currentJob);
+                break;
+            case OUTER_JOIN:
+                jobGen = new JobGenOuterJoin(currentJob);
+                break;
+            case OUTER_JOIN_SORT:
+                jobGen = new JobGenOuterJoinSort(currentJob);
+                break;
+            case OUTER_JOIN_SINGLE_SORT:
+                jobGen = new JobGenOuterJoinSingleSort(currentJob);
+                break;
+            default:
+                jobGen = new JobGenInnerJoin(currentJob);
+        }
+        return jobGen;
+    }
+
+    private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
+            Exception {
+        long start;
+        long end;
+        long time;
+        start = System.currentTimeMillis();
+        FileSystem dfs = FileSystem.get(currentJob.getConfiguration());
+        Path outputPath = FileOutputFormat.getOutputPath(currentJob);
+        if (outputPath != null) {
+            dfs.delete(outputPath, true);
+        }
+        runCreate(deploymentId, jobGen);
+        runDataLoad(deploymentId, jobGen);
+        end = System.currentTimeMillis();
+        time = end - start;
+        LOG.info("data loading finished " + time + "ms");
+        return time;
+    }
+
+    private void finishJobs(JobGen jobGen, DeploymentId deploymentId) throws Exception {
+        long start;
+        long end;
+        long time;
+        start = System.currentTimeMillis();
+        runHDFSWRite(deploymentId, jobGen);
+        runCleanup(deploymentId, jobGen);
+        end = System.currentTimeMillis();
+        time = end - start;
+        LOG.info("result writing finished " + time + "ms");
+    }
+
+    private DeploymentId prepareJobs(String ipAddress, int port) throws Exception {
+        if (hcc == null)
+            hcc = new HyracksConnection(ipAddress, port);
+
+        URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
+        List<File> jars = new ArrayList<File>();
+        URL[] urls = classLoader.getURLs();
+        for (URL url : urls)
+            if (url.toString().endsWith(".jar"))
+                jars.add(new File(url.getPath()));
+        DeploymentId deploymentId = installApplication(jars);
+        return deploymentId;
+    }
+
+    private void addHadoopConfiguration(PregelixJob job, String ipAddress, int port) throws HyracksException {
+        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+        if (hadoopCore != null) {
+            job.getConfiguration().addResource(hadoopCore);
+        }
+        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+        if (hadoopMapRed != null) {
+            job.getConfiguration().addResource(hadoopMapRed);
+        }
+        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+        if (hadoopHdfs != null) {
+            job.getConfiguration().addResource(hadoopHdfs);
+        }
+        ClusterConfig.loadClusterConfig(ipAddress, port);
+    }
+
+    private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen) throws Exception {
+        int i = 1;
+        boolean terminate = false;
+        long start, end, time;
+        do {
+            start = System.currentTimeMillis();
+            runLoopBodyIteration(deploymentId, jobGen, i);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("iteration " + i + " finished " + time + "ms");
+            terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
+                    || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
+            i++;
+        } while (!terminate);
+    }
+
     private void runCreate(DeploymentId deploymentId, JobGen jobGen) throws Exception {
         try {
             JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
@@ -196,6 +286,15 @@
         }
     }
 
+    private void runClearState(DeploymentId deploymentId, JobGen jobGen) throws Exception {
+        try {
+            JobSpecification clear = jobGen.generateClearState();
+            execute(deploymentId, clear);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
     private void runJobArray(DeploymentId deploymentId, JobSpecification[] jobs) throws Exception {
         for (JobSpecification job : jobs) {
             execute(deploymentId, job);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 42359eb..931ecc3 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -79,6 +79,7 @@
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
@@ -99,8 +100,8 @@
     protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
     protected static final int tableSize = 10485767;
     protected static final String PRIMARY_INDEX = "primary";
-    protected final Configuration conf;
-    protected final PregelixJob giraphJob;
+    protected Configuration conf;
+    protected PregelixJob pregelixJob;
     protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
     protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
     protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
@@ -111,9 +112,9 @@
     protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
     public JobGen(PregelixJob job) {
-        this.conf = job.getConfiguration();
-        this.giraphJob = job;
-        this.initJobConfiguration();
+        conf = job.getConfiguration();
+        pregelixJob = job;
+        initJobConfiguration();
         job.setJobId(jobId);
 
         // set the frame size to be the one user specified if the user did
@@ -128,6 +129,13 @@
         }
     }
 
+    public void reset(PregelixJob job) {
+        conf = job.getConfiguration();
+        pregelixJob = job;
+        initJobConfiguration();
+        job.setJobId(jobId);
+    }
+
     @SuppressWarnings({ "rawtypes", "unchecked" })
     private void initJobConfiguration() {
         Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
@@ -202,7 +210,7 @@
         VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
         List<InputSplit> splits = new ArrayList<InputSplit>();
         try {
-            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+            splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
             LOGGER.info("number of splits: " + splits.size());
             for (InputSplit split : splits)
                 LOGGER.info(split.toString());
@@ -280,7 +288,7 @@
         VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
         List<InputSplit> splits = new ArrayList<InputSplit>();
         try {
-            splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+            splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
@@ -457,6 +465,17 @@
     }
 
     /***
+     * generate a "clear state" job
+     */
+    public JobSpecification generateClearState() throws HyracksException {
+        JobSpecification spec = new JobSpecification();
+        ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
+        ClusterConfig.setLocationConstraint(spec, clearState);
+        spec.addRoot(clearState);
+        return spec;
+    }
+
+    /***
      * drop the sindex
      * 
      * @return JobSpecification
@@ -494,7 +513,7 @@
                     NoOpIOOperationCallback.INSTANCE, 0.01);
         } else {
             return new BTreeDataflowHelperFactory();
-       }
+        }
     }
 
     /** generate non-first iteration job */
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ClearStateOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ClearStateOperatorDescriptor.java
new file mode 100644
index 0000000..d86557b
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ClearStateOperatorDescriptor.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+/**
+ * Clear the state of the RuntimeContext in one slave
+ * 
+ * @author yingyib
+ */
+public class ClearStateOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private String jobId;
+
+    public ClearStateOperatorDescriptor(JobSpecification spec, String jobId) {
+        super(spec, 0, 0);
+        this.jobId = jobId;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new IOperatorNodePushable() {
+
+            @Override
+            public void initialize() throws HyracksDataException {
+                RuntimeContext context = (RuntimeContext) ctx.getJobletContext().getApplicationContext()
+                        .getApplicationObject();
+                context.clearState(jobId);
+            }
+
+            @Override
+            public void deinitialize() throws HyracksDataException {
+
+            }
+
+            @Override
+            public int getInputArity() {
+                return 0;
+            }
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+                    throws HyracksDataException {
+
+            }
+
+            @Override
+            public IFrameWriter getInputFrameWriter(int index) {
+                return null;
+            }
+
+            @Override
+            public String getDisplayName() {
+                return "Clear State Operator";
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index e25a46a..496d066 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -56,11 +56,12 @@
     private final IBufferCache bufferCache;
     private final IVirtualBufferCache vBufferCache;
     private final IFileMapManager fileMapManager;
-    private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
-    private final Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
-    private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
     private final IOManager ioManager;
     private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+    private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
+    private final Map<String, Long> jobIdToSuperStep = new ConcurrentHashMap<String, Long>();
+    private final Map<String, Boolean> jobIdToMove = new ConcurrentHashMap<String, Boolean>();
+    
     private final ThreadFactory threadFactory = new ThreadFactory() {
         public Thread newThread(Runnable r) {
             return new Thread(r);
@@ -100,6 +101,18 @@
         System.gc();
     }
 
+    public void clearState(String jobId) throws HyracksDataException {
+        for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
+            for (FileReference fileRef : entry.getValue())
+                fileRef.delete();
+
+        iterationToFiles.clear();
+        appStateMap.clear();
+        jobIdToMove.remove(jobId);
+        jobIdToSuperStep.remove(jobId);
+        System.gc();
+    }
+
     public ILocalResourceRepository getLocalResourceRepository() {
         return localResourceRepository;
     }
@@ -132,14 +145,14 @@
         return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
     }
 
-    public synchronized void setVertexProperties(String giraphJobId, long numVertices, long numEdges) {
-        Boolean toMove = giraphJobIdToMove.get(giraphJobId);
+    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges) {
+        Boolean toMove = jobIdToMove.get(jobId);
         if (toMove == null || toMove == true) {
-            if (giraphJobIdToSuperStep.get(giraphJobId) == null) {
-                giraphJobIdToSuperStep.put(giraphJobId, 0L);
+            if (jobIdToSuperStep.get(jobId) == null) {
+                jobIdToSuperStep.put(jobId, 0L);
             }
 
-            long superStep = giraphJobIdToSuperStep.get(giraphJobId);
+            long superStep = jobIdToSuperStep.get(jobId);
             List<FileReference> files = iterationToFiles.remove(superStep - 1);
             if (files != null) {
                 for (FileReference fileRef : files)
@@ -149,15 +162,15 @@
             Vertex.setSuperstep(++superStep);
             Vertex.setNumVertices(numVertices);
             Vertex.setNumEdges(numEdges);
-            giraphJobIdToSuperStep.put(giraphJobId, superStep);
-            giraphJobIdToMove.put(giraphJobId, false);
+            jobIdToSuperStep.put(jobId, superStep);
+            jobIdToMove.put(jobId, false);
             LOGGER.info("start iteration " + Vertex.getSuperstep());
         }
         System.gc();
     }
 
     public synchronized void endSuperStep(String giraphJobId) {
-        giraphJobIdToMove.put(giraphJobId, true);
+        jobIdToMove.put(giraphJobId, true);
         LOGGER.info("end iteration " + Vertex.getSuperstep());
     }
 
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index f60387a..f99321a 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.pregelix.example.client;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -74,6 +75,13 @@
         driver.runJob(job, options.planChoice, options.ipAddress, options.port, Boolean.parseBoolean(options.profiling));
     }
 
+    public static void run(String[] args, List<PregelixJob> jobs) throws Exception {
+        Options options = prepareJobs(args, jobs);
+        Driver driver = new Driver(Client.class);
+        driver.runJobs(jobs, options.planChoice, options.ipAddress, options.port,
+                Boolean.parseBoolean(options.profiling));
+    }
+
     private static Options prepareJob(String[] args, PregelixJob job) throws CmdLineException, IOException {
         Options options = new Options();
         CmdLineParser parser = new CmdLineParser(options);
@@ -84,6 +92,32 @@
         for (int i = 1; i < inputs.length; i++)
             FileInputFormat.addInputPaths(job, inputs[i]);
         FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+        setJobSpecificSettings(job, options);
+        return options;
+    }
+
+    private static Options prepareJobs(String[] args, List<PregelixJob> jobs) throws CmdLineException, IOException {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        for (int j = 0; j < jobs.size(); j++) {
+            PregelixJob job = jobs.get(j);
+            String[] inputs = options.inputPaths.split(";");
+            if (j == 0) {
+                FileInputFormat.setInputPaths(job, inputs[0]);
+                for (int i = 1; i < inputs.length; i++)
+                    FileInputFormat.addInputPaths(job, inputs[i]);
+            }
+            if (j == jobs.size() - 1) {
+                FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+            }
+            setJobSpecificSettings(job, options);
+        }
+        return options;
+    }
+
+    private static void setJobSpecificSettings(PregelixJob job, Options options) {
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
         job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
         job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, options.sourceId);
@@ -91,7 +125,6 @@
         job.getConfiguration().setLong(ReachabilityVertex.DEST_ID, options.destId);
         if (options.numIteration > 0)
             job.getConfiguration().setLong(PageRankVertex.ITERATIONS, options.numIteration);
-        return options;
     }
 
 }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
index f5116ca..aa0dfdd 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
@@ -31,6 +31,8 @@
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
 
 /**
+ * This test case tests the error message propagation.
+ * 
  * @author yingyib
  */
 public class FailureVertexTest {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
new file mode 100644
index 0000000..0d6f863
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class JobConcatenationTest {
+
+    private static String INPUTPATH = "data/webmap";
+    private static String OUTPUTPAH = "actual/result";
+    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+
+        try {
+            List<PregelixJob> jobs = new ArrayList<PregelixJob>();
+            PregelixJob job1 = new PregelixJob(PageRankVertex.class.getName());
+            job1.setVertexClass(PageRankVertex.class);
+            job1.setVertexInputFormatClass(TextPageRankInputFormat.class);
+            job1.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+            job1.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+            job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            FileInputFormat.setInputPaths(job1, INPUTPATH);
+            job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+
+            PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
+            job2.setVertexClass(PageRankVertex.class);
+            job2.setVertexInputFormatClass(TextPageRankInputFormat.class);
+            job2.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+            job2.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+            job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
+            job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+
+            jobs.add(job1);
+            jobs.add(job2);
+
+            testCluster.setUp();
+            Driver driver = new Driver(PageRankVertex.class);
+            driver.runJobs(jobs, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            testCluster.tearDown();
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index eba75c9..b4e1dd8 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -179,10 +179,7 @@
                 vertex.setOutputWriters(writers);
                 vertex.setOutputAppenders(appenders);
                 vertex.setOutputTupleBuilders(tbs);
-
-                if (!msgIterator.hasNext() && vertex.isHalted()) {
-                    return;
-                }
+                
                 if (vertex.isHalted()) {
                     vertex.activate();
                 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
index 99bcac5..cd2012a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -26,11 +26,11 @@
 public class PreSuperStepRuntimeHookFactory implements IRuntimeHookFactory {
     private static final long serialVersionUID = 1L;
     private final IConfigurationFactory confFactory;
-    private final String giraphJobId;
+    private final String jobId;
 
-    public PreSuperStepRuntimeHookFactory(String giraphJobId, IConfigurationFactory confFactory) {
+    public PreSuperStepRuntimeHookFactory(String jobId, IConfigurationFactory confFactory) {
         this.confFactory = confFactory;
-        this.giraphJobId = giraphJobId;
+        this.jobId = jobId;
     }
 
     @Override
@@ -40,7 +40,7 @@
             @Override
             public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
                 Configuration conf = confFactory.createConfiguration(ctx);
-                IterationUtils.setProperties(giraphJobId, ctx, conf);
+                IterationUtils.setProperties(jobId, ctx, conf);
             }
 
         };