considering IP proximity in scheduling when there is no topology information
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3148 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index e357100..aaebc09 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -24,9 +24,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.Random;
+import java.util.TreeMap;
+import java.util.logging.Logger;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
@@ -43,6 +47,8 @@
@SuppressWarnings("deprecation")
public class Scheduler {
+ private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
+
/** a list of NCs */
private String[] NCs;
@@ -52,6 +58,8 @@
/** a map from the NC name to the index */
private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+ private Map<String, NodeControllerInfo> ncNameToNcInfos;
+
/**
* The constructor of the scheduler.
*
@@ -61,7 +69,7 @@
public Scheduler(String ipAddress, int port) throws HyracksException {
try {
IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+ this.ncNameToNcInfos = hcc.getNodeControllerInfos();
loadIPAddressToNCMap(ncNameToNcInfos);
} catch (Exception e) {
throw new HyracksException(e);
@@ -76,6 +84,7 @@
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+ this.ncNameToNcInfos = ncNameToNcInfos;
loadIPAddressToNCMap(ncNameToNcInfos);
}
@@ -87,48 +96,53 @@
* @throws HyracksDataException
*/
public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
- int[] capacity = new int[NCs.length];
- Arrays.fill(capacity, 0);
+ int[] workloads = new int[NCs.length];
+ Arrays.fill(workloads, 0);
String[] locations = new String[splits.length];
Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
/**
* upper bound number of slots that a machine can get
*/
- int upperBoundSlots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
- / capacity.length + 1);
+ int upperBoundSlots = splits.length % workloads.length == 0 ? (splits.length / workloads.length)
+ : (splits.length / workloads.length + 1);
/**
* lower bound number of slots that a machine can get
*/
- int lowerBoundSlots = splits.length % capacity.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
+ int lowerBoundSlots = splits.length % workloads.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
try {
Random random = new Random(System.currentTimeMillis());
boolean scheduled[] = new boolean[splits.length];
Arrays.fill(scheduled, false);
-
/**
* scan the splits and build the popularity map
* give the machines with less local splits more scheduling priority
*/
buildPopularityMap(splits, locationToNumOfSplits);
-
/**
* push data-local lower-bounds slots to each machine
*/
- scheduleLocalSlots(splits, capacity, locations, lowerBoundSlots, random, scheduled, locationToNumOfSplits);
+ scheduleLocalSlots(splits, workloads, locations, lowerBoundSlots, random, scheduled, locationToNumOfSplits);
/**
* push data-local upper-bounds slots to each machine
*/
- scheduleLocalSlots(splits, capacity, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits);
+ scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits);
+ int dataLocalCount = 0;
+ for (int i = 0; i < scheduled.length; i++) {
+ if (scheduled[i] == true) {
+ dataLocalCount++;
+ }
+ }
+ LOGGER.info("Data local rate: " + ((float) dataLocalCount / (float) (scheduled.length)));
/**
* push non-data-local lower-bounds slots to each machine
*/
- scheduleNoLocalSlots(splits, capacity, locations, lowerBoundSlots, scheduled);
+ scheduleNonLocalSlots(splits, workloads, locations, lowerBoundSlots, scheduled);
/**
* push non-data-local upper-bounds slots to each machine
*/
- scheduleNoLocalSlots(splits, capacity, locations, upperBoundSlots, scheduled);
+ scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled);
return locations;
} catch (IOException e) {
throw new HyracksException(e);
@@ -140,7 +154,7 @@
*
* @param splits
* The HDFS file splits.
- * @param capacity
+ * @param workloads
* The current workload of each machine, i.e. the number of splits already assigned to it.
* @param locations
* The result schedule.
@@ -149,36 +163,100 @@
* @param scheduled
* Indicate which slot is scheduled.
*/
- private void scheduleNoLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots,
- boolean[] scheduled) {
+ private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots,
+ boolean[] scheduled) throws IOException, UnknownHostException {
/**
- * find the lowest index the current available NCs
+ * build a sorted map from each IP that still has NCs below the slot limit to its total number of free slots
*/
- int currentAvailableNC = 0;
- for (int i = 0; i < capacity.length; i++) {
- if (capacity[i] < slots) {
- currentAvailableNC = i;
- break;
+ TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
+ for (int i = 0; i < workloads.length; i++) {
+ if (workloads[i] < slots) {
+ BytesWritable ip = new BytesWritable(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress());
+ IntWritable availableSlot = availableIpsToSlots.get(ip);
+ if (availableSlot == null) {
+ availableSlot = new IntWritable(slots - workloads[i]);
+ availableIpsToSlots.put(ip, availableSlot);
+ } else {
+ // several NCs share this IP: accumulate their free slots under one key
+ availableSlot.set(slots - workloads[i] + availableSlot.get());
+ }
}
}
-
+ if (availableIpsToSlots.size() == 0) {
+ return;
+ }
/**
* schedule non-local file reads
*/
for (int i = 0; i < splits.length; i++) {
// the split has no data-local assignment yet: pick the available NC whose IP is closest to one of its locations
if (!scheduled[i]) {
- locations[i] = NCs[currentAvailableNC];
- capacity[currentAvailableNC]++;
- scheduled[i] = true;
+ InputSplit split = splits[i];
+ String[] locs = split.getLocations();
+ // distances are held as long so that the full unsigned 32-bit range fits without overflow
+ long minDistance = Long.MAX_VALUE;
+ BytesWritable currentCandidateIp = null;
+ // a null or empty location list falls through to the else branch instead of being dereferenced
+ if (locs != null && locs.length > 0) {
+ for (int j = 0; j < locs.length; j++) {
+ /**
+ * get all the IP addresses from the name
+ */
+ InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+ for (InetAddress ip : allIps) {
+ BytesWritable splitIp = new BytesWritable(ip.getAddress());
+ /**
+ * take the nearest available NC IP at or below the split IP; only if none exists, the nearest one above
+ */
+ BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
+ if (candidateNcIp == null) {
+ candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
+ }
+ if (candidateNcIp != null) {
+ if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
+ byte[] candidateIP = candidateNcIp.getBytes();
+ byte[] splitIP = splitIp.getBytes();
+ // only the first four bytes are compared, each masked to an unsigned value
+ long candidateInt = (candidateIP[0] & 0xFFL) << 24 | (candidateIP[1] & 0xFFL) << 16
+ | (candidateIP[2] & 0xFFL) << 8 | (candidateIP[3] & 0xFFL);
+ long splitInt = (splitIP[0] & 0xFFL) << 24 | (splitIP[1] & 0xFFL) << 16
+ | (splitIP[2] & 0xFFL) << 8 | (splitIP[3] & 0xFFL);
+ long distance = Math.abs(candidateInt - splitInt);
+ if (minDistance > distance) {
+ minDistance = distance;
+ currentCandidateIp = candidateNcIp;
+ }
+ }
+ }
+ }
+ }
+ } else {
+ // no location hints at all: take the first IP in map order that still has a free slot
+ for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ currentCandidateIp = entry.getKey();
+ break;
+ }
+ }
+ }
- /**
- * move the available NC cursor to the next one
- */
- for (int j = currentAvailableNC; j < capacity.length; j++) {
- if (capacity[j] < slots) {
- currentAvailableNC = j;
- break;
+ if (currentCandidateIp != null) {
+ /**
+ * Update the entry of the selected IP; an IP with no free slot left is dropped from the map
+ */
+ IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
+ availableSlot.set(availableSlot.get() - 1);
+ if (availableSlot.get() == 0) {
+ availableIpsToSlots.remove(currentCandidateIp);
+ }
+ /**
+ * Update the entry of the selected NC: the first NC on that IP that is still below the slot limit
+ */
+ List<String> dataLocations = ipToNcMapping.get(InetAddress.getByAddress(
+ currentCandidateIp.getBytes()).getHostAddress());
+ for (String nc : dataLocations) {
+ int ncIndex = ncNameToIndex.get(nc);
+ if (workloads[ncIndex] < slots) {
+ locations[i] = nc;
+ workloads[ncIndex]++;
+ scheduled[i] = true;
+ break;
+ }
}
}
}
@@ -190,7 +268,7 @@
*
* @param splits
* The HDFS file splits.
- * @param capacity
+ * @param workloads
* The current workload of each machine, i.e. the number of splits already assigned to it.
* @param locations
* The result schedule.
@@ -203,7 +281,7 @@
* @throws IOException
* @throws UnknownHostException
*/
- private void scheduleLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots, Random random,
+ private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits) throws IOException,
UnknownHostException {
/** scheduling candidates will be ordered inversely according to their popularity */
@@ -216,6 +294,9 @@
});
for (int i = 0; i < splits.length; i++) {
+ if (scheduled[i]) {
+ continue;
+ }
/**
* get the location of all the splits
*/
@@ -249,10 +330,11 @@
/**
* check if the node is already full
*/
- if (capacity[pos] < slots) {
+ if (workloads[pos] < slots) {
locations[i] = nc;
- capacity[pos]++;
+ workloads[pos]++;
scheduled[i] = true;
+ break;
}
}
}
@@ -301,6 +383,8 @@
private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
try {
NCs = new String[ncNameToNcInfos.size()];
+ ipToNcMapping.clear();
+ ncNameToIndex.clear();
int i = 0;
/**
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index e42e8a6..f392293 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -96,11 +96,11 @@
ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
.getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
.getAddress(), 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+ ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+ .getByName("10.0.0.7").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
.getAddress(), 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+ ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+ .getByName("10.0.0.12").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
.getAddress(), 5098)));
InputSplit[] fileSplits = new InputSplit[12];
@@ -112,17 +112,16 @@
fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[8] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
+ fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.14", "10.0.0.11", "10.0.0.13" });
fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.7" });
- fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+ fileSplits[11] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc4", "nc6", "nc1", "nc4", "nc2", "nc2", "nc3", "nc6", "nc5",
- "nc3", "nc5" };
-
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc12",
+ "nc7", "nc7", "nc12" };
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);
}
@@ -223,8 +222,8 @@
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc6",
- "nc5", "nc3", "nc5" };
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc1",
+ "nc5", "nc2", "nc4" };
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index b5d9d1a..442aeae 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -227,8 +227,8 @@
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc6",
- "nc5", "nc3", "nc5" };
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc1",
+ "nc5", "nc2", "nc4" };
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);