refactoring hdfs-scheduler to support different policies
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3154 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java
new file mode 100644
index 0000000..c51c1dd
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.hyracks.hdfs.api;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+@SuppressWarnings("deprecation")
+public interface INcCollection {
+
+ public String findNearestAvailableSlot(InputSplit split);
+
+ public int numAvailableSlots();
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java
new file mode 100644
index 0000000..ef3ff23
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.hyracks.hdfs.api;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+
+/**
+ * NC collections
+ *
+ * @author yingyib
+ */
+public interface INcCollectionBuilder {
+
+ public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+ Map<String, List<String>> ipToNcMapping, Map<String, Integer> ncNameToIndex, String[] NCs, int[] workloads,
+ int slotLimit);
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
new file mode 100644
index 0000000..320b48b
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
@@ -0,0 +1,121 @@
+package edu.uci.ics.hyracks.hdfs.scheduler;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.hdfs.api.INcCollection;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
+
+@SuppressWarnings("deprecation")
+public class IPProximityNcCollectionBuilder implements INcCollectionBuilder {
+
+ @Override
+ public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+ final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
+ final int[] workloads, final int slotLimit) {
+ final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
+ for (int i = 0; i < workloads.length; i++) {
+ if (workloads[i] < slotLimit) {
+ BytesWritable ip = new BytesWritable(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress());
+ IntWritable availableSlot = availableIpsToSlots.get(ip);
+ if (availableSlot == null) {
+ availableSlot = new IntWritable(slotLimit - workloads[i]);
+ availableIpsToSlots.put(ip, availableSlot);
+ } else {
+ availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
+ }
+ }
+ }
+ return new INcCollection() {
+
+ @Override
+ public String findNearestAvailableSlot(InputSplit split) {
+ try {
+ String[] locs = split.getLocations();
+ int minDistance = Integer.MAX_VALUE;
+ BytesWritable currentCandidateIp = null;
+ if (locs == null || locs.length > 0) {
+ for (int j = 0; j < locs.length; j++) {
+ /**
+ * get all the IP addresses from the name
+ */
+ InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+ for (InetAddress ip : allIps) {
+ BytesWritable splitIp = new BytesWritable(ip.getAddress());
+ /**
+ * if the node controller exists
+ */
+ BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
+ if (candidateNcIp == null) {
+ candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
+ }
+ if (candidateNcIp != null) {
+ if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
+ byte[] candidateIP = candidateNcIp.getBytes();
+ byte[] splitIP = splitIp.getBytes();
+ int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
+ | (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
+ int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
+ | (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
+ int distance = Math.abs(candidateInt - splitInt);
+ if (minDistance > distance) {
+ minDistance = distance;
+ currentCandidateIp = candidateNcIp;
+ }
+ }
+ }
+ }
+ }
+ } else {
+ for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ currentCandidateIp = entry.getKey();
+ break;
+ }
+ }
+ }
+
+ if (currentCandidateIp != null) {
+ /**
+ * Update the entry of the selected IP
+ */
+ IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
+ availableSlot.set(availableSlot.get() - 1);
+ if (availableSlot.get() == 0) {
+ availableIpsToSlots.remove(currentCandidateIp);
+ }
+ /**
+ * Update the entry of the selected NC
+ */
+ List<String> dataLocations = ipToNcMapping.get(InetAddress.getByAddress(
+ currentCandidateIp.getBytes()).getHostAddress());
+ for (String nc : dataLocations) {
+ int ncIndex = ncNameToIndex.get(nc);
+ if (workloads[ncIndex] < slotLimit) {
+ return nc;
+ }
+ }
+ }
+ /** not scheduled */
+ return null;
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public int numAvailableSlots() {
+ return availableIpsToSlots.size();
+ }
+
+ };
+ }
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index aaebc09..a6320e7 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -24,13 +24,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.Random;
-import java.util.TreeMap;
import java.util.logging.Logger;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
@@ -39,6 +36,9 @@
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.hdfs.api.INcCollection;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
/**
* The scheduler conduct data-local scheduling for data reading on HDFS. This
@@ -46,7 +46,6 @@
*/
@SuppressWarnings("deprecation")
public class Scheduler {
-
private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
/** a list of NCs */
@@ -58,18 +57,38 @@
/** a map from the NC name to the index */
private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+ /** a map from NC name to the NodeControllerInfo */
private Map<String, NodeControllerInfo> ncNameToNcInfos;
/**
+ * the nc collection builder
+ */
+ private INcCollectionBuilder ncCollectionBuilder;
+
+ private ClusterTopology topology;
+
+ /**
* The constructor of the scheduler.
*
* @param ncNameToNcInfos
* @throws HyracksException
*/
public Scheduler(String ipAddress, int port) throws HyracksException {
+ this(ipAddress, port, new IPProximityNcCollectionBuilder());
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * @throws HyracksException
+ */
+ public Scheduler(String ipAddress, int port, INcCollectionBuilder ncCollectionBuilder) throws HyracksException {
try {
IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
this.ncNameToNcInfos = hcc.getNodeControllerInfos();
+ this.topology = hcc.getClusterTopology();
+ this.ncCollectionBuilder = ncCollectionBuilder;
loadIPAddressToNCMap(ncNameToNcInfos);
} catch (Exception e) {
throw new HyracksException(e);
@@ -81,10 +100,40 @@
*
* @param ncNameToNcInfos
* the mapping from nc names to nc infos
+ * @param topology
+ * the hyracks cluster toplogy
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder,
+ ClusterTopology topology) throws HyracksException {
+ this(ncNameToNcInfos, ncCollectionBuilder);
+ this.topology = topology;
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
this.ncNameToNcInfos = ncNameToNcInfos;
+ this.ncCollectionBuilder = new IPProximityNcCollectionBuilder();
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ }
+
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @throws HyracksException
+ */
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder)
+ throws HyracksException {
+ this.ncNameToNcInfos = ncNameToNcInfos;
+ this.ncCollectionBuilder = ncCollectionBuilder;
loadIPAddressToNCMap(ncNameToNcInfos);
}
@@ -158,106 +207,34 @@
* The current capacity of each machine.
* @param locations
* The result schedule.
- * @param slots
+ * @param slotLimit
* The maximum slots of each machine.
* @param scheduled
* Indicate which slot is scheduled.
*/
- private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots,
+ private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slotLimit,
boolean[] scheduled) throws IOException, UnknownHostException {
/**
* build the map from available ips to the number of available slots
*/
- TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
- for (int i = 0; i < workloads.length; i++) {
- if (workloads[i] < slots) {
- BytesWritable ip = new BytesWritable(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress());
- IntWritable availableSlot = availableIpsToSlots.get(ip);
- if (availableSlot == null) {
- availableSlot = new IntWritable(slots - workloads[i]);
- availableIpsToSlots.put(ip, availableSlot);
- } else {
- availableSlot.set(slots - workloads[i] + availableSlot.get());
- }
- }
- }
- if (availableIpsToSlots.size() == 0) {
+ INcCollection ncCollection = this.ncCollectionBuilder.build(ncNameToNcInfos, ipToNcMapping, ncNameToIndex, NCs,
+ workloads, slotLimit);
+ if (ncCollection.numAvailableSlots() == 0) {
return;
}
/**
* schedule no-local file reads
*/
for (int i = 0; i < splits.length; i++) {
- // if there is no data-local NC choice, choose a random one
+ /** if there is no data-local NC choice, choose a random one */
if (!scheduled[i]) {
InputSplit split = splits[i];
- String[] locs = split.getLocations();
- int minDistance = Integer.MAX_VALUE;
- BytesWritable currentCandidateIp = null;
- if (locs == null || locs.length > 0) {
- for (int j = 0; j < locs.length; j++) {
- /**
- * get all the IP addresses from the name
- */
- InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
- for (InetAddress ip : allIps) {
- BytesWritable splitIp = new BytesWritable(ip.getAddress());
- /**
- * if the node controller exists
- */
- BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
- if (candidateNcIp == null) {
- candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
- }
- if (candidateNcIp != null) {
- if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
- byte[] candidateIP = candidateNcIp.getBytes();
- byte[] splitIP = splitIp.getBytes();
- int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
- | (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
- int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
- | (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
- int distance = Math.abs(candidateInt - splitInt);
- if (minDistance > distance) {
- minDistance = distance;
- currentCandidateIp = candidateNcIp;
- }
- }
- }
- }
- }
- } else {
- for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
- if (entry.getValue().get() > 0) {
- currentCandidateIp = entry.getKey();
- break;
- }
- }
- }
-
- if (currentCandidateIp != null) {
- /**
- * Update the entry of the selected IP
- */
- IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
- availableSlot.set(availableSlot.get() - 1);
- if (availableSlot.get() == 0) {
- availableIpsToSlots.remove(currentCandidateIp);
- }
- /**
- * Update the entry of the selected NC
- */
- List<String> dataLocations = ipToNcMapping.get(InetAddress.getByAddress(
- currentCandidateIp.getBytes()).getHostAddress());
- for (String nc : dataLocations) {
- int ncIndex = ncNameToIndex.get(nc);
- if (workloads[ncIndex] < slots) {
- locations[i] = nc;
- workloads[ncIndex]++;
- scheduled[i] = true;
- break;
- }
- }
+ String selectedNcName = ncCollection.findNearestAvailableSlot(split);
+ if (selectedNcName != null) {
+ int ncIndex = ncNameToIndex.get(selectedNcName);
+ workloads[ncIndex]++;
+ scheduled[i] = true;
+ locations[i] = selectedNcName;
}
}
}