refactor hyracks-hdfs to unify the code path of new/old APIs and generate more balanced schedules
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_release_cleanup@3088 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index e7309d4..3f287cf 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -49,7 +50,7 @@
private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
/**
- * The constructor of the scheduler
+ * The constructor of the scheduler.
*
* @param ncNameToNcInfos
* @throws HyracksException
@@ -64,12 +65,20 @@
}
}
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos the mapping from nc names to nc infos
+ * @throws HyracksException
+ */
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
loadIPAddressToNCMap(ncNameToNcInfos);
}
/**
- * Set location constraints for a file scan operator with a list of file splits
+ * Set location constraints for a file scan operator with a list of file splits.
+ * It guarantees the maximum slots a machine can is at most one more than the minimum slots a
+ * machine can get.
*
* @throws HyracksDataException
*/
@@ -77,93 +86,38 @@
int[] capacity = new int[NCs.length];
Arrays.fill(capacity, 0);
String[] locations = new String[splits.length];
- int slots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
+ /**
+ * upper bound number of slots that a machine can get
+ */
+ int upperBoundSlots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
/ capacity.length + 1);
+ /**
+ * lower bound number of slots that a machine can get
+ */
+ int lowerBoundSlots = splits.length % capacity.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
try {
Random random = new Random(System.currentTimeMillis());
boolean scheduled[] = new boolean[splits.length];
Arrays.fill(scheduled, false);
- for (int i = 0; i < splits.length; i++) {
- /**
- * get the location of all the splits
- */
- String[] loc = splits[i].getLocations();
- if (loc.length > 0) {
- for (int j = 0; j < loc.length; j++) {
- /**
- * get all the IP addresses from the name
- */
- InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
- /**
- * iterate overa all ips
- */
- for (InetAddress ip : allIps) {
- /**
- * if the node controller exists
- */
- if (ipToNcMapping.get(ip.getHostAddress()) != null) {
- /**
- * set the ncs
- */
- List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
- int arrayPos = random.nextInt(dataLocations.size());
- String nc = dataLocations.get(arrayPos);
- int pos = ncNameToIndex.get(nc);
- /**
- * check if the node is already full
- */
- if (capacity[pos] < slots) {
- locations[i] = nc;
- capacity[pos]++;
- scheduled[i] = true;
- }
- }
- }
-
- /**
- * break the loop for data-locations if the schedule has already been found
- */
- if (scheduled[i] == true) {
- break;
- }
- }
- }
- }
+ /**
+ * push data-local lower-bounds slots to each machine
+ */
+ scheduleLocalSlots(splits, capacity, locations, lowerBoundSlots, random, scheduled);
+ /**
+ * push data-local upper-bounds slots to each machine
+ */
+ scheduleLocalSlots(splits, capacity, locations, upperBoundSlots, random, scheduled);
/**
- * find the lowest index the current available NCs
+ * push non-data-local lower-bounds slots to each machine
*/
- int currentAvailableNC = 0;
- for (int i = 0; i < capacity.length; i++) {
- if (capacity[i] < slots) {
- currentAvailableNC = i;
- break;
- }
- }
-
+ scheduleNoLocalSlots(splits, capacity, locations, lowerBoundSlots, scheduled);
/**
- * schedule no-local file reads
+ * push non-data-local upper-bounds slots to each machine
*/
- for (int i = 0; i < splits.length; i++) {
- // if there is no data-local NC choice, choose a random one
- if (!scheduled[i]) {
- locations[i] = NCs[currentAvailableNC];
- capacity[currentAvailableNC]++;
- scheduled[i] = true;
-
- /**
- * move the available NC cursor to the next one
- */
- for (int j = currentAvailableNC; j < capacity.length; j++) {
- if (capacity[j] < slots) {
- currentAvailableNC = j;
- break;
- }
- }
- }
- }
+ scheduleNoLocalSlots(splits, capacity, locations, upperBoundSlots, scheduled);
return locations;
} catch (IOException e) {
throw new HyracksException(e);
@@ -171,6 +125,124 @@
}
/**
+ * Schedule non-local slots to each machine
+ *
+ * @param splits
+ * The HDFS file splits.
+ * @param capacity
+ * The current capacity of each machine.
+ * @param locations
+ * The result schedule.
+ * @param slots
+ * The maximum slots of each machine.
+ * @param scheduled
+ * Indicate which slot is scheduled.
+ */
+ private void scheduleNoLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots,
+ boolean[] scheduled) {
+ /**
+ * find the lowest index the current available NCs
+ */
+ int currentAvailableNC = 0;
+ for (int i = 0; i < capacity.length; i++) {
+ if (capacity[i] < slots) {
+ currentAvailableNC = i;
+ break;
+ }
+ }
+
+ /**
+ * schedule no-local file reads
+ */
+ for (int i = 0; i < splits.length; i++) {
+ // if there is no data-local NC choice, choose a random one
+ if (!scheduled[i]) {
+ locations[i] = NCs[currentAvailableNC];
+ capacity[currentAvailableNC]++;
+ scheduled[i] = true;
+
+ /**
+ * move the available NC cursor to the next one
+ */
+ for (int j = currentAvailableNC; j < capacity.length; j++) {
+ if (capacity[j] < slots) {
+ currentAvailableNC = j;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Schedule data-local slots to each machine.
+ *
+ * @param splits
+ * The HDFS file splits.
+ * @param capacity
+ * The current capacity of each machine.
+ * @param locations
+ * The result schedule.
+ * @param slots
+ * The maximum slots of each machine.
+ * @param random
+ * The random generator.
+ * @param scheduled
+ * Indicate which slot is scheduled.
+ * @throws IOException
+ * @throws UnknownHostException
+ */
+ private void scheduleLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots, Random random,
+ boolean[] scheduled) throws IOException, UnknownHostException {
+ for (int i = 0; i < splits.length; i++) {
+ /**
+ * get the location of all the splits
+ */
+ String[] loc = splits[i].getLocations();
+ if (loc.length > 0) {
+ for (int j = 0; j < loc.length; j++) {
+ /**
+ * get all the IP addresses from the name
+ */
+ InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
+ /**
+ * iterate overa all ips
+ */
+ for (InetAddress ip : allIps) {
+ /**
+ * if the node controller exists
+ */
+ if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+ /**
+ * set the ncs
+ */
+ List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
+ int arrayPos = random.nextInt(dataLocations.size());
+ String nc = dataLocations.get(arrayPos);
+ int pos = ncNameToIndex.get(nc);
+ /**
+ * check if the node is already full
+ */
+ if (capacity[pos] < slots) {
+ locations[i] = nc;
+ capacity[pos]++;
+ scheduled[i] = true;
+ }
+ }
+ }
+
+ /**
+ * break the loop for data-locations if the schedule has already been found
+ */
+ if (scheduled[i] == true) {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
* Load the IP-address-to-NC map from the NCNameToNCInfoMap
*
* @param ncNameToNcInfos
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
index 3445d68..cb97ca1 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -15,18 +15,11 @@
package edu.uci.ics.hyracks.hdfs2.scheduler;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import org.apache.hadoop.mapreduce.InputSplit;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -35,16 +28,10 @@
* The scheduler conduct data-local scheduling for data reading on HDFS.
* This class works for Hadoop new API.
*/
+@SuppressWarnings("deprecation")
public class Scheduler {
- /** a list of NCs */
- private String[] NCs;
-
- /** a map from ip to NCs */
- private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
-
- /** a map from the NC name to the index */
- private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+ private edu.uci.ics.hyracks.hdfs.scheduler.Scheduler scheduler;
/**
* The constructor of the scheduler
@@ -53,17 +40,18 @@
* @throws HyracksException
*/
public Scheduler(String ipAddress, int port) throws HyracksException {
- try {
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
- loadIPAddressToNCMap(ncNameToNcInfos);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
+ scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ipAddress, port);
}
+ /**
+ * The constructor of the scheduler.
+ *
+ * @param ncNameToNcInfos
+ * the mapping from nc names to nc infos
+ * @throws HyracksException
+ */
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
- loadIPAddressToNCMap(ncNameToNcInfos);
+ scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
}
/**
@@ -72,135 +60,11 @@
* @throws HyracksDataException
*/
public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
- int[] capacity = new int[NCs.length];
- Arrays.fill(capacity, 0);
- String[] locations = new String[splits.size()];
- int slots = splits.size() % capacity.length == 0 ? (splits.size() / capacity.length) : (splits.size()
- / capacity.length + 1);
-
try {
- Random random = new Random(System.currentTimeMillis());
- boolean scheduled[] = new boolean[splits.size()];
- Arrays.fill(scheduled, false);
-
- for (int i = 0; i < splits.size(); i++) {
- /**
- * get the location of all the splits
- */
- String[] loc = splits.get(i).getLocations();
- if (loc.length > 0) {
- for (int j = 0; j < loc.length; j++) {
- /**
- * get all the IP addresses from the name
- */
- InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
- /**
- * iterate overa all ips
- */
- for (InetAddress ip : allIps) {
- /**
- * if the node controller exists
- */
- if (ipToNcMapping.get(ip.getHostAddress()) != null) {
- /**
- * set the ncs
- */
- List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
- int arrayPos = random.nextInt(dataLocations.size());
- String nc = dataLocations.get(arrayPos);
- int pos = ncNameToIndex.get(nc);
- /**
- * check if the node is already full
- */
- if (capacity[pos] < slots) {
- locations[i] = nc;
- capacity[pos]++;
- scheduled[i] = true;
- }
- }
- }
-
- /**
- * break the loop for data-locations if the schedule has already been found
- */
- if (scheduled[i] == true) {
- break;
- }
- }
- }
- }
-
- /**
- * find the lowest index the current available NCs
- */
- int currentAvailableNC = 0;
- for (int i = 0; i < capacity.length; i++) {
- if (capacity[i] < slots) {
- currentAvailableNC = i;
- break;
- }
- }
-
- /**
- * schedule no-local file reads
- */
- for (int i = 0; i < splits.size(); i++) {
- // if there is no data-local NC choice, choose a random one
- if (!scheduled[i]) {
- locations[i] = NCs[currentAvailableNC];
- capacity[currentAvailableNC]++;
- scheduled[i] = true;
-
- /**
- * move the available NC cursor to the next one
- */
- for (int j = currentAvailableNC; j < capacity.length; j++) {
- if (capacity[j] < slots) {
- currentAvailableNC = j;
- break;
- }
- }
- }
- }
- return locations;
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- /**
- * Load the IP-address-to-NC map from the NCNameToNCInfoMap
- *
- * @param ncNameToNcInfos
- * @throws HyracksException
- */
- private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
- try {
- NCs = new String[ncNameToNcInfos.size()];
- int i = 0;
-
- /**
- * build the IP address to NC map
- */
- for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
- String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
- .getHostAddress();
- List<String> matchedNCs = ipToNcMapping.get(ipAddr);
- if (matchedNCs == null) {
- matchedNCs = new ArrayList<String>();
- ipToNcMapping.put(ipAddr, matchedNCs);
- }
- matchedNCs.add(entry.getKey());
- NCs[i] = entry.getKey();
- i++;
- }
-
- /**
- * set up the NC name to index mapping
- */
- for (i = 0; i < NCs.length; i++) {
- ncNameToIndex.put(NCs[i], i);
- }
+ org.apache.hadoop.mapred.InputSplit[] inputSplits = new org.apache.hadoop.mapred.InputSplit[splits.size()];
+ for (int i = 0; i < inputSplits.length; i++)
+ inputSplits[i] = new WrappedFileSplit(splits.get(i).getLocations(), splits.get(i).getLength());
+ return scheduler.getLocationConstraints(inputSplits);
} catch (Exception e) {
throw new HyracksException(e);
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java
new file mode 100644
index 0000000..1deb469
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java
@@ -0,0 +1,51 @@
+package edu.uci.ics.hyracks.hdfs2.scheduler;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * The wrapped implementation of InputSplit, for the new API scheduler
+ * to reuse the old API scheduler
+ */
+@SuppressWarnings("deprecation")
+public class WrappedFileSplit implements InputSplit {
+
+ private String[] locations;
+ private long length;
+
+ public WrappedFileSplit(String[] locations, long length) {
+ this.locations = locations;
+ this.length = length;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ int len = input.readInt();
+ locations = new String[len];
+ for (int i = 0; i < len; i++)
+ locations[i] = input.readUTF();
+ length = input.readLong();
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.write(locations.length);
+ for (int i = 0; i < locations.length; i++)
+ output.writeUTF(locations[i]);
+ output.writeLong(length);
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return length;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return locations;
+ }
+
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index 4b8a278..c6b0416 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -199,8 +199,8 @@
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc3", "nc4", "nc2",
- "nc4", "nc5", "nc5" };
+ String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+ "nc5", "nc5", "nc6" };
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index ea2af13..4b1e11c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -34,7 +34,6 @@
/**
* Test case for the new HDFS API scheduler
- *
*/
public class SchedulerTest extends TestCase {
@@ -204,8 +203,8 @@
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc3", "nc4", "nc2",
- "nc4", "nc5", "nc5" };
+ String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+ "nc5", "nc5", "nc6" };
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);