Merge fullstack_asterix_stabilization with fullstack_hyracks_result_distribution.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@3170 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
index 9092655..3eb2316 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
<fork>true</fork>
</configuration>
</plugin>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
index 8b7ecf0..2b05f44 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
@@ -17,8 +17,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
<fork>true</fork>
</configuration>
</plugin>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index a28c698a..bb0601f 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
<fork>true</fork>
</configuration>
</plugin>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java
new file mode 100644
index 0000000..c51c1dd
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.hyracks.hdfs.api;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+@SuppressWarnings("deprecation")
+public interface INcCollection {
+
+ public String findNearestAvailableSlot(InputSplit split);
+
+ public int numAvailableSlots();
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java
new file mode 100644
index 0000000..ef3ff23
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.hyracks.hdfs.api;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+
+/**
+ * NC collections
+ *
+ * @author yingyib
+ */
+public interface INcCollectionBuilder {
+
+ public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+ Map<String, List<String>> ipToNcMapping, Map<String, Integer> ncNameToIndex, String[] NCs, int[] workloads,
+ int slotLimit);
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
index 9cc9ebc..147e872 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
@@ -23,6 +23,7 @@
import java.io.Serializable;
import java.lang.reflect.Constructor;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -38,6 +39,8 @@
splitBytes = splitsToBytes(splits);
if (splits.length > 0) {
splitClassName = splits[0].getClass().getName();
+ } else {
+ splitClassName = FileSplit.class.getName();
}
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
new file mode 100644
index 0000000..320b48b
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
@@ -0,0 +1,121 @@
+package edu.uci.ics.hyracks.hdfs.scheduler;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.hdfs.api.INcCollection;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
+
+@SuppressWarnings("deprecation")
+public class IPProximityNcCollectionBuilder implements INcCollectionBuilder {
+
+ @Override
+ public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+ final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
+ final int[] workloads, final int slotLimit) {
+ final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
+ for (int i = 0; i < workloads.length; i++) {
+ if (workloads[i] < slotLimit) {
+ BytesWritable ip = new BytesWritable(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress());
+ IntWritable availableSlot = availableIpsToSlots.get(ip);
+ if (availableSlot == null) {
+ availableSlot = new IntWritable(slotLimit - workloads[i]);
+ availableIpsToSlots.put(ip, availableSlot);
+ } else {
+ availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
+ }
+ }
+ }
+ return new INcCollection() {
+
+ @Override
+ public String findNearestAvailableSlot(InputSplit split) {
+ try {
+ String[] locs = split.getLocations();
+ int minDistance = Integer.MAX_VALUE;
+ BytesWritable currentCandidateIp = null;
+ if (locs == null || locs.length > 0) {
+ for (int j = 0; j < locs.length; j++) {
+ /**
+ * get all the IP addresses from the name
+ */
+ InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+ for (InetAddress ip : allIps) {
+ BytesWritable splitIp = new BytesWritable(ip.getAddress());
+ /**
+ * if the node controller exists
+ */
+ BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
+ if (candidateNcIp == null) {
+ candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
+ }
+ if (candidateNcIp != null) {
+ if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
+ byte[] candidateIP = candidateNcIp.getBytes();
+ byte[] splitIP = splitIp.getBytes();
+ int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
+ | (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
+ int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
+ | (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
+ int distance = Math.abs(candidateInt - splitInt);
+ if (minDistance > distance) {
+ minDistance = distance;
+ currentCandidateIp = candidateNcIp;
+ }
+ }
+ }
+ }
+ }
+ } else {
+ for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ currentCandidateIp = entry.getKey();
+ break;
+ }
+ }
+ }
+
+ if (currentCandidateIp != null) {
+ /**
+ * Update the entry of the selected IP
+ */
+ IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
+ availableSlot.set(availableSlot.get() - 1);
+ if (availableSlot.get() == 0) {
+ availableIpsToSlots.remove(currentCandidateIp);
+ }
+ /**
+ * Update the entry of the selected NC
+ */
+ List<String> dataLocations = ipToNcMapping.get(InetAddress.getByAddress(
+ currentCandidateIp.getBytes()).getHostAddress());
+ for (String nc : dataLocations) {
+ int ncIndex = ncNameToIndex.get(nc);
+ if (workloads[ncIndex] < slotLimit) {
+ return nc;
+ }
+ }
+ }
+ /** not scheduled */
+ return null;
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public int numAvailableSlots() {
+ return availableIpsToSlots.size();
+ }
+
+ };
+ }
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
new file mode 100644
index 0000000..2b9e899
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
@@ -0,0 +1,202 @@
+package edu.uci.ics.hyracks.hdfs.scheduler;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.hdfs.api.INcCollection;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
+
+@SuppressWarnings("deprecation")
+public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
+ private static final Logger LOGGER = Logger.getLogger(RackAwareNcCollectionBuilder.class.getName());
+ private ClusterTopology topology;
+
+ public RackAwareNcCollectionBuilder(ClusterTopology topology) {
+ this.topology = topology;
+ }
+
+ @Override
+ public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+ final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
+ final int[] workloads, final int slotLimit) {
+ try {
+ final Map<List<Integer>, List<String>> pathToNCs = new HashMap<List<Integer>, List<String>>();
+ for (int i = 0; i < NCs.length; i++) {
+ List<Integer> path = new ArrayList<Integer>();
+ String ipAddress = InetAddress.getByAddress(
+ ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
+ topology.lookupNetworkTerminal(ipAddress, path);
+ List<String> ncs = pathToNCs.get(path);
+ if (ncs == null) {
+ ncs = new ArrayList<String>();
+ pathToNCs.put(path, ncs);
+ }
+ ncs.add(NCs[i]);
+ }
+
+ final TreeMap<List<Integer>, IntWritable> availableIpsToSlots = new TreeMap<List<Integer>, IntWritable>(
+ new Comparator<List<Integer>>() {
+
+ @Override
+ public int compare(List<Integer> l1, List<Integer> l2) {
+ int commonLength = Math.min(l1.size(), l2.size());
+ for (int i = 0; i < commonLength; i++) {
+ Integer value1 = l1.get(i);
+ Integer value2 = l2.get(i);
+ int cmp = value1 > value2 ? 1 : (value1 < value2 ? -1 : 0);
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+ return l1.size() > l2.size() ? 1 : (l1.size() < l2.size() ? -1 : 0);
+ }
+
+ });
+ for (int i = 0; i < workloads.length; i++) {
+ if (workloads[i] < slotLimit) {
+ List<Integer> path = new ArrayList<Integer>();
+ String ipAddress = InetAddress.getByAddress(
+ ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
+ topology.lookupNetworkTerminal(ipAddress, path);
+ IntWritable availableSlot = availableIpsToSlots.get(path);
+ if (availableSlot == null) {
+ availableSlot = new IntWritable(slotLimit - workloads[i]);
+ availableIpsToSlots.put(path, availableSlot);
+ } else {
+ availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
+ }
+ }
+ }
+ return new INcCollection() {
+
+ @Override
+ public String findNearestAvailableSlot(InputSplit split) {
+ try {
+ String[] locs = split.getLocations();
+ int minDistance = Integer.MAX_VALUE;
+ List<Integer> currentCandidatePath = null;
+ if (locs == null || locs.length > 0) {
+ for (int j = 0; j < locs.length; j++) {
+ /**
+ * get all the IP addresses from the name
+ */
+ InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+ boolean inTopology = false;
+ for (InetAddress ip : allIps) {
+ List<Integer> splitPath = new ArrayList<Integer>();
+ boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
+ if (!inCluster) {
+ continue;
+ }
+ inTopology = true;
+ /**
+ * if the node controller exists
+ */
+ List<Integer> candidatePath = availableIpsToSlots.floorKey(splitPath);
+ if (candidatePath == null) {
+ candidatePath = availableIpsToSlots.ceilingKey(splitPath);
+ }
+ if (candidatePath != null) {
+ if (availableIpsToSlots.get(candidatePath).get() > 0) {
+ int distance = distance(splitPath, candidatePath);
+ if (minDistance > distance) {
+ minDistance = distance;
+ currentCandidatePath = candidatePath;
+ }
+ }
+
+ }
+ }
+
+ if (!inTopology) {
+ LOGGER.info(locs[j] + "'s IP address is not in the cluster toplogy file!");
+ /**
+ * if the machine is not in the toplogy file
+ */
+ List<Integer> candidatePath = null;
+ for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ candidatePath = entry.getKey();
+ break;
+ }
+ }
+ /** the split path is empty */
+ if (candidatePath != null) {
+ if (availableIpsToSlots.get(candidatePath).get() > 0) {
+ currentCandidatePath = candidatePath;
+ }
+ }
+ }
+ }
+ } else {
+ for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ currentCandidatePath = entry.getKey();
+ break;
+ }
+ }
+ }
+
+ if (currentCandidatePath != null && currentCandidatePath.size() > 0) {
+ /**
+ * Update the entry of the selected IP
+ */
+ IntWritable availableSlot = availableIpsToSlots.get(currentCandidatePath);
+ availableSlot.set(availableSlot.get() - 1);
+ if (availableSlot.get() == 0) {
+ availableIpsToSlots.remove(currentCandidatePath);
+ }
+ /**
+ * Update the entry of the selected NC
+ */
+ List<String> candidateNcs = pathToNCs.get(currentCandidatePath);
+ for (String candidate : candidateNcs) {
+ int ncIndex = ncNameToIndex.get(candidate);
+ if (workloads[ncIndex] < slotLimit) {
+ return candidate;
+ }
+ }
+ }
+ /** not scheduled */
+ return null;
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public int numAvailableSlots() {
+ return availableIpsToSlots.size();
+ }
+
+ private int distance(List<Integer> splitPath, List<Integer> candidatePath) {
+ int commonLength = Math.min(splitPath.size(), candidatePath.size());
+ int distance = 0;
+ for (int i = 0; i < commonLength; i++) {
+ distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+ }
+ List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
+ for (int i = commonLength; i < restElements.size(); i++) {
+ distance = distance * 100 + Math.abs(restElements.get(i));
+ }
+ return distance;
+ }
+ };
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index 3f287cf..3f7997b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -20,11 +20,15 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.Random;
+import java.util.logging.Logger;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -32,13 +36,17 @@
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.hdfs.api.INcCollection;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
/**
- * The scheduler conduct data-local scheduling for data reading on HDFS.
- * This class works for Hadoop old API.
+ * The scheduler conduct data-local scheduling for data reading on HDFS. This
+ * class works for Hadoop old API.
*/
@SuppressWarnings("deprecation")
public class Scheduler {
+ private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
/** a list of NCs */
private String[] NCs;
@@ -49,6 +57,14 @@
/** a map from the NC name to the index */
private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+ /** a map from NC name to the NodeControllerInfo */
+ private Map<String, NodeControllerInfo> ncNameToNcInfos;
+
+ /**
+ * the nc collection builder
+ */
+ private INcCollectionBuilder ncCollectionBuilder;
+
/**
* The constructor of the scheduler.
*
@@ -58,7 +74,10 @@
public Scheduler(String ipAddress, int port) throws HyracksException {
try {
IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+ this.ncNameToNcInfos = hcc.getNodeControllerInfos();
+ ClusterTopology topology = hcc.getClusterTopology();
+ this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
+ : new RackAwareNcCollectionBuilder(topology);
loadIPAddressToNCMap(ncNameToNcInfos);
} catch (Exception e) {
throw new HyracksException(e);
@@ -68,56 +87,122 @@
/**
* The constructor of the scheduler.
*
- * @param ncNameToNcInfos the mapping from nc names to nc infos
+ * @param ncNameToNcInfos
+ * @throws HyracksException
+ */
+ public Scheduler(String ipAddress, int port, INcCollectionBuilder ncCollectionBuilder) throws HyracksException {
+ try {
+ IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ this.ncNameToNcInfos = hcc.getNodeControllerInfos();
+ this.ncCollectionBuilder = ncCollectionBuilder;
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+ this.ncNameToNcInfos = ncNameToNcInfos;
+ this.ncCollectionBuilder = new IPProximityNcCollectionBuilder();
loadIPAddressToNCMap(ncNameToNcInfos);
}
/**
- * Set location constraints for a file scan operator with a list of file splits.
- * It guarantees the maximum slots a machine can is at most one more than the minimum slots a
- * machine can get.
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @param topology
+ * the hyracks cluster toplogy
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology) throws HyracksException {
+ this(ncNameToNcInfos);
+ this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
+ : new RackAwareNcCollectionBuilder(topology);
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder)
+ throws HyracksException {
+ this.ncNameToNcInfos = ncNameToNcInfos;
+ this.ncCollectionBuilder = ncCollectionBuilder;
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ }
+
+ /**
+ * Set location constraints for a file scan operator with a list of file
+ * splits. It guarantees the maximum slots a machine can is at most one more
+ * than the minimum slots a machine can get.
*
* @throws HyracksDataException
*/
public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
- int[] capacity = new int[NCs.length];
- Arrays.fill(capacity, 0);
+ if (splits == null) {
+ /** deal the case when the splits array is null */
+ return new String[] {};
+ }
+ int[] workloads = new int[NCs.length];
+ Arrays.fill(workloads, 0);
String[] locations = new String[splits.length];
+ Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
/**
* upper bound number of slots that a machine can get
*/
- int upperBoundSlots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
- / capacity.length + 1);
+ int upperBoundSlots = splits.length % workloads.length == 0 ? (splits.length / workloads.length)
+ : (splits.length / workloads.length + 1);
/**
* lower bound number of slots that a machine can get
*/
- int lowerBoundSlots = splits.length % capacity.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
+ int lowerBoundSlots = splits.length % workloads.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
try {
Random random = new Random(System.currentTimeMillis());
boolean scheduled[] = new boolean[splits.length];
Arrays.fill(scheduled, false);
-
+ /**
+ * scan the splits and build the popularity map
+ * give the machines with less local splits more scheduling priority
+ */
+ buildPopularityMap(splits, locationToNumOfSplits);
/**
* push data-local lower-bounds slots to each machine
*/
- scheduleLocalSlots(splits, capacity, locations, lowerBoundSlots, random, scheduled);
+ scheduleLocalSlots(splits, workloads, locations, lowerBoundSlots, random, scheduled, locationToNumOfSplits);
/**
* push data-local upper-bounds slots to each machine
*/
- scheduleLocalSlots(splits, capacity, locations, upperBoundSlots, random, scheduled);
+ scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits);
+ int dataLocalCount = 0;
+ for (int i = 0; i < scheduled.length; i++) {
+ if (scheduled[i] == true) {
+ dataLocalCount++;
+ }
+ }
+ LOGGER.info("Data local rate: "
+ + (scheduled.length == 0 ? 0.0 : ((float) dataLocalCount / (float) (scheduled.length))));
/**
* push non-data-local lower-bounds slots to each machine
*/
- scheduleNoLocalSlots(splits, capacity, locations, lowerBoundSlots, scheduled);
+ scheduleNonLocalSlots(splits, workloads, locations, lowerBoundSlots, scheduled);
/**
* push non-data-local upper-bounds slots to each machine
*/
- scheduleNoLocalSlots(splits, capacity, locations, upperBoundSlots, scheduled);
+ scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled);
return locations;
} catch (IOException e) {
throw new HyracksException(e);
@@ -129,46 +214,38 @@
*
* @param splits
* The HDFS file splits.
- * @param capacity
+ * @param workloads
* The current capacity of each machine.
* @param locations
* The result schedule.
- * @param slots
+ * @param slotLimit
* The maximum slots of each machine.
* @param scheduled
* Indicate which slot is scheduled.
*/
- private void scheduleNoLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots,
- boolean[] scheduled) {
+ private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slotLimit,
+ boolean[] scheduled) throws IOException, UnknownHostException {
/**
- * find the lowest index the current available NCs
+ * build the map from available ips to the number of available slots
*/
- int currentAvailableNC = 0;
- for (int i = 0; i < capacity.length; i++) {
- if (capacity[i] < slots) {
- currentAvailableNC = i;
- break;
- }
+ INcCollection ncCollection = this.ncCollectionBuilder.build(ncNameToNcInfos, ipToNcMapping, ncNameToIndex, NCs,
+ workloads, slotLimit);
+ if (ncCollection.numAvailableSlots() == 0) {
+ return;
}
-
/**
* schedule no-local file reads
*/
for (int i = 0; i < splits.length; i++) {
- // if there is no data-local NC choice, choose a random one
+ /** if there is no data-local NC choice, choose a random one */
if (!scheduled[i]) {
- locations[i] = NCs[currentAvailableNC];
- capacity[currentAvailableNC]++;
- scheduled[i] = true;
-
- /**
- * move the available NC cursor to the next one
- */
- for (int j = currentAvailableNC; j < capacity.length; j++) {
- if (capacity[j] < slots) {
- currentAvailableNC = j;
- break;
- }
+ InputSplit split = splits[i];
+ String selectedNcName = ncCollection.findNearestAvailableSlot(split);
+ if (selectedNcName != null) {
+ int ncIndex = ncNameToIndex.get(selectedNcName);
+ workloads[ncIndex]++;
+ scheduled[i] = true;
+ locations[i] = selectedNcName;
}
}
}
@@ -179,7 +256,7 @@
*
* @param splits
* The HDFS file splits.
- * @param capacity
+ * @param workloads
* The current capacity of each machine.
* @param locations
* The result schedule.
@@ -192,19 +269,37 @@
* @throws IOException
* @throws UnknownHostException
*/
- private void scheduleLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots, Random random,
- boolean[] scheduled) throws IOException, UnknownHostException {
+ private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
+ boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits) throws IOException,
+ UnknownHostException {
+ /** scheduling candidates will be ordered inversely according to their popularity */
+ PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(3, new Comparator<String>() {
+
+ @Override
+ public int compare(String s1, String s2) {
+ return locationToNumSplits.get(s1).compareTo(locationToNumSplits.get(s2));
+ }
+
+ });
for (int i = 0; i < splits.length; i++) {
+ if (scheduled[i]) {
+ continue;
+ }
/**
* get the location of all the splits
*/
- String[] loc = splits[i].getLocations();
- if (loc.length > 0) {
- for (int j = 0; j < loc.length; j++) {
+ String[] locs = splits[i].getLocations();
+ if (locs.length > 0) {
+ scheduleCadndiates.clear();
+ for (int j = 0; j < locs.length; j++) {
+ scheduleCadndiates.add(locs[j]);
+ }
+
+ for (String candidate : scheduleCadndiates) {
/**
* get all the IP addresses from the name
*/
- InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
+ InetAddress[] allIps = InetAddress.getAllByName(candidate);
/**
* iterate overa all ips
*/
@@ -223,16 +318,17 @@
/**
* check if the node is already full
*/
- if (capacity[pos] < slots) {
+ if (workloads[pos] < slots) {
locations[i] = nc;
- capacity[pos]++;
+ workloads[pos]++;
scheduled[i] = true;
+ break;
}
}
}
-
/**
- * break the loop for data-locations if the schedule has already been found
+ * break the loop for data-locations if the schedule has
+ * already been found
*/
if (scheduled[i] == true) {
break;
@@ -243,6 +339,30 @@
}
/**
+ * Scan the splits once and build a popularity map
+ *
+ * @param splits
+ * the split array
+ * @param locationToNumOfSplits
+ * the map to be built
+ * @throws IOException
+ */
+ private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
+ throws IOException {
+ for (InputSplit split : splits) {
+ String[] locations = split.getLocations();
+ for (String loc : locations) {
+ IntWritable locCount = locationToNumOfSplits.get(loc);
+ if (locCount == null) {
+ locCount = new IntWritable(0);
+ locationToNumOfSplits.put(loc, locCount);
+ }
+ locCount.set(locCount.get() + 1);
+ }
+ }
+ }
+
+ /**
* Load the IP-address-to-NC map from the NCNameToNCInfoMap
*
* @param ncNameToNcInfos
@@ -251,6 +371,8 @@
private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
try {
NCs = new String[ncNameToNcInfos.size()];
+ ipToNcMapping.clear();
+ ncNameToIndex.clear();
int i = 0;
/**
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
index cb97ca1..cde187d 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
/**
* The scheduler conduct data-local scheduling for data reading on HDFS.
@@ -53,6 +54,17 @@
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
}
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder) throws HyracksException {
+ scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos, builder);
+ }
/**
* Set location constraints for a file scan operator with a list of file splits
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
index 2686077..affe1347 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
@@ -71,7 +71,6 @@
private static final String HDFS_INPUT_PATH = "/customer/";
private static final String HDFS_OUTPUT_PATH = "/customer_result/";
- private static final String HYRACKS_APP_NAME = "DataflowTest";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
private MiniDFSCluster dfsCluster;
@@ -82,7 +81,6 @@
public void setUp() throws Exception {
cleanupStores();
HyracksUtils.init();
- HyracksUtils.createApp(HYRACKS_APP_NAME);
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
startHDFS();
@@ -164,7 +162,7 @@
IHyracksClientConnection client = new HyracksConnection(HyracksUtils.CC_HOST,
HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
- JobId jobId = client.startJob(HYRACKS_APP_NAME, jobSpec);
+ JobId jobId = client.startJob(jobSpec);
client.waitForCompletion(jobId);
Assert.assertEquals(true, checkResults());
@@ -196,7 +194,6 @@
@Override
public void tearDown() throws Exception {
- HyracksUtils.destroyApp(HYRACKS_APP_NAME);
HyracksUtils.deinit();
cleanupHDFS();
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index eccd5ee..1f394a9 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.hdfs.scheduler;
+import java.io.FileReader;
+import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
@@ -25,13 +27,28 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.client.NodeStatus;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
@SuppressWarnings("deprecation")
public class SchedulerTest extends TestCase {
+ private static String TOPOLOGY_PATH = "src/test/resources/topology.xml";
+
+ private ClusterTopology parseTopology() throws IOException, SAXException {
+ FileReader fr = new FileReader(TOPOLOGY_PATH);
+ InputSource in = new InputSource(fr);
+ try {
+ return TopologyDefinitionParser.parse(in);
+ } finally {
+ fr.close();
+ }
+ }
/**
* Test the scheduler for the case when the Hyracks cluster is the HDFS cluster
@@ -67,11 +84,17 @@
fileSplits[4] = new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc6", "nc2", "nc3", "nc5" };
+
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+ }
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc5", "nc6" };
-
+ ClusterTopology topology = parseTopology();
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);
}
@@ -96,11 +119,11 @@
ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
.getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
.getAddress(), 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+ ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+ .getByName("10.0.0.7").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
.getAddress(), 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+ ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+ .getByName("10.0.0.12").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
.getAddress(), 5098)));
InputSplit[] fileSplits = new InputSplit[12];
@@ -112,17 +135,25 @@
fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[8] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
+ fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.14", "10.0.0.11", "10.0.0.13" });
fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.7" });
- fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+ fileSplits[11] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
- "nc6", "nc5" };
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc12",
+ "nc7", "nc7", "nc12" };
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+ }
+ expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc7", "nc12", "nc7",
+ "nc12" };
+ ClusterTopology topology = parseTopology();
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);
}
@@ -168,12 +199,19 @@
fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc6",
+ "nc5", "nc6" };
+
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
- "nc5", "nc6" };
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+ }
+ ClusterTopology topology = parseTopology();
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);
}
@@ -220,15 +258,84 @@
fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
fileSplits[12] = new FileSplit(new Path("part-13"), 0, 0, new String[] { "10.0.0.2", "10.0.0.4", "10.0.0.5" });
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc1",
+ "nc5", "nc2", "nc4" };
+
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
- "nc5", "nc5", "nc6" };
-
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);
}
+
+ ClusterTopology topology = parseTopology();
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+ }
+
+ }
+
+ /**
+ * Test boundary cases where splits array is empty or null
+ *
+ * @throws Exception
+ */
+ public void testSchedulercBoundary() throws Exception {
+ Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
+ ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+ .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
+ .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+ .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
+ .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+ .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
+ .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+ .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
+ .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+ .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+ .getAddress(), 5098)));
+ ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+ .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+ .getAddress(), 5098)));
+
+ /** test empty file splits */
+ InputSplit[] fileSplits = new InputSplit[0];
+ String[] expectedResults = new String[] {};
+
+ Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+ String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+ }
+
+ ClusterTopology topology = parseTopology();
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+ }
+
+ fileSplits = null;
+ expectedResults = new String[] {};
+
+ scheduler = new Scheduler(ncNameToNcInfos);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+ }
+
+ scheduler = new Scheduler(ncNameToNcInfos, topology);
+ locationConstraints = scheduler.getLocationConstraints(fileSplits);
+ for (int i = 0; i < locationConstraints.length; i++) {
+ Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+ }
+
}
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
index 8c12518..bdff2fd 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
@@ -83,14 +83,6 @@
hcc = new HyracksConnection(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
}
- public static void destroyApp(String hyracksAppName) throws Exception {
- hcc.destroyApplication(hyracksAppName);
- }
-
- public static void createApp(String hyracksAppName) throws Exception {
- hcc.createApplication(hyracksAppName, null);
- }
-
public static void deinit() throws Exception {
nc2.stop();
nc1.stop();
@@ -99,7 +91,7 @@
public static void runJob(JobSpecification spec, String appName) throws Exception {
spec.setFrameSize(FRAME_SIZE);
- JobId jobId = hcc.startJob(appName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
hcc.waitForCompletion(jobId);
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
index 9f77979..c2892e9 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -74,7 +74,6 @@
private static final String HDFS_INPUT_PATH = "/customer/";
private static final String HDFS_OUTPUT_PATH = "/customer_result/";
- private static final String HYRACKS_APP_NAME = "DataflowTest";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
private MiniDFSCluster dfsCluster;
private MiniDFSClusterFactory dfsClusterFactory = new MiniDFSClusterFactory();
@@ -87,7 +86,6 @@
conf = new Job();
cleanupStores();
HyracksUtils.init();
- HyracksUtils.createApp(HYRACKS_APP_NAME);
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
startHDFS();
@@ -171,7 +169,7 @@
IHyracksClientConnection client = new HyracksConnection(HyracksUtils.CC_HOST,
HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
- JobId jobId = client.startJob(HYRACKS_APP_NAME, jobSpec);
+ JobId jobId = client.startJob(jobSpec);
client.waitForCompletion(jobId);
Assert.assertEquals(true, checkResults());
@@ -203,7 +201,6 @@
@Override
public void tearDown() throws Exception {
- HyracksUtils.destroyApp(HYRACKS_APP_NAME);
HyracksUtils.deinit();
cleanupHDFS();
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index df3feb2..442aeae0 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -74,7 +74,7 @@
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc5", "nc6" };
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc6", "nc2", "nc3", "nc5" };
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);
@@ -124,8 +124,8 @@
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
- "nc6", "nc5" };
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc6", "nc1", "nc4", "nc2", "nc2", "nc3", "nc6", "nc5",
+ "nc3", "nc5" };
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);
@@ -175,7 +175,7 @@
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc6",
"nc5", "nc6" };
for (int i = 0; i < locationConstraints.length; i++) {
@@ -227,8 +227,8 @@
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
- "nc5", "nc5", "nc6" };
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc1",
+ "nc5", "nc2", "nc4" };
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml
new file mode 100644
index 0000000..3a0ac7e
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml
@@ -0,0 +1,32 @@
+<cluster-topology>
+ <network-switch name="all">
+ <network-switch name="rack1">
+ <terminal name="10.0.0.1" />
+ <terminal name="10.0.0.5" />
+ <terminal name="10.0.0.9" />
+ <terminal name="10.0.0.13" />
+ <terminal name="10.0.0.17" />
+ </network-switch>
+ <network-switch name="rack2">
+ <terminal name="10.0.0.2" />
+ <terminal name="10.0.0.6" />
+ <terminal name="10.0.0.10" />
+ <terminal name="10.0.0.14" />
+ <terminal name="10.0.0.18" />
+ </network-switch>
+ <network-switch name="rack3">
+ <terminal name="10.0.0.3" />
+ <terminal name="10.0.0.7" />
+ <terminal name="10.0.0.11" />
+ <terminal name="10.0.0.15" />
+ <terminal name="10.0.0.19" />
+ </network-switch>
+ <network-switch name="rack4">
+ <terminal name="10.0.0.4" />
+ <terminal name="10.0.0.8" />
+ <terminal name="10.0.0.12" />
+ <terminal name="10.0.0.16" />
+ <terminal name="10.0.0.20" />
+ </network-switch>
+ </network-switch>
+</cluster-topology>
\ No newline at end of file