1. update script to add result-distribution paramteres; 2. fix rack-aware scheduler for boundary cases

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3163 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
index 6e0f90e..23a4c36 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
@@ -46,4 +46,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
index 03ce4e7..35c4794 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
@@ -5,6 +5,10 @@
 PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
   PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 fi
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
index 5371c84..2b9e899 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
@@ -8,6 +8,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
+import java.util.logging.Logger;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputSplit;
@@ -19,6 +20,7 @@
 
 @SuppressWarnings("deprecation")
 public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
+    private static final Logger LOGGER = Logger.getLogger(RackAwareNcCollectionBuilder.class.getName());
     private ClusterTopology topology;
 
     public RackAwareNcCollectionBuilder(ClusterTopology topology) {
@@ -30,13 +32,18 @@
             final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
             final int[] workloads, final int slotLimit) {
         try {
-            final Map<List<Integer>, String> pathToNCs = new HashMap<List<Integer>, String>();
+            final Map<List<Integer>, List<String>> pathToNCs = new HashMap<List<Integer>, List<String>>();
             for (int i = 0; i < NCs.length; i++) {
                 List<Integer> path = new ArrayList<Integer>();
                 String ipAddress = InetAddress.getByAddress(
                         ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
                 topology.lookupNetworkTerminal(ipAddress, path);
-                pathToNCs.put(path, NCs[i]);
+                List<String> ncs = pathToNCs.get(path);
+                if (ncs == null) {
+                    ncs = new ArrayList<String>();
+                    pathToNCs.put(path, ncs);
+                }
+                ncs.add(NCs[i]);
             }
 
             final TreeMap<List<Integer>, IntWritable> availableIpsToSlots = new TreeMap<List<Integer>, IntWritable>(
@@ -86,12 +93,14 @@
                                  * get all the IP addresses from the name
                                  */
                                 InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+                                boolean inTopology = false;
                                 for (InetAddress ip : allIps) {
                                     List<Integer> splitPath = new ArrayList<Integer>();
                                     boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
                                     if (!inCluster) {
                                         continue;
                                     }
+                                    inTopology = true;
                                     /**
                                      * if the node controller exists
                                      */
@@ -110,6 +119,26 @@
 
                                     }
                                 }
+
+                                if (!inTopology) {
+                                    LOGGER.info(locs[j] + "'s IP address is not in the cluster toplogy file!");
+                                    /**
+                                     * if the machine is not in the toplogy file
+                                     */
+                                    List<Integer> candidatePath = null;
+                                    for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+                                        if (entry.getValue().get() > 0) {
+                                            candidatePath = entry.getKey();
+                                            break;
+                                        }
+                                    }
+                                    /** the split path is empty */
+                                    if (candidatePath != null) {
+                                        if (availableIpsToSlots.get(candidatePath).get() > 0) {
+                                            currentCandidatePath = candidatePath;
+                                        }
+                                    }
+                                }
                             }
                         } else {
                             for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
@@ -120,7 +149,7 @@
                             }
                         }
 
-                        if (currentCandidatePath.size() > 0) {
+                        if (currentCandidatePath != null && currentCandidatePath.size() > 0) {
                             /**
                              * Update the entry of the selected IP
                              */
@@ -132,10 +161,12 @@
                             /**
                              * Update the entry of the selected NC
                              */
-                            String candidateNc = pathToNCs.get(currentCandidatePath);
-                            int ncIndex = ncNameToIndex.get(candidateNc);
-                            if (workloads[ncIndex] < slotLimit) {
-                                return candidateNc;
+                            List<String> candidateNcs = pathToNCs.get(currentCandidatePath);
+                            for (String candidate : candidateNcs) {
+                                int ncIndex = ncNameToIndex.get(candidate);
+                                if (workloads[ncIndex] < slotLimit) {
+                                    return candidate;
+                                }
                             }
                         }
                         /** not scheduled */
@@ -154,7 +185,11 @@
                     int commonLength = Math.min(splitPath.size(), candidatePath.size());
                     int distance = 0;
                     for (int i = 0; i < commonLength; i++) {
-                        distance = distance * 10 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+                        distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+                    }
+                    List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
+                    for (int i = commonLength; i < restElements.size(); i++) {
+                        distance = distance * 100 + Math.abs(restElements.get(i));
                     }
                     return distance;
                 }