1. Add retry (up to 10 attempts) for deployment binary downloads. 2. Support switching the execution plan at runtime.
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index 0724a01..2407c10 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -181,36 +181,44 @@
      * @throws HyracksException
      */
     private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException {
-        try {
-            List<URL> downloadedFileURLs = new ArrayList<URL>();
-            File dir = new File(deploymentDir);
-            if (!dir.exists()) {
-                FileUtils.forceMkdir(dir);
-            }
-            for (URL url : urls) {
-                String urlString = url.toString();
-                int slashIndex = urlString.lastIndexOf('/');
-                String fileName = urlString.substring(slashIndex + 1).split("&")[1];
-                String filePath = deploymentDir + File.separator + fileName;
-                File targetFile = new File(filePath);
-                if (isNC) {
-                    HttpClient hc = new DefaultHttpClient();
-                    HttpGet get = new HttpGet(url.toString());
-                    HttpResponse response = hc.execute(get);
-                    InputStream is = response.getEntity().getContent();
-                    OutputStream os = new FileOutputStream(targetFile);
-                    try {
-                        IOUtils.copyLarge(is, os);
-                    } finally {
-                        os.close();
-                        is.close();
-                    }
+        // retry at most 10 times when downloading binaries
+        int retryCount = 10;
+        int tried = 0;
+        Exception trace = null;
+        while (tried < retryCount) {
+            try {
+                tried++;
+                List<URL> downloadedFileURLs = new ArrayList<URL>();
+                File dir = new File(deploymentDir);
+                if (!dir.exists()) {
+                    FileUtils.forceMkdir(dir);
                 }
-                downloadedFileURLs.add(targetFile.toURI().toURL());
+                for (URL url : urls) {
+                    String urlString = url.toString();
+                    int slashIndex = urlString.lastIndexOf('/');
+                    String fileName = urlString.substring(slashIndex + 1).split("&")[1];
+                    String filePath = deploymentDir + File.separator + fileName;
+                    File targetFile = new File(filePath);
+                    if (isNC) {
+                        HttpClient hc = new DefaultHttpClient();
+                        HttpGet get = new HttpGet(url.toString());
+                        HttpResponse response = hc.execute(get);
+                        InputStream is = response.getEntity().getContent();
+                        OutputStream os = new FileOutputStream(targetFile);
+                        try {
+                            IOUtils.copyLarge(is, os);
+                        } finally {
+                            os.close();
+                            is.close();
+                        }
+                    }
+                    downloadedFileURLs.add(targetFile.toURI().toURL());
+                }
+                return downloadedFileURLs;
+            } catch (Exception e) {
+                trace = e;
             }
-            return downloadedFileURLs;
-        } catch (Exception e) {
-            throw new HyracksException(e);
         }
+        throw new HyracksException(trace);
     }
 }
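Note: the new download logic follows a bounded retry-with-last-exception pattern: remember the most recent failure, try again, and only rethrow once the attempt budget is used up. For reference, a minimal self-contained sketch of the same pattern (the Retry/withRetries names are illustrative, not part of Hyracks):

    import java.util.concurrent.Callable;

    final class Retry {
        /** Runs the task up to maxAttempts times (assumes maxAttempts >= 1); rethrows the last failure if every attempt fails. */
        static <T> T withRetries(int maxAttempts, Callable<T> task) throws Exception {
            Exception last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return task.call();
                } catch (Exception e) {
                    last = e; // keep the most recent failure and try again
                }
            }
            throw last; // attempt budget exhausted
        }
    }

downloadURLs inlines this loop with an attempt budget of 10 and wraps the final failure in a HyracksException.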
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index d6a6f3d..c86e563 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -47,10 +47,7 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.base.IDriver;
 import edu.uci.ics.pregelix.core.jobgen.JobGen;
-import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
+import edu.uci.ics.pregelix.core.jobgen.JobGenFactory;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@@ -180,24 +177,7 @@
     }
 
     private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
-        JobGen jobGen;
-        switch (planChoice) {
-            case INNER_JOIN:
-                jobGen = new JobGenInnerJoin(currentJob);
-                break;
-            case OUTER_JOIN:
-                jobGen = new JobGenOuterJoin(currentJob);
-                break;
-            case OUTER_JOIN_SORT:
-                jobGen = new JobGenOuterJoinSort(currentJob);
-                break;
-            case OUTER_JOIN_SINGLE_SORT:
-                jobGen = new JobGenOuterJoinSingleSort(currentJob);
-                break;
-            default:
-                jobGen = new JobGenInnerJoin(currentJob);
-        }
-        return jobGen;
+        return JobGenFactory.createJobGen(planChoice, currentJob);
     }
 
     private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index f703fcc..b9f2d70 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -31,12 +31,14 @@
 import java.util.UUID;
 import java.util.logging.Logger;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -93,6 +95,7 @@
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.ReflectionUtils;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
 import edu.uci.ics.pregelix.core.base.IJobGen;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
@@ -110,11 +113,14 @@
 import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
 import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.runtime.function.ExtractLiveVertexIdFunctionFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RecoveryRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
@@ -131,7 +137,7 @@
     protected PregelixJob pregelixJob;
     protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
     protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
-    protected String jobId = UUID.randomUUID().toString();
+    protected String jobId;
     protected int frameSize = ClusterConfig.getFrameSize();
     protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
 
@@ -149,6 +155,10 @@
     }
 
     private void init(PregelixJob job) {
+        jobId = BspUtils.getJobId(job.getConfiguration());
+        if (jobId == null) {
+            jobId = UUID.randomUUID().toString();
+        }
         conf = job.getConfiguration();
         pregelixJob = job;
         initJobConfiguration();
@@ -779,4 +789,88 @@
     /** generate clean-up job */
     public abstract JobSpecification[] generateCleanup() throws HyracksException;
 
+    /**
+     * Switch the execution plan to the desired one.
+     * 
+     * @param iteration
+     *            , the latest completed iteration number
+     * @param plan
+     *            , plan choice
+     * @return the job specifications that prepare the plan switch, paired with the new JobGen
+     */
+    public Pair<List<JobSpecification>, JobGen> switchPlan(int iteration, Plan plan) throws HyracksException {
+        /**
+         * bulk-load a live vertex btree
+         */
+        List<JobSpecification> list = new ArrayList<JobSpecification>();
+        list.add(bulkLoadLiveVertexBTree(iteration));
+        JobGen jobGen = new JobGenInnerJoin(pregelixJob);
+        return Pair.of(list, jobGen);
+    }
+
+    /**
+     * Build a job specification that bulk-loads the live-vertex BTree
+     * 
+     * @param iteration
+     * @return the job specification
+     * @throws HyracksException
+     */
+    private JobSpecification bulkLoadLiveVertexBTree(int iteration) throws HyracksException {
+        Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+        Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+        JobSpecification spec = new JobSpecification();
+
+        /**
+         * construct empty tuple operator
+         */
+        EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+        /**
+         * construct btree search and function call update operator
+         */
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+        RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
+                vertexIdClass.getName(), vertexClass.getName());
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
+
+        ITypeTraits[] typeTraits = new ITypeTraits[2];
+        typeTraits[0] = new TypeTraits(false);
+        typeTraits[1] = new TypeTraits(false);
+        IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+        IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+        IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+                conf, vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
+                MsgList.class.getName());
+        TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
+                getIndexDataflowHelperFactory(), inputRdFactory, 1, new ExtractLiveVertexIdFunctionFactory(),
+                preHookFactory, null, rdFinal);
+        ClusterConfig.setLocationConstraint(spec, scanner);
+
+        /**
+         * construct bulk-load index operator
+         */
+        IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+        int[] fieldPermutation = new int[] { 0, 1 };
+        int[] keyFields = new int[] { 0 };
+        IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+        indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+                WritableComparator.get(vertexIdClass).getClass());
+        TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+                fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
+        ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+        /** connect job spec */
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, btreeBulkLoad, 0);
+        spec.addRoot(btreeBulkLoad);
+
+        return spec;
+    }
+
 }
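Note: switchPlan is intended to be invoked between supersteps: the returned job specifications (here, the bulk-load of a live-vertex BTree) are run first to prepare the switch, and the returned JobGen then drives the remaining iterations. A hedged sketch of that call pattern (runJob is a hypothetical submit-and-wait helper, not something this patch defines):

    // jobGen is the currently active JobGen; lastIteration is the latest completed superstep.
    Pair<List<JobSpecification>, JobGen> switched = jobGen.switchPlan(lastIteration, Plan.INNER_JOIN);
    for (JobSpecification prepSpec : switched.getLeft()) {
        runJob(prepSpec); // hypothetical helper: submit the spec and wait for completion
    }
    jobGen = switched.getRight(); // continue with the new (inner-join) JobGen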
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
new file mode 100644
index 0000000..ed580de
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.jobgen;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+
+public class JobGenFactory {
+
+    public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob) {
+        JobGen jobGen = null;
+        switch (planChoice) {
+            case INNER_JOIN:
+                jobGen = new JobGenInnerJoin(currentJob);
+                break;
+            case OUTER_JOIN:
+                jobGen = new JobGenOuterJoin(currentJob);
+                break;
+            case OUTER_JOIN_SORT:
+                jobGen = new JobGenOuterJoinSort(currentJob);
+                break;
+            case OUTER_JOIN_SINGLE_SORT:
+                jobGen = new JobGenOuterJoinSingleSort(currentJob);
+                break;
+            default:
+                jobGen = new JobGenInnerJoin(currentJob);
+        }
+        return jobGen;
+    }
+
+}
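Note: with the factory in place, plan selection on the driver side reduces to a single call, as in the simplified selectJobGen above. A minimal usage sketch (currentJob is assumed to be an already configured PregelixJob):

    // Plan is edu.uci.ics.pregelix.core.base.IDriver.Plan
    JobGen jobGen = JobGenFactory.createJobGen(Plan.OUTER_JOIN, currentJob);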
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
new file mode 100644
index 0000000..ae9463f
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+@SuppressWarnings("rawtypes")
+public class ExtractLiveVertexIdFunctionFactory implements IUpdateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IUpdateFunction createFunction() {
+        return new IUpdateFunction() {
+            // for building the alive (vertexId, empty MsgList) output tuple
+            private final ArrayTupleBuilder alive = new ArrayTupleBuilder(2);
+
+            // for writing out to alive message channel
+            private IFrameWriter writerAlive;
+            private FrameTupleAppender appenderAlive;
+            private ByteBuffer bufferAlive;
+
+            private MsgList dummyMessageList = new MsgList();
+            private Vertex vertex;
+
+            @Override
+            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+                    throws HyracksDataException {
+                this.writerAlive = writers[0];
+                this.bufferAlive = ctx.allocateFrame();
+                this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderAlive.reset(bufferAlive, true);
+            }
+
+            @Override
+            public void process(Object[] tuple) throws HyracksDataException {
+                try {
+                    // vertex Id, vertex
+                    alive.reset();
+                    vertex = (Vertex) tuple[1];
+                    if (!vertex.isHalted()) {
+                        alive.reset();
+                        DataOutput outputAlive = alive.getDataOutput();
+                        vertex.getVertexId().write(outputAlive);
+                        alive.addFieldEndOffset();
+                        dummyMessageList.write(outputAlive);
+                        alive.addFieldEndOffset();
+                        FrameTupleUtils.flushTuple(appenderAlive, alive, writerAlive);
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+            }
+
+            @Override
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+                    throws HyracksDataException {
+
+            }
+        };
+    }
+}
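Note: TreeSearchFunctionUpdateOperatorDescriptor drives this function through a simple lifecycle: open once per partition (writers[0] is the alive channel), process once per scanned (vertexId, vertex) tuple, and close once at the end. An illustrative harness under that assumption (the parameter values would come from the operator; imports match the new file above):

    // Illustrative only: mimics how the search-update operator drives the function.
    static void driveAliveExtraction(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter aliveWriter,
            Iterable<Object[]> scannedTuples) throws HyracksDataException {
        IUpdateFunction func = new ExtractLiveVertexIdFunctionFactory().createFunction();
        func.open(ctx, rd, aliveWriter);       // writers[0] becomes the alive-message channel
        for (Object[] tuple : scannedTuples) { // each tuple is { vertexId, vertex }
            func.process(tuple);               // emits (vertexId, empty MsgList) for non-halted vertices
        }
        func.close();                          // flush the last partially filled frame
    }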