1. update scripts to add the result-distribution parameters; 2. fix the rack-aware scheduler for boundary cases
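
Note on item 2: the rack-aware scheduler compares a split's topology path against each candidate NC's path by an accumulated per-level distance. Previously the levels beyond the common prefix of two paths with different depth were ignored, and the base of 10 let one level bleed into the next. The standalone Java sketch below mirrors the adjusted metric; the class and helper names are made up for this note, and only the loop bodies correspond to the comparator inside RackAwareNcCollectionBuilder.

    // Standalone sketch of the adjusted rack-aware path distance
    // (illustrative names; the real computation is inlined in a Comparator
    // inside RackAwareNcCollectionBuilder).
    import java.util.Arrays;
    import java.util.List;

    public class PathDistanceSketch {
        static int pathDistance(List<Integer> splitPath, List<Integer> candidatePath) {
            int commonLength = Math.min(splitPath.size(), candidatePath.size());
            int distance = 0;
            // compare the shared prefix level by level; base 100 keeps one
            // topology level from overflowing into the next
            for (int i = 0; i < commonLength; i++) {
                distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
            }
            // boundary case: paths of unequal depth; the trailing levels of the
            // longer path now also count instead of being silently dropped
            List<Integer> rest = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
            for (int i = commonLength; i < rest.size(); i++) {
                distance = distance * 100 + Math.abs(rest.get(i));
            }
            return distance;
        }

        public static void main(String[] args) {
            System.out.println(pathDistance(Arrays.asList(0, 1), Arrays.asList(0, 1)));    // 0: same switch
            System.out.println(pathDistance(Arrays.asList(0, 1), Arrays.asList(0, 1, 3))); // 3: deeper candidate path
        }
    }

Under the old metric both calls above would return 0, so a candidate on a deeper or mismatched branch looked just as close as a local one.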

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3163 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
index 6e0f90e..23a4c36 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
 PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
   PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 fi
diff --git a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
index 4aac091..4710706 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
+++ b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
@@ -1,12 +1,7 @@
 <cluster-topology>
-    <network-switch name="switch1">
-        <network-switch name="switch2">
-            <terminal name="nc0"/>
-            <terminal name="nc3"/>
-        </network-switch>
-        <network-switch name="switch3">
-            <terminal name="nc1"/>
-            <terminal name="nc4"/>
+    <network-switch name="Global">
+        <network-switch name="local">
+            <terminal name="127.0.0.1"/>
         </network-switch>
     </network-switch>
 </cluster-topology>
\ No newline at end of file
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
index 0d2c78a..5e4e21e 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
@@ -86,7 +87,8 @@
             JobConf conf = ConfUtil.getJobConf(fileDesc.getInputFileFormatClass(), filePath);
             String[] locConstraints = ConfUtil.getNCs();
             Map<String, NodeControllerInfo> ncNameToNcInfos = ConfUtil.getNodeControllerInfo();
-            Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+            ClusterTopology topology = ConfUtil.getClusterTopology();
+            Scheduler scheduler = new Scheduler(ncNameToNcInfos, topology);
             InputSplit[] splits = conf.getInputFormat().getSplits(conf, locConstraints.length);
             String[] schedule = scheduler.getLocationConstraints(splits);
             IOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(jobSpec, recDescriptor, conf, splits,
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
index 6e0f90e..23a4c36 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
@@ -46,4 +46,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
index 03ce4e7..35c4794 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
@@ -5,6 +5,10 @@
 PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
   PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 fi
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
index 5371c84..2b9e899 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
@@ -8,6 +8,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
+import java.util.logging.Logger;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputSplit;
@@ -19,6 +20,7 @@
 
 @SuppressWarnings("deprecation")
 public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
+    private static final Logger LOGGER = Logger.getLogger(RackAwareNcCollectionBuilder.class.getName());
     private ClusterTopology topology;
 
     public RackAwareNcCollectionBuilder(ClusterTopology topology) {
@@ -30,13 +32,18 @@
             final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
             final int[] workloads, final int slotLimit) {
         try {
-            final Map<List<Integer>, String> pathToNCs = new HashMap<List<Integer>, String>();
+            final Map<List<Integer>, List<String>> pathToNCs = new HashMap<List<Integer>, List<String>>();
             for (int i = 0; i < NCs.length; i++) {
                 List<Integer> path = new ArrayList<Integer>();
                 String ipAddress = InetAddress.getByAddress(
                         ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
                 topology.lookupNetworkTerminal(ipAddress, path);
-                pathToNCs.put(path, NCs[i]);
+                List<String> ncs = pathToNCs.get(path);
+                if (ncs == null) {
+                    ncs = new ArrayList<String>();
+                    pathToNCs.put(path, ncs);
+                }
+                ncs.add(NCs[i]);
             }
 
             final TreeMap<List<Integer>, IntWritable> availableIpsToSlots = new TreeMap<List<Integer>, IntWritable>(
@@ -86,12 +93,14 @@
                                  * get all the IP addresses from the name
                                  */
                                 InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+                                boolean inTopology = false;
                                 for (InetAddress ip : allIps) {
                                     List<Integer> splitPath = new ArrayList<Integer>();
                                     boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
                                     if (!inCluster) {
                                         continue;
                                     }
+                                    inTopology = true;
                                     /**
                                      * if the node controller exists
                                      */
@@ -110,6 +119,26 @@
 
                                     }
                                 }
+
+                                if (!inTopology) {
+                                    LOGGER.info(locs[j] + "'s IP address is not in the cluster topology file!");
+                                    /**
+                                     * if the machine is not in the topology file
+                                     */
+                                    List<Integer> candidatePath = null;
+                                    for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+                                        if (entry.getValue().get() > 0) {
+                                            candidatePath = entry.getKey();
+                                            break;
+                                        }
+                                    }
+                                    /** the split path is empty, so pick the first path that still has free slots */
+                                    if (candidatePath != null) {
+                                        if (availableIpsToSlots.get(candidatePath).get() > 0) {
+                                            currentCandidatePath = candidatePath;
+                                        }
+                                    }
+                                }
                             }
                         } else {
                             for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
@@ -120,7 +149,7 @@
                             }
                         }
 
-                        if (currentCandidatePath.size() > 0) {
+                        if (currentCandidatePath != null && currentCandidatePath.size() > 0) {
                             /**
                              * Update the entry of the selected IP
                              */
@@ -132,10 +161,12 @@
                             /**
                              * Update the entry of the selected NC
                              */
-                            String candidateNc = pathToNCs.get(currentCandidatePath);
-                            int ncIndex = ncNameToIndex.get(candidateNc);
-                            if (workloads[ncIndex] < slotLimit) {
-                                return candidateNc;
+                            List<String> candidateNcs = pathToNCs.get(currentCandidatePath);
+                            for (String candidate : candidateNcs) {
+                                int ncIndex = ncNameToIndex.get(candidate);
+                                if (workloads[ncIndex] < slotLimit) {
+                                    return candidate;
+                                }
                             }
                         }
                         /** not scheduled */
@@ -154,7 +185,11 @@
                     int commonLength = Math.min(splitPath.size(), candidatePath.size());
                     int distance = 0;
                     for (int i = 0; i < commonLength; i++) {
-                        distance = distance * 10 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+                        distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+                    }
+                    List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
+                    for (int i = commonLength; i < restElements.size(); i++) {
+                        distance = distance * 100 + Math.abs(restElements.get(i));
                     }
                     return distance;
                 }
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
index 6e0f90e..b059aad 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR  -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
 PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
   PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 fi