1. update script to add result-distribution parameters; 2. fix rack-aware scheduler for boundary cases
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3163 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
index 6e0f90e..23a4c36 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
@@ -46,4 +46,4 @@
cd $NCTMP_DIR
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
index 03ce4e7..35c4794 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
@@ -5,6 +5,10 @@
PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
if [ "$PID" == "" ]; then
+ PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
USERID=`id | sed 's/^uid=//;s/(.*$//'`
PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
fi
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
index 5371c84..2b9e899 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
+import java.util.logging.Logger;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
@@ -19,6 +20,7 @@
@SuppressWarnings("deprecation")
public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
+ private static final Logger LOGGER = Logger.getLogger(RackAwareNcCollectionBuilder.class.getName());
private ClusterTopology topology;
public RackAwareNcCollectionBuilder(ClusterTopology topology) {
@@ -30,13 +32,18 @@
final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
final int[] workloads, final int slotLimit) {
try {
- final Map<List<Integer>, String> pathToNCs = new HashMap<List<Integer>, String>();
+ final Map<List<Integer>, List<String>> pathToNCs = new HashMap<List<Integer>, List<String>>();
for (int i = 0; i < NCs.length; i++) {
List<Integer> path = new ArrayList<Integer>();
String ipAddress = InetAddress.getByAddress(
ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
topology.lookupNetworkTerminal(ipAddress, path);
- pathToNCs.put(path, NCs[i]);
+ List<String> ncs = pathToNCs.get(path);
+ if (ncs == null) {
+ ncs = new ArrayList<String>();
+ pathToNCs.put(path, ncs);
+ }
+ ncs.add(NCs[i]);
}
final TreeMap<List<Integer>, IntWritable> availableIpsToSlots = new TreeMap<List<Integer>, IntWritable>(
@@ -86,12 +93,14 @@
* get all the IP addresses from the name
*/
InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+ boolean inTopology = false;
for (InetAddress ip : allIps) {
List<Integer> splitPath = new ArrayList<Integer>();
boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
if (!inCluster) {
continue;
}
+ inTopology = true;
/**
* if the node controller exists
*/
@@ -110,6 +119,26 @@
}
}
+
+ if (!inTopology) {
+ LOGGER.info(locs[j] + "'s IP address is not in the cluster toplogy file!");
+ /**
+ * if the machine is not in the topology file
+ */
+ List<Integer> candidatePath = null;
+ for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ candidatePath = entry.getKey();
+ break;
+ }
+ }
+ /** the split path is empty */
+ if (candidatePath != null) {
+ if (availableIpsToSlots.get(candidatePath).get() > 0) {
+ currentCandidatePath = candidatePath;
+ }
+ }
+ }
}
} else {
for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
@@ -120,7 +149,7 @@
}
}
- if (currentCandidatePath.size() > 0) {
+ if (currentCandidatePath != null && currentCandidatePath.size() > 0) {
/**
* Update the entry of the selected IP
*/
@@ -132,10 +161,12 @@
/**
* Update the entry of the selected NC
*/
- String candidateNc = pathToNCs.get(currentCandidatePath);
- int ncIndex = ncNameToIndex.get(candidateNc);
- if (workloads[ncIndex] < slotLimit) {
- return candidateNc;
+ List<String> candidateNcs = pathToNCs.get(currentCandidatePath);
+ for (String candidate : candidateNcs) {
+ int ncIndex = ncNameToIndex.get(candidate);
+ if (workloads[ncIndex] < slotLimit) {
+ return candidate;
+ }
}
}
/** not scheduled */
@@ -154,7 +185,11 @@
int commonLength = Math.min(splitPath.size(), candidatePath.size());
int distance = 0;
for (int i = 0; i < commonLength; i++) {
- distance = distance * 10 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+ distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+ }
+ List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
+ for (int i = commonLength; i < restElements.size(); i++) {
+ distance = distance * 100 + Math.abs(restElements.get(i));
}
return distance;
}