svn merge -r2854:2897 https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization


git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2898 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index a0c821a..7f65bc7 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -97,7 +97,9 @@
             @SuppressWarnings("unchecked")
             @Override
             public void initialize() throws HyracksDataException {
+                ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
+                    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
                     writer.open();
                     InputFormat inputFormat = conf.getInputFormat();
@@ -133,6 +135,8 @@
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
         };
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 8a1ba6d..6ece1aa 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -106,6 +106,7 @@
             @SuppressWarnings("unchecked")
             @Override
             public void initialize() throws HyracksDataException {
+                ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
                     Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
@@ -146,6 +147,8 @@
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
         };
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 1b6f195..f07a246 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -72,22 +72,23 @@
     public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
             throws HyracksException {
         applicationName = exampleClass.getSimpleName() + UUID.randomUUID();
-        /** add hadoop configurations */
-        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
-        job.getConfiguration().addResource(hadoopCore);
-        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
-        job.getConfiguration().addResource(hadoopMapRed);
-        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
-        job.getConfiguration().addResource(hadoopHdfs);
-        ClusterConfig.loadClusterConfig(ipAddress, port);
-
-        LOG.info("job started");
-        long start = System.currentTimeMillis();
-        long end = start;
-        long time = 0;
-
-        this.profiling = profiling;
         try {
+            /** add hadoop configurations */
+            URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+            job.getConfiguration().addResource(hadoopCore);
+            URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+            job.getConfiguration().addResource(hadoopMapRed);
+            URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+            job.getConfiguration().addResource(hadoopHdfs);
+            ClusterConfig.loadClusterConfig(ipAddress, port);
+
+            LOG.info("job started");
+            long start = System.currentTimeMillis();
+            long end = start;
+            long time = 0;
+
+            this.profiling = profiling;
+
             switch (planChoice) {
                 case INNER_JOIN:
                     jobGen = new JobGenInnerJoin(job);
@@ -146,6 +147,16 @@
             LOG.info("result writing finished " + time + "ms");
             LOG.info("job finished");
         } catch (Exception e) {
+            try {
+                /**
+                 * destroy application if there is any exception
+                 */
+                if (hcc != null) {
+                    destroyApplication(applicationName);
+                }
+            } catch (Exception e2) {
+                throw new HyracksException(e2);
+            }
             throw new HyracksException(e);
         }
     }
@@ -224,8 +235,8 @@
         LOG.info("jar deployment finished " + (end - start) + "ms");
     }
 
-    public void destroyApplication(String jarFile) throws Exception {
-        hcc.destroyApplication(applicationName);
+    public void destroyApplication(String appName) throws Exception {
+        hcc.destroyApplication(appName);
     }
 
 }
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 02e1625..b6d4da7 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -54,6 +54,7 @@
     public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
     private DoubleWritable outputValue = new DoubleWritable();
     private DoubleWritable tmpVertexValue = new DoubleWritable();
+    private int maxIteration = -1;
 
     /**
      * Test whether combiner is called by summing up the messages.
@@ -97,7 +98,9 @@
 
     @Override
     public void compute(Iterator<DoubleWritable> msgIterator) {
-        int maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+        if (maxIteration < 0) {
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+        }
         if (getSuperstep() == 1) {
             tmpVertexValue.set(1.0 / getNumVertices());
             setVertexValue(tmpVertexValue);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index daafc82..2792d88 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -85,6 +85,7 @@
     }
 
     private ByteWritable tmpVertexValue = new ByteWritable();
+    private long sourceId = -1;
 
     /** The source vertex id */
     public static final String SOURCE_ID = "ReachibilityVertex.sourceId";
@@ -101,7 +102,7 @@
      * @return True if the source id
      */
     private boolean isSource(VLongWritable v) {
-        return (v.get() == getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT));
+        return (v.get() == sourceId);
     }
 
     /**
@@ -116,6 +117,9 @@
     @Override
     public void compute(Iterator<ByteWritable> msgIterator) {
         if (getSuperstep() == 1) {
+            if (sourceId < 0) {
+                sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+            }
             boolean isSource = isSource(getVertexId());
             if (isSource) {
                 tmpVertexValue.set((byte) 1);
@@ -160,7 +164,7 @@
         }
         voteToHalt();
     }
-    
+
     @Override
     public String toString() {
         return getVertexId() + " " + getVertexValue();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index 7a5bba6..5a556fa 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -38,7 +38,7 @@
 
 public class RunJobTestCase extends TestCase {
     private static final String NC1 = "nc1";
-    private static final String HYRACKS_APP_NAME = "giraph";
+    private static final String HYRACKS_APP_NAME = "pregelix";
     private static String HDFS_INPUTPATH = "/webmap";
     private static String HDFS_OUTPUTPAH = "/result";
 
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index b5abd94..79a5c3c 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -41,168 +41,176 @@
 
 @SuppressWarnings("deprecation")
 public class RunJobTestSuite extends TestSuite {
-    private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class.getName());
+	private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class
+			.getName());
 
-    private static final String ACTUAL_RESULT_DIR = "actual";
-    private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
-    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
-    private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
-    private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
-    private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
-    private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
-    private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
-    private static final String FILE_EXTENSION_OF_RESULTS = "result";
+	private static final String ACTUAL_RESULT_DIR = "actual";
+	private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
+	private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+	private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+	private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+	private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+	private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
+	private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
+	private static final String FILE_EXTENSION_OF_RESULTS = "result";
 
-    private static final String DATA_PATH = "data/webmap/webmap_link.txt";
-    private static final String HDFS_PATH = "/webmap/";
+	private static final String DATA_PATH = "data/webmap/webmap_link.txt";
+	private static final String HDFS_PATH = "/webmap/";
 
-    private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
-    private static final String HDFS_PATH2 = "/webmapcomplex/";
+	private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
+	private static final String HDFS_PATH2 = "/webmapcomplex/";
 
-    private static final String DATA_PATH3 = "data/clique/clique.txt";
-    private static final String HDFS_PATH3 = "/clique/";
+	private static final String DATA_PATH3 = "data/clique/clique.txt";
+	private static final String HDFS_PATH3 = "/clique/";
 
-    private static final String HYRACKS_APP_NAME = "giraph";
-    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
-    private MiniDFSCluster dfsCluster;
+	private static final String HYRACKS_APP_NAME = "pregelix";
+	private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
+			+ File.separator + "conf.xml";
+	private MiniDFSCluster dfsCluster;
 
-    private JobConf conf = new JobConf();
-    private int numberOfNC = 2;
+	private JobConf conf = new JobConf();
+	private int numberOfNC = 2;
 
-    public void setUp() throws Exception {
-        ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
-        ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
-        cleanupStores();
-        PregelixHyracksIntegrationUtil.init();
-        PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
-        LOGGER.info("Hyracks mini-cluster started");
-        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
-        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
-        startHDFS();
-    }
+	public void setUp() throws Exception {
+		ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+		ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+		cleanupStores();
+		PregelixHyracksIntegrationUtil.init();
+		PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+		LOGGER.info("Hyracks mini-cluster started");
+		FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+		FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+		startHDFS();
+	}
 
-    private void cleanupStores() throws IOException {
-        FileUtils.forceMkdir(new File("teststore"));
-        FileUtils.forceMkdir(new File("build"));
-        FileUtils.cleanDirectory(new File("teststore"));
-        FileUtils.cleanDirectory(new File("build"));
-    }
+	private void cleanupStores() throws IOException {
+		FileUtils.forceMkdir(new File("teststore"));
+		FileUtils.forceMkdir(new File("build"));
+		FileUtils.cleanDirectory(new File("teststore"));
+		FileUtils.cleanDirectory(new File("build"));
+	}
 
-    private void startHDFS() throws IOException {
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-        FileSystem lfs = FileSystem.getLocal(new Configuration());
-        lfs.delete(new Path("build"), true);
-        System.setProperty("hadoop.log.dir", "logs");
-        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
-        FileSystem dfs = FileSystem.get(conf);
-        Path src = new Path(DATA_PATH);
-        Path dest = new Path(HDFS_PATH);
-        dfs.mkdirs(dest);
-        dfs.copyFromLocalFile(src, dest);
+	private void startHDFS() throws IOException {
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+		FileSystem lfs = FileSystem.getLocal(new Configuration());
+		lfs.delete(new Path("build"), true);
+		System.setProperty("hadoop.log.dir", "logs");
+		dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+		FileSystem dfs = FileSystem.get(conf);
+		Path src = new Path(DATA_PATH);
+		Path dest = new Path(HDFS_PATH);
+		dfs.mkdirs(dest);
+		dfs.copyFromLocalFile(src, dest);
 
-        src = new Path(DATA_PATH2);
-        dest = new Path(HDFS_PATH2);
-        dfs.mkdirs(dest);
-        dfs.copyFromLocalFile(src, dest);
+		src = new Path(DATA_PATH2);
+		dest = new Path(HDFS_PATH2);
+		dfs.mkdirs(dest);
+		dfs.copyFromLocalFile(src, dest);
 
-        src = new Path(DATA_PATH3);
-        dest = new Path(HDFS_PATH3);
-        dfs.mkdirs(dest);
-        dfs.copyFromLocalFile(src, dest);
+		src = new Path(DATA_PATH3);
+		dest = new Path(HDFS_PATH3);
+		dfs.mkdirs(dest);
+		dfs.copyFromLocalFile(src, dest);
 
-        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
-        conf.writeXml(confOutput);
-        confOutput.flush();
-        confOutput.close();
-    }
+		DataOutputStream confOutput = new DataOutputStream(
+				new FileOutputStream(new File(HADOOP_CONF_PATH)));
+		conf.writeXml(confOutput);
+		confOutput.flush();
+		confOutput.close();
+	}
 
-    /**
-     * cleanup hdfs cluster
-     */
-    private void cleanupHDFS() throws Exception {
-        dfsCluster.shutdown();
-    }
+	/**
+	 * cleanup hdfs cluster
+	 */
+	private void cleanupHDFS() throws Exception {
+		dfsCluster.shutdown();
+	}
 
-    public void tearDown() throws Exception {
-        PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
-        PregelixHyracksIntegrationUtil.deinit();
-        LOGGER.info("Hyracks mini-cluster shut down");
-        cleanupHDFS();
-    }
+	public void tearDown() throws Exception {
+		PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
+		PregelixHyracksIntegrationUtil.deinit();
+		LOGGER.info("Hyracks mini-cluster shut down");
+		cleanupHDFS();
+	}
 
-    public static Test suite() throws Exception {
-        List<String> ignores = getFileList(PATH_TO_IGNORE);
-        List<String> onlys = getFileList(PATH_TO_ONLY);
-        File testData = new File(PATH_TO_JOBS);
-        File[] queries = testData.listFiles();
-        RunJobTestSuite testSuite = new RunJobTestSuite();
-        testSuite.setUp();
-        boolean onlyEnabled = false;
+	public static Test suite() throws Exception {
+		List<String> ignores = getFileList(PATH_TO_IGNORE);
+		List<String> onlys = getFileList(PATH_TO_ONLY);
+		File testData = new File(PATH_TO_JOBS);
+		File[] queries = testData.listFiles();
+		RunJobTestSuite testSuite = new RunJobTestSuite();
+		testSuite.setUp();
+		boolean onlyEnabled = false;
 
-        if (onlys.size() > 0) {
-            onlyEnabled = true;
-        }
-        for (File qFile : queries) {
-            if (isInList(ignores, qFile.getName()))
-                continue;
+		if (onlys.size() > 0) {
+			onlyEnabled = true;
+		}
+		for (File qFile : queries) {
+			if (isInList(ignores, qFile.getName()))
+				continue;
 
-            if (qFile.isFile()) {
-                if (onlyEnabled && !isInList(onlys, qFile.getName())) {
-                    continue;
-                } else {
-                    String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
-                    String expectedFileName = EXPECTED_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
-                    testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile.getAbsolutePath()
-                            .toString(), resultFileName, expectedFileName));
-                }
-            }
-        }
-        return testSuite;
-    }
+			if (qFile.isFile()) {
+				if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+					continue;
+				} else {
+					String resultFileName = ACTUAL_RESULT_DIR + File.separator
+							+ jobExtToResExt(qFile.getName());
+					String expectedFileName = EXPECTED_RESULT_DIR
+							+ File.separator + jobExtToResExt(qFile.getName());
+					testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH,
+							qFile.getName(),
+							qFile.getAbsolutePath().toString(), resultFileName,
+							expectedFileName));
+				}
+			}
+		}
+		return testSuite;
+	}
 
-    /**
-     * Runs the tests and collects their result in a TestResult.
-     */
-    @Override
-    public void run(TestResult result) {
-        try {
-            int testCount = countTestCases();
-            for (int i = 0; i < testCount; i++) {
-                // cleanupStores();
-                Test each = this.testAt(i);
-                if (result.shouldStop())
-                    break;
-                runTest(each, result);
-            }
-            tearDown();
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
+	/**
+	 * Runs the tests and collects their result in a TestResult.
+	 */
+	@Override
+	public void run(TestResult result) {
+		try {
+			int testCount = countTestCases();
+			for (int i = 0; i < testCount; i++) {
+				// cleanupStores();
+				Test each = this.testAt(i);
+				if (result.shouldStop())
+					break;
+				runTest(each, result);
+			}
+			tearDown();
+		} catch (Exception e) {
+			throw new IllegalStateException(e);
+		}
+	}
 
-    protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
-        BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
-        String s = null;
-        List<String> ignores = new ArrayList<String>();
-        while ((s = reader.readLine()) != null) {
-            ignores.add(s);
-        }
-        reader.close();
-        return ignores;
-    }
+	protected static List<String> getFileList(String ignorePath)
+			throws FileNotFoundException, IOException {
+		BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+		String s = null;
+		List<String> ignores = new ArrayList<String>();
+		while ((s = reader.readLine()) != null) {
+			ignores.add(s);
+		}
+		reader.close();
+		return ignores;
+	}
 
-    private static String jobExtToResExt(String fname) {
-        int dot = fname.lastIndexOf('.');
-        return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
-    }
+	private static String jobExtToResExt(String fname) {
+		int dot = fname.lastIndexOf('.');
+		return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
+	}
 
-    private static boolean isInList(List<String> onlys, String name) {
-        for (String only : onlys)
-            if (name.indexOf(only) >= 0)
-                return true;
-        return false;
-    }
+	private static boolean isInList(List<String> onlys, String name) {
+		for (String only : onlys)
+			if (name.indexOf(only) >= 0)
+				return true;
+		return false;
+	}
 
 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
index e7caeaf..76c725e 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
@@ -34,7 +34,7 @@
 
     @Override
     public void stop() throws Exception {
-        LOGGER.info("Stopping Giraph NC Bootstrap");
+        LOGGER.info("Stopping NC Bootstrap");
         RuntimeContext rCtx = (RuntimeContext) appCtx.getApplicationObject();
         rCtx.close();
     }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 1b8fce4..a0dca3d 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -104,11 +104,13 @@
             private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
             private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
             private Configuration conf;
+            private boolean dynamicStateLength;
 
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
                 this.conf = confFactory.createConfiguration();
+                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
                 this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
 
@@ -241,8 +243,8 @@
             public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
                 try {
                     if (vertex != null && vertex.hasUpdate()) {
-                        if (!BspUtils.getDynamicVertexValueSize(conf)) {
-                            //in-place update
+                        if (!dynamicStateLength) {
+                            // in-place update
                             int fieldCount = tupleRef.getFieldCount();
                             for (int i = 1; i < fieldCount; i++) {
                                 byte[] data = tupleRef.getFieldData(i);
@@ -251,12 +253,12 @@
                                 vertex.write(output);
                             }
                         } else {
-                            //write the vertex id
+                            // write the vertex id
                             DataOutput tbOutput = cloneUpdateTb.getDataOutput();
                             vertex.getVertexId().write(tbOutput);
                             cloneUpdateTb.addFieldEndOffset();
 
-                            //write the vertex value
+                            // write the vertex value
                             vertex.write(tbOutput);
                             cloneUpdateTb.addFieldEndOffset();
                         }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index a4d54c8..3d8a355 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -79,7 +79,7 @@
             private ByteBuffer bufferGlobalAggregate;
             private GlobalAggregator aggregator;
 
-            //for writing out the global aggregate
+            // for writing out the global aggregate
             private IFrameWriter writerTerminate;
             private FrameTupleAppender appenderTerminate;
             private ByteBuffer bufferTerminate;
@@ -107,11 +107,13 @@
             private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
             private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
             private Configuration conf;
+            private boolean dynamicStateLength;
 
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
                 this.conf = confFactory.createConfiguration();
+                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
                 this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
 
@@ -215,7 +217,8 @@
             private void writeOutGlobalAggregate() throws HyracksDataException {
                 try {
                     /**
-                     * get partial aggregate result and flush to the final aggregator
+                     * get partial aggregate result and flush to the final
+                     * aggregator
                      */
                     Writable agg = aggregator.finishPartial();
                     agg.write(tbGlobalAggregate.getDataOutput());
@@ -244,8 +247,8 @@
             public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
                 try {
                     if (vertex != null && vertex.hasUpdate()) {
-                        if (!BspUtils.getDynamicVertexValueSize(conf)) {
-                            //in-place update
+                        if (!dynamicStateLength) {
+                            // in-place update
                             int fieldCount = tupleRef.getFieldCount();
                             for (int i = 1; i < fieldCount; i++) {
                                 byte[] data = tupleRef.getFieldData(i);
@@ -254,12 +257,12 @@
                                 vertex.write(output);
                             }
                         } else {
-                            //write the vertex id
+                            // write the vertex id
                             DataOutput tbOutput = cloneUpdateTb.getDataOutput();
                             vertex.getVertexId().write(tbOutput);
                             cloneUpdateTb.addFieldEndOffset();
 
-                            //write the vertex value
+                            // write the vertex value
                             vertex.write(tbOutput);
                             cloneUpdateTb.addFieldEndOffset();
                         }