Merged fullstack_asterix_stabilization -r 3157:3163

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@3165 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
index 6e0f90e..23a4c36 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
 PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
   PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 fi
diff --git a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
index 4aac091..4710706 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
+++ b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
@@ -1,12 +1,7 @@
 <cluster-topology>
-    <network-switch name="switch1">
-        <network-switch name="switch2">
-            <terminal name="nc0"/>
-            <terminal name="nc3"/>
-        </network-switch>
-        <network-switch name="switch3">
-            <terminal name="nc1"/>
-            <terminal name="nc4"/>
+    <network-switch name="Global">
+        <network-switch name="local">
+            <terminal name="127.0.0.1"/>
         </network-switch>
     </network-switch>
 </cluster-topology>
\ No newline at end of file
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
index 0d2c78a..5e4e21e 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

 import edu.uci.ics.hyracks.api.job.JobSpecification;

+import edu.uci.ics.hyracks.api.topology.ClusterTopology;

 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;

 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;

 

@@ -86,7 +87,8 @@
             JobConf conf = ConfUtil.getJobConf(fileDesc.getInputFileFormatClass(), filePath);

             String[] locConstraints = ConfUtil.getNCs();

             Map<String, NodeControllerInfo> ncNameToNcInfos = ConfUtil.getNodeControllerInfo();

-            Scheduler scheduler = new Scheduler(ncNameToNcInfos);

+            ClusterTopology topology = ConfUtil.getClusterTopology();

+            Scheduler scheduler = new Scheduler(ncNameToNcInfos, topology);

             InputSplit[] splits = conf.getInputFormat().getSplits(conf, locConstraints.length);

             String[] schedule = scheduler.getLocationConstraints(splits);

             IOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(jobSpec, recDescriptor, conf, splits,

diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
index 6e0f90e..23a4c36 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
@@ -46,4 +46,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
index 03ce4e7..35c4794 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
@@ -5,6 +5,10 @@
 PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
   PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 fi
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
index 5371c84..2b9e899 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
@@ -8,6 +8,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
+import java.util.logging.Logger;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputSplit;
@@ -19,6 +20,7 @@
 
 @SuppressWarnings("deprecation")
 public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
+    private static final Logger LOGGER = Logger.getLogger(RackAwareNcCollectionBuilder.class.getName());
     private ClusterTopology topology;
 
     public RackAwareNcCollectionBuilder(ClusterTopology topology) {
@@ -30,13 +32,18 @@
             final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
             final int[] workloads, final int slotLimit) {
         try {
-            final Map<List<Integer>, String> pathToNCs = new HashMap<List<Integer>, String>();
+            final Map<List<Integer>, List<String>> pathToNCs = new HashMap<List<Integer>, List<String>>();
             for (int i = 0; i < NCs.length; i++) {
                 List<Integer> path = new ArrayList<Integer>();
                 String ipAddress = InetAddress.getByAddress(
                         ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
                 topology.lookupNetworkTerminal(ipAddress, path);
-                pathToNCs.put(path, NCs[i]);
+                List<String> ncs = pathToNCs.get(path);
+                if (ncs == null) {
+                    ncs = new ArrayList<String>();
+                    pathToNCs.put(path, ncs);
+                }
+                ncs.add(NCs[i]);
             }
 
             final TreeMap<List<Integer>, IntWritable> availableIpsToSlots = new TreeMap<List<Integer>, IntWritable>(
@@ -86,12 +93,14 @@
                                  * get all the IP addresses from the name
                                  */
                                 InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+                                boolean inTopology = false;
                                 for (InetAddress ip : allIps) {
                                     List<Integer> splitPath = new ArrayList<Integer>();
                                     boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
                                     if (!inCluster) {
                                         continue;
                                     }
+                                    inTopology = true;
                                     /**
                                      * if the node controller exists
                                      */
@@ -110,6 +119,26 @@
 
                                     }
                                 }
+
+                                if (!inTopology) {
+                                    LOGGER.info(locs[j] + "'s IP address is not in the cluster toplogy file!");
+                                    /**
+                                     * if the machine is not in the toplogy file
+                                     */
+                                    List<Integer> candidatePath = null;
+                                    for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+                                        if (entry.getValue().get() > 0) {
+                                            candidatePath = entry.getKey();
+                                            break;
+                                        }
+                                    }
+                                    /** the split path is empty */
+                                    if (candidatePath != null) {
+                                        if (availableIpsToSlots.get(candidatePath).get() > 0) {
+                                            currentCandidatePath = candidatePath;
+                                        }
+                                    }
+                                }
                             }
                         } else {
                             for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
@@ -120,7 +149,7 @@
                             }
                         }
 
-                        if (currentCandidatePath.size() > 0) {
+                        if (currentCandidatePath != null && currentCandidatePath.size() > 0) {
                             /**
                              * Update the entry of the selected IP
                              */
@@ -132,10 +161,12 @@
                             /**
                              * Update the entry of the selected NC
                              */
-                            String candidateNc = pathToNCs.get(currentCandidatePath);
-                            int ncIndex = ncNameToIndex.get(candidateNc);
-                            if (workloads[ncIndex] < slotLimit) {
-                                return candidateNc;
+                            List<String> candidateNcs = pathToNCs.get(currentCandidatePath);
+                            for (String candidate : candidateNcs) {
+                                int ncIndex = ncNameToIndex.get(candidate);
+                                if (workloads[ncIndex] < slotLimit) {
+                                    return candidate;
+                                }
                             }
                         }
                         /** not scheduled */
@@ -154,7 +185,11 @@
                     int commonLength = Math.min(splitPath.size(), candidatePath.size());
                     int distance = 0;
                     for (int i = 0; i < commonLength; i++) {
-                        distance = distance * 10 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+                        distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+                    }
+                    List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
+                    for (int i = commonLength; i < restElements.size(); i++) {
+                        distance = distance * 100 + Math.abs(restElements.get(i));
                     }
                     return distance;
                 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index 6d31855..3f7997b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -151,6 +151,10 @@
      * @throws HyracksDataException
      */
     public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
+        if (splits == null) {
+            /** deal the case when the splits array is null */
+            return new String[] {};
+        }
         int[] workloads = new int[NCs.length];
         Arrays.fill(workloads, 0);
         String[] locations = new String[splits.length];
@@ -189,7 +193,8 @@
                     dataLocalCount++;
                 }
             }
-            LOGGER.info("Data local rate: " + ((float) dataLocalCount / (float) (scheduled.length)));
+            LOGGER.info("Data local rate: "
+                    + (scheduled.length == 0 ? 0.0 : ((float) dataLocalCount / (float) (scheduled.length))));
             /**
              * push non-data-local lower-bounds slots to each machine
              */
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
index cb97ca1..cde187d 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
 
 /**
  * The scheduler conduct data-local scheduling for data reading on HDFS.
@@ -53,6 +54,17 @@
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
         scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
     }
+    
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     *            the mapping from nc names to nc infos
+     * @throws HyracksException
+     */
+    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder) throws HyracksException {
+        scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, builder);
+    }
 
     /**
      * Set location constraints for a file scan operator with a list of file splits
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index 90967a0..1f394a9 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -277,4 +277,65 @@
 
     }
 
+    /**
+     * Test boundary cases where splits array is empty or null
+     * 
+     * @throws Exception
+     */
+    public void testSchedulercBoundary() throws Exception {
+        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
+                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
+                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
+                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
+                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+                .getAddress(), 5098)));
+
+        /** test empty file splits */
+        InputSplit[] fileSplits = new InputSplit[0];
+        String[] expectedResults = new String[] {};
+
+        Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+        String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
+
+        ClusterTopology topology = parseTopology();
+        scheduler = new Scheduler(ncNameToNcInfos, topology);
+        locationConstraints = scheduler.getLocationConstraints(fileSplits);
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
+
+        fileSplits = null;
+        expectedResults = new String[] {};
+
+        scheduler = new Scheduler(ncNameToNcInfos);
+        locationConstraints = scheduler.getLocationConstraints(fileSplits);
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
+
+        scheduler = new Scheduler(ncNameToNcInfos, topology);
+        locationConstraints = scheduler.getLocationConstraints(fileSplits);
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
+
+    }
+
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 3a98fd9..cd49184 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -49,510 +49,525 @@
  */
 @SuppressWarnings("rawtypes")
 public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
-        implements Writable {
-    private static long superstep = 0;
-    /** Class-wide number of vertices */
-    private static long numVertices = -1;
-    /** Class-wide number of edges */
-    private static long numEdges = -1;
-    /** Vertex id */
-    private I vertexId = null;
-    /** Vertex value */
-    private V vertexValue = null;
-    /** Map of destination vertices and their edge values */
-    private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
-    /** If true, do not do anymore computation on this vertex. */
-    boolean halt = false;
-    /** List of incoming messages from the previous superstep */
-    private final List<M> msgList = new ArrayList<M>();
-    /** map context */
-    private static TaskAttemptContext context = null;
-    /** a delegate for hyracks stuff */
-    private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
-    /** this vertex is updated or not */
-    private boolean updated = false;
-    /** has outgoing messages */
-    private boolean hasMessage = false;
-    /** created new vertex */
-    private boolean createdNewLiveVertex = false;
+		implements Writable {
+	private static long superstep = 0;
+	/** Class-wide number of vertices */
+	private static long numVertices = -1;
+	/** Class-wide number of edges */
+	private static long numEdges = -1;
+	/** Vertex id */
+	private I vertexId = null;
+	/** Vertex value */
+	private V vertexValue = null;
+	/** Map of destination vertices and their edge values */
+	private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+	/** If true, do not do anymore computation on this vertex. */
+	boolean halt = false;
+	/** List of incoming messages from the previous superstep */
+	private final List<M> msgList = new ArrayList<M>();
+	/** map context */
+	private static TaskAttemptContext context = null;
+	/** a delegate for hyracks stuff */
+	private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(
+			this);
+	/** this vertex is updated or not */
+	private boolean updated = false;
+	/** has outgoing messages */
+	private boolean hasMessage = false;
+	/** created new vertex */
+	private boolean createdNewLiveVertex = false;
 
-    /**
-     * use object pool for re-using objects
-     */
-    private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
-    private List<M> msgPool = new ArrayList<M>();
-    private List<V> valuePool = new ArrayList<V>();
-    private int usedEdge = 0;
-    private int usedMessage = 0;
-    private int usedValue = 0;
+	/**
+	 * use object pool for re-using objects
+	 */
+	private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+	private List<M> msgPool = new ArrayList<M>();
+	private List<V> valuePool = new ArrayList<V>();
+	private int usedEdge = 0;
+	private int usedMessage = 0;
+	private int usedValue = 0;
 
-    /**
-     * The key method that users need to implement
-     * 
-     * @param msgIterator
-     *            an iterator of incoming messages
-     */
-    public abstract void compute(Iterator<M> msgIterator);
+	/**
+	 * The key method that users need to implement
+	 * 
+	 * @param msgIterator
+	 *            an iterator of incoming messages
+	 */
+	public abstract void compute(Iterator<M> msgIterator);
 
-    /**
-     * Add an edge for the vertex.
-     * 
-     * @param targetVertexId
-     * @param edgeValue
-     * @return successful or not
-     */
-    public final boolean addEdge(I targetVertexId, E edgeValue) {
-        Edge<I, E> edge = this.allocateEdge();
-        edge.setDestVertexId(targetVertexId);
-        edge.setEdgeValue(edgeValue);
-        destEdgeList.add(edge);
-        return true;
-    }
+	/**
+	 * Add an edge for the vertex.
+	 * 
+	 * @param targetVertexId
+	 * @param edgeValue
+	 * @return successful or not
+	 */
+	public final boolean addEdge(I targetVertexId, E edgeValue) {
+		Edge<I, E> edge = this.allocateEdge();
+		edge.setDestVertexId(targetVertexId);
+		edge.setEdgeValue(edgeValue);
+		destEdgeList.add(edge);
+		updated = true;
+		return true;
+	}
 
-    /**
-     * Initialize a new vertex
-     * 
-     * @param vertexId
-     * @param vertexValue
-     * @param edges
-     * @param messages
-     */
-    public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
-        if (vertexId != null) {
-            setVertexId(vertexId);
-        }
-        if (vertexValue != null) {
-            setVertexValue(vertexValue);
-        }
-        destEdgeList.clear();
-        if (edges != null && !edges.isEmpty()) {
-            for (Map.Entry<I, E> entry : edges.entrySet()) {
-                destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
-            }
-        }
-        if (messages != null && !messages.isEmpty()) {
-            msgList.addAll(messages);
-        }
-    }
+	/**
+	 * Initialize a new vertex
+	 * 
+	 * @param vertexId
+	 * @param vertexValue
+	 * @param edges
+	 * @param messages
+	 */
+	public void initialize(I vertexId, V vertexValue, Map<I, E> edges,
+			List<M> messages) {
+		if (vertexId != null) {
+			setVertexId(vertexId);
+		}
+		if (vertexValue != null) {
+			setVertexValue(vertexValue);
+		}
+		destEdgeList.clear();
+		if (edges != null && !edges.isEmpty()) {
+			for (Map.Entry<I, E> entry : edges.entrySet()) {
+				destEdgeList.add(new Edge<I, E>(entry.getKey(), entry
+						.getValue()));
+			}
+		}
+		if (messages != null && !messages.isEmpty()) {
+			msgList.addAll(messages);
+		}
+	}
 
-    /**
-     * reset a vertex object: clear its internal states
-     */
-    public void reset() {
-        usedEdge = 0;
-        usedMessage = 0;
-        usedValue = 0;
-    }
+	/**
+	 * reset a vertex object: clear its internal states
+	 */
+	public void reset() {
+		usedEdge = 0;
+		usedMessage = 0;
+		usedValue = 0;
+		updated = false;
+	}
 
-    /**
-     * Set the vertex id
-     * 
-     * @param vertexId
-     */
-    public final void setVertexId(I vertexId) {
-        this.vertexId = vertexId;
-        delegate.setVertexId(vertexId);
-    }
+	/**
+	 * Set the vertex id
+	 * 
+	 * @param vertexId
+	 */
+	public final void setVertexId(I vertexId) {
+		this.vertexId = vertexId;
+		delegate.setVertexId(vertexId);
+	}
 
-    /**
-     * Get the vertex id
-     * 
-     * @return vertex id
-     */
-    public final I getVertexId() {
-        return vertexId;
-    }
+	/**
+	 * Get the vertex id
+	 * 
+	 * @return vertex id
+	 */
+	public final I getVertexId() {
+		return vertexId;
+	}
 
-    /**
-     * Get the vertex value
-     * 
-     * @return the vertex value
-     */
-    public final V getVertexValue() {
-        return vertexValue;
-    }
+	/**
+	 * Get the vertex value
+	 * 
+	 * @return the vertex value
+	 */
+	public final V getVertexValue() {
+		return vertexValue;
+	}
 
-    /**
-     * Set the vertex value
-     * 
-     * @param vertexValue
-     */
-    public final void setVertexValue(V vertexValue) {
-        this.vertexValue = vertexValue;
-        this.updated = true;
-    }
+	/**
+	 * Set the vertex value
+	 * 
+	 * @param vertexValue
+	 */
+	public final void setVertexValue(V vertexValue) {
+		this.vertexValue = vertexValue;
+		this.updated = true;
+	}
 
-    /***
-     * Send a message to a specific vertex
-     * 
-     * @param id
-     *            the receiver vertex id
-     * @param msg
-     *            the message
-     */
-    public final void sendMsg(I id, M msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
-        }
-        delegate.sendMsg(id, msg);
-        this.hasMessage = true;
-    }
+	/***
+	 * Send a message to a specific vertex
+	 * 
+	 * @param id
+	 *            the receiver vertex id
+	 * @param msg
+	 *            the message
+	 */
+	public final void sendMsg(I id, M msg) {
+		if (msg == null) {
+			throw new IllegalArgumentException(
+					"sendMsg: Cannot send null message to " + id);
+		}
+		delegate.sendMsg(id, msg);
+		this.hasMessage = true;
+	}
 
-    /**
-     * Send a message to all direct outgoing neighbors
-     * 
-     * @param msg
-     *            the message
-     */
-    public final void sendMsgToAllEdges(M msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
-        }
-        for (Edge<I, E> edge : destEdgeList) {
-            sendMsg(edge.getDestVertexId(), msg);
-        }
-    }
+	/**
+	 * Send a message to all direct outgoing neighbors
+	 * 
+	 * @param msg
+	 *            the message
+	 */
+	public final void sendMsgToAllEdges(M msg) {
+		if (msg == null) {
+			throw new IllegalArgumentException(
+					"sendMsgToAllEdges: Cannot send null message to all edges");
+		}
+		for (Edge<I, E> edge : destEdgeList) {
+			sendMsg(edge.getDestVertexId(), msg);
+		}
+	}
 
-    /**
-     * Vote to halt. Once all vertex vote to halt and no more messages, a
-     * Pregelix job will terminate.
-     */
-    public final void voteToHalt() {
-        halt = true;
-    }
+	/**
+	 * Vote to halt. Once all vertex vote to halt and no more messages, a
+	 * Pregelix job will terminate.
+	 */
+	public final void voteToHalt() {
+		halt = true;
+		updated = true;
+	}
 
-    /**
-     * @return the vertex is halted (true) or not (false)
-     */
-    public final boolean isHalted() {
-        return halt;
-    }
+	/**
+	 * @return the vertex is halted (true) or not (false)
+	 */
+	public final boolean isHalted() {
+		return halt;
+	}
 
-    @Override
-    final public void readFields(DataInput in) throws IOException {
-        reset();
-        if (vertexId == null)
-            vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
-        vertexId.readFields(in);
-        delegate.setVertexId(vertexId);
-        boolean hasVertexValue = in.readBoolean();
+	@Override
+	final public void readFields(DataInput in) throws IOException {
+		reset();
+		if (vertexId == null)
+			vertexId = BspUtils.<I> createVertexIndex(getContext()
+					.getConfiguration());
+		vertexId.readFields(in);
+		delegate.setVertexId(vertexId);
+		boolean hasVertexValue = in.readBoolean();
 
-        if (hasVertexValue) {
-            vertexValue = allocateValue();
-            vertexValue.readFields(in);
-            delegate.setVertex(this);
-        }
-        destEdgeList.clear();
-        long edgeMapSize = SerDeUtils.readVLong(in);
-        for (long i = 0; i < edgeMapSize; ++i) {
-            Edge<I, E> edge = allocateEdge();
-            edge.setConf(getContext().getConfiguration());
-            edge.readFields(in);
-            addEdge(edge);
-        }
-        msgList.clear();
-        long msgListSize = SerDeUtils.readVLong(in);
-        for (long i = 0; i < msgListSize; ++i) {
-            M msg = allocateMessage();
-            msg.readFields(in);
-            msgList.add(msg);
-        }
-        halt = in.readBoolean();
-        updated = false;
-        hasMessage = false;
-        createdNewLiveVertex = false;
-    }
+		if (hasVertexValue) {
+			vertexValue = allocateValue();
+			vertexValue.readFields(in);
+			delegate.setVertex(this);
+		}
+		destEdgeList.clear();
+		long edgeMapSize = SerDeUtils.readVLong(in);
+		for (long i = 0; i < edgeMapSize; ++i) {
+			Edge<I, E> edge = allocateEdge();
+			edge.setConf(getContext().getConfiguration());
+			edge.readFields(in);
+			addEdge(edge);
+		}
+		msgList.clear();
+		long msgListSize = SerDeUtils.readVLong(in);
+		for (long i = 0; i < msgListSize; ++i) {
+			M msg = allocateMessage();
+			msg.readFields(in);
+			msgList.add(msg);
+		}
+		halt = in.readBoolean();
+		updated = false;
+		hasMessage = false;
+		createdNewLiveVertex = false;
+	}
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-        vertexId.write(out);
-        out.writeBoolean(vertexValue != null);
-        if (vertexValue != null) {
-            vertexValue.write(out);
-        }
-        SerDeUtils.writeVLong(out, destEdgeList.size());
-        for (Edge<I, E> edge : destEdgeList) {
-            edge.write(out);
-        }
-        SerDeUtils.writeVLong(out, msgList.size());
-        for (M msg : msgList) {
-            msg.write(out);
-        }
-        out.writeBoolean(halt);
-    }
+	@Override
+	public void write(DataOutput out) throws IOException {
+		vertexId.write(out);
+		out.writeBoolean(vertexValue != null);
+		if (vertexValue != null) {
+			vertexValue.write(out);
+		}
+		SerDeUtils.writeVLong(out, destEdgeList.size());
+		for (Edge<I, E> edge : destEdgeList) {
+			edge.write(out);
+		}
+		SerDeUtils.writeVLong(out, msgList.size());
+		for (M msg : msgList) {
+			msg.write(out);
+		}
+		out.writeBoolean(halt);
+	}
 
-    /**
-     * Get the list of incoming messages
-     * 
-     * @return the list of messages
-     */
-    public List<M> getMsgList() {
-        return msgList;
-    }
+	/**
+	 * Get the list of incoming messages
+	 * 
+	 * @return the list of messages
+	 */
+	public List<M> getMsgList() {
+		return msgList;
+	}
 
-    /**
-     * Get outgoing edge list
-     * 
-     * @return a list of outgoing edges
-     */
-    public List<Edge<I, E>> getEdges() {
-        return this.destEdgeList;
-    }
+	/**
+	 * Get outgoing edge list
+	 * 
+	 * @return a list of outgoing edges
+	 */
+	public List<Edge<I, E>> getEdges() {
+		return this.destEdgeList;
+	}
 
-    @Override
-    @SuppressWarnings("unchecked")
-    public String toString() {
-        Collections.sort(destEdgeList);
-        StringBuffer edgeBuffer = new StringBuffer();
-        edgeBuffer.append("(");
-        for (Edge<I, E> edge : destEdgeList) {
-            edgeBuffer.append(edge.getDestVertexId()).append(",");
-        }
-        edgeBuffer.append(")");
-        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
-    }
+	@Override
+	@SuppressWarnings("unchecked")
+	public String toString() {
+		Collections.sort(destEdgeList);
+		StringBuffer edgeBuffer = new StringBuffer();
+		edgeBuffer.append("(");
+		for (Edge<I, E> edge : destEdgeList) {
+			edgeBuffer.append(edge.getDestVertexId()).append(",");
+		}
+		edgeBuffer.append(")");
+		return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue()
+				+ ", edges=" + edgeBuffer + ")";
+	}
 
-    /**
-     * Get the number of outgoing edges
-     * 
-     * @return the number of outging edges
-     */
-    public int getNumOutEdges() {
-        return destEdgeList.size();
-    }
+	/**
+	 * Get the number of outgoing edges
+	 * 
+	 * @return the number of outging edges
+	 */
+	public int getNumOutEdges() {
+		return destEdgeList.size();
+	}
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param writers
-     */
-    public void setOutputWriters(List<IFrameWriter> writers) {
-        delegate.setOutputWriters(writers);
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void setOutputWriters(List<IFrameWriter> writers) {
+		delegate.setOutputWriters(writers);
+	}
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param writers
-     */
-    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
-        delegate.setOutputAppenders(appenders);
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+		delegate.setOutputAppenders(appenders);
+	}
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param writers
-     */
-    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
-        delegate.setOutputTupleBuilders(tbs);
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+		delegate.setOutputTupleBuilders(tbs);
+	}
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param writers
-     */
-    public void finishCompute() throws IOException {
-        delegate.finishCompute();
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param writers
+	 */
+	public void finishCompute() throws IOException {
+		delegate.finishCompute();
+	}
 
-    /**
-     * Pregelix internal use only
-     */
-    public boolean hasUpdate() {
-        return this.updated;
-    }
+	/**
+	 * Pregelix internal use only
+	 */
+	public boolean hasUpdate() {
+		return this.updated;
+	}
 
-    /**
-     * Pregelix internal use only
-     */
-    public boolean hasMessage() {
-        return this.hasMessage;
-    }
+	/**
+	 * Pregelix internal use only
+	 */
+	public boolean hasMessage() {
+		return this.hasMessage;
+	}
 
-    /**
-     * Pregelix internal use only
-     */
-    public boolean createdNewLiveVertex() {
-        return this.createdNewLiveVertex;
-    }
+	/**
+	 * Pregelix internal use only
+	 */
+	public boolean createdNewLiveVertex() {
+		return this.createdNewLiveVertex;
+	}
 
-    /**
-     * sort the edges
-     */
-    @SuppressWarnings("unchecked")
-    public void sortEdges() {
-        Collections.sort(destEdgeList);
-    }
+	/**
+	 * sort the edges
+	 */
+	@SuppressWarnings("unchecked")
+	public void sortEdges() {
+		updated = true;
+		Collections.sort(destEdgeList);
+	}
 
-    /**
-     * Allocate a new edge from the edge pool
-     */
-    private Edge<I, E> allocateEdge() {
-        Edge<I, E> edge;
-        if (usedEdge < edgePool.size()) {
-            edge = edgePool.get(usedEdge);
-            usedEdge++;
-        } else {
-            edge = new Edge<I, E>();
-            edgePool.add(edge);
-            usedEdge++;
-        }
-        return edge;
-    }
+	/**
+	 * Allocate a new edge from the edge pool
+	 */
+	private Edge<I, E> allocateEdge() {
+		Edge<I, E> edge;
+		if (usedEdge < edgePool.size()) {
+			edge = edgePool.get(usedEdge);
+			usedEdge++;
+		} else {
+			edge = new Edge<I, E>();
+			edgePool.add(edge);
+			usedEdge++;
+		}
+		return edge;
+	}
 
-    /**
-     * Allocate a new message from the message pool
-     */
-    private M allocateMessage() {
-        M message;
-        if (usedMessage < msgPool.size()) {
-            message = msgPool.get(usedEdge);
-            usedMessage++;
-        } else {
-            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
-            msgPool.add(message);
-            usedMessage++;
-        }
-        return message;
-    }
+	/**
+	 * Allocate a new message from the message pool
+	 */
+	private M allocateMessage() {
+		M message;
+		if (usedMessage < msgPool.size()) {
+			message = msgPool.get(usedEdge);
+			usedMessage++;
+		} else {
+			message = BspUtils.<M> createMessageValue(getContext()
+					.getConfiguration());
+			msgPool.add(message);
+			usedMessage++;
+		}
+		return message;
+	}
 
-    /**
-     * Set the global superstep for all the vertices (internal use)
-     * 
-     * @param superstep
-     *            New superstep
-     */
-    public static final void setSuperstep(long superstep) {
-        Vertex.superstep = superstep;
-    }
+	/**
+	 * Set the global superstep for all the vertices (internal use)
+	 * 
+	 * @param superstep
+	 *            New superstep
+	 */
+	public static final void setSuperstep(long superstep) {
+		Vertex.superstep = superstep;
+	}
 
-    /**
-     * Add an outgoing edge into the vertex
-     * 
-     * @param edge
-     *            the edge to be added
-     * @return true if the edge list changed as a result of this call
-     */
-    public boolean addEdge(Edge<I, E> edge) {
-        edge.setConf(getContext().getConfiguration());
-        return destEdgeList.add(edge);
-    }
+	/**
+	 * Add an outgoing edge into the vertex
+	 * 
+	 * @param edge
+	 *            the edge to be added
+	 * @return true if the edge list changed as a result of this call
+	 */
+	public boolean addEdge(Edge<I, E> edge) {
+		edge.setConf(getContext().getConfiguration());
+		updated = true;
+		return destEdgeList.add(edge);
+	}
 
-    /**
-     * remove an outgoing edge in the graph
-     * 
-     * @param edge
-     *            the edge to be removed
-     * @return true if the edge is in the edge list of the vertex
-     */
-    public boolean removeEdge(Edge<I, E> edge) {
-        return destEdgeList.remove(edge);
-    }
+	/**
+	 * remove an outgoing edge in the graph
+	 * 
+	 * @param edge
+	 *            the edge to be removed
+	 * @return true if the edge is in the edge list of the vertex
+	 */
+	public boolean removeEdge(Edge<I, E> edge) {
+		updated = true;
+		return destEdgeList.remove(edge);
+	}
 
-    /**
-     * Add a new vertex into the graph
-     * 
-     * @param vertexId
-     *            the vertex id
-     * @param vertex
-     *            the vertex
-     */
-    public final void addVertex(I vertexId, Vertex vertex) {
-        createdNewLiveVertex |= !vertex.isHalted();
-        delegate.addVertex(vertexId, vertex);
-    }
+	/**
+	 * Add a new vertex into the graph
+	 * 
+	 * @param vertexId
+	 *            the vertex id
+	 * @param vertex
+	 *            the vertex
+	 */
+	public final void addVertex(I vertexId, Vertex vertex) {
+		createdNewLiveVertex |= !vertex.isHalted();
+		delegate.addVertex(vertexId, vertex);
+	}
 
-    /**
-     * Delete a vertex from id
-     * 
-     * @param vertexId
-     *            the vertex id
-     */
-    public final void deleteVertex(I vertexId) {
-        delegate.deleteVertex(vertexId);
-    }
+	/**
+	 * Delete a vertex from id
+	 * 
+	 * @param vertexId
+	 *            the vertex id
+	 */
+	public final void deleteVertex(I vertexId) {
+		delegate.deleteVertex(vertexId);
+	}
 
-    /**
-     * Allocate a vertex value from the object pool
-     * 
-     * @return a vertex value instance
-     */
-    private V allocateValue() {
-        V value;
-        if (usedValue < valuePool.size()) {
-            value = valuePool.get(usedValue);
-            usedValue++;
-        } else {
-            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
-            valuePool.add(value);
-            usedValue++;
-        }
-        return value;
-    }
+	/**
+	 * Allocate a vertex value from the object pool
+	 * 
+	 * @return a vertex value instance
+	 */
+	private V allocateValue() {
+		V value;
+		if (usedValue < valuePool.size()) {
+			value = valuePool.get(usedValue);
+			usedValue++;
+		} else {
+			value = BspUtils.<V> createVertexValue(getContext()
+					.getConfiguration());
+			valuePool.add(value);
+			usedValue++;
+		}
+		return value;
+	}
 
-    /**
-     * Get the current global superstep number
-     * 
-     * @return the current superstep number
-     */
-    public static final long getSuperstep() {
-        return superstep;
-    }
+	/**
+	 * Get the current global superstep number
+	 * 
+	 * @return the current superstep number
+	 */
+	public static final long getSuperstep() {
+		return superstep;
+	}
 
-    /**
-     * Set the total number of vertices from the last superstep.
-     * 
-     * @param numVertices
-     *            Aggregate vertices in the last superstep
-     */
-    public static final void setNumVertices(long numVertices) {
-        Vertex.numVertices = numVertices;
-    }
+	/**
+	 * Set the total number of vertices from the last superstep.
+	 * 
+	 * @param numVertices
+	 *            Aggregate vertices in the last superstep
+	 */
+	public static final void setNumVertices(long numVertices) {
+		Vertex.numVertices = numVertices;
+	}
 
-    /**
-     * Get the number of vertexes in the graph
-     * 
-     * @return the number of vertexes in the graph
-     */
-    public static final long getNumVertices() {
-        return numVertices;
-    }
+	/**
+	 * Get the number of vertexes in the graph
+	 * 
+	 * @return the number of vertexes in the graph
+	 */
+	public static final long getNumVertices() {
+		return numVertices;
+	}
 
-    /**
-     * Set the total number of edges from the last superstep.
-     * 
-     * @param numEdges
-     *            Aggregate edges in the last superstep
-     */
-    public static void setNumEdges(long numEdges) {
-        Vertex.numEdges = numEdges;
-    }
+	/**
+	 * Set the total number of edges from the last superstep.
+	 * 
+	 * @param numEdges
+	 *            Aggregate edges in the last superstep
+	 */
+	public static void setNumEdges(long numEdges) {
+		Vertex.numEdges = numEdges;
+	}
 
-    /**
-     * Get the number of edges from this graph
-     * 
-     * @return the number of edges in the graph
-     */
-    public static final long getNumEdges() {
-        return numEdges;
-    }
+	/**
+	 * Get the number of edges from this graph
+	 * 
+	 * @return the number of edges in the graph
+	 */
+	public static final long getNumEdges() {
+		return numEdges;
+	}
 
-    /**
-     * Pregelix internal use only
-     */
-    public static final TaskAttemptContext getContext() {
-        return context;
-    }
+	/**
+	 * Pregelix internal use only
+	 */
+	public static final TaskAttemptContext getContext() {
+		return context;
+	}
 
-    /**
-     * Pregelix internal use only
-     * 
-     * @param context
-     */
-    public static final void setContext(TaskAttemptContext context) {
-        Vertex.context = context;
-    }
+	/**
+	 * Pregelix internal use only
+	 * 
+	 * @param context
+	 */
+	public static final void setContext(TaskAttemptContext context) {
+		Vertex.context = context;
+	}
 
 }
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
index 6e0f90e..b059aad 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR  -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
 PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
   PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 fi