1. update script to add result-distribution paramteres; 2. fix rack-aware scheduler for boundary cases
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3163 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
index 6e0f90e..23a4c36 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
cd $NCTMP_DIR
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
if [ "$PID" == "" ]; then
+ PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
USERID=`id | sed 's/^uid=//;s/(.*$//'`
PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
fi
diff --git a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
index 4aac091..4710706 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
+++ b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
@@ -1,12 +1,7 @@
<cluster-topology>
- <network-switch name="switch1">
- <network-switch name="switch2">
- <terminal name="nc0"/>
- <terminal name="nc3"/>
- </network-switch>
- <network-switch name="switch3">
- <terminal name="nc1"/>
- <terminal name="nc4"/>
+ <network-switch name="Global">
+ <network-switch name="local">
+ <terminal name="127.0.0.1"/>
</network-switch>
</network-switch>
</cluster-topology>
\ No newline at end of file
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
index 0d2c78a..5e4e21e 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
@@ -86,7 +87,8 @@
JobConf conf = ConfUtil.getJobConf(fileDesc.getInputFileFormatClass(), filePath);
String[] locConstraints = ConfUtil.getNCs();
Map<String, NodeControllerInfo> ncNameToNcInfos = ConfUtil.getNodeControllerInfo();
- Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+ ClusterTopology topology = ConfUtil.getClusterTopology();
+ Scheduler scheduler = new Scheduler(ncNameToNcInfos, topology);
InputSplit[] splits = conf.getInputFormat().getSplits(conf, locConstraints.length);
String[] schedule = scheduler.getLocationConstraints(splits);
IOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(jobSpec, recDescriptor, conf, splits,
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
index 6e0f90e..23a4c36 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
@@ -46,4 +46,4 @@
cd $NCTMP_DIR
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
index 03ce4e7..35c4794 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
@@ -5,6 +5,10 @@
PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
if [ "$PID" == "" ]; then
+ PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
USERID=`id | sed 's/^uid=//;s/(.*$//'`
PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
fi
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
index 5371c84..2b9e899 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
+import java.util.logging.Logger;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
@@ -19,6 +20,7 @@
@SuppressWarnings("deprecation")
public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
+ private static final Logger LOGGER = Logger.getLogger(RackAwareNcCollectionBuilder.class.getName());
private ClusterTopology topology;
public RackAwareNcCollectionBuilder(ClusterTopology topology) {
@@ -30,13 +32,18 @@
final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
final int[] workloads, final int slotLimit) {
try {
- final Map<List<Integer>, String> pathToNCs = new HashMap<List<Integer>, String>();
+ final Map<List<Integer>, List<String>> pathToNCs = new HashMap<List<Integer>, List<String>>();
for (int i = 0; i < NCs.length; i++) {
List<Integer> path = new ArrayList<Integer>();
String ipAddress = InetAddress.getByAddress(
ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
topology.lookupNetworkTerminal(ipAddress, path);
- pathToNCs.put(path, NCs[i]);
+ List<String> ncs = pathToNCs.get(path);
+ if (ncs == null) {
+ ncs = new ArrayList<String>();
+ pathToNCs.put(path, ncs);
+ }
+ ncs.add(NCs[i]);
}
final TreeMap<List<Integer>, IntWritable> availableIpsToSlots = new TreeMap<List<Integer>, IntWritable>(
@@ -86,12 +93,14 @@
* get all the IP addresses from the name
*/
InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+ boolean inTopology = false;
for (InetAddress ip : allIps) {
List<Integer> splitPath = new ArrayList<Integer>();
boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
if (!inCluster) {
continue;
}
+ inTopology = true;
/**
* if the node controller exists
*/
@@ -110,6 +119,26 @@
}
}
+
+ if (!inTopology) {
+ LOGGER.info(locs[j] + "'s IP address is not in the cluster toplogy file!");
+ /**
+ * if the machine is not in the toplogy file
+ */
+ List<Integer> candidatePath = null;
+ for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ candidatePath = entry.getKey();
+ break;
+ }
+ }
+ /** the split path is empty */
+ if (candidatePath != null) {
+ if (availableIpsToSlots.get(candidatePath).get() > 0) {
+ currentCandidatePath = candidatePath;
+ }
+ }
+ }
}
} else {
for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
@@ -120,7 +149,7 @@
}
}
- if (currentCandidatePath.size() > 0) {
+ if (currentCandidatePath != null && currentCandidatePath.size() > 0) {
/**
* Update the entry of the selected IP
*/
@@ -132,10 +161,12 @@
/**
* Update the entry of the selected NC
*/
- String candidateNc = pathToNCs.get(currentCandidatePath);
- int ncIndex = ncNameToIndex.get(candidateNc);
- if (workloads[ncIndex] < slotLimit) {
- return candidateNc;
+ List<String> candidateNcs = pathToNCs.get(currentCandidatePath);
+ for (String candidate : candidateNcs) {
+ int ncIndex = ncNameToIndex.get(candidate);
+ if (workloads[ncIndex] < slotLimit) {
+ return candidate;
+ }
}
}
/** not scheduled */
@@ -154,7 +185,11 @@
int commonLength = Math.min(splitPath.size(), candidatePath.size());
int distance = 0;
for (int i = 0; i < commonLength; i++) {
- distance = distance * 10 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+ distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+ }
+ List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
+ for (int i = commonLength; i < restElements.size(); i++) {
+ distance = distance * 100 + Math.abs(restElements.get(i));
}
return distance;
}
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
index 6e0f90e..b059aad 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
cd $NCTMP_DIR
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
if [ "$PID" == "" ]; then
+ PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
USERID=`id | sed 's/^uid=//;s/(.*$//'`
PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
fi