Consider IP proximity in scheduling when there is no topology information

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3148 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index e357100..aaebc09 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -24,9 +24,13 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.PriorityQueue;
 import java.util.Random;
+import java.util.TreeMap;
+import java.util.logging.Logger;
 
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputSplit;
 
@@ -43,6 +47,8 @@
 @SuppressWarnings("deprecation")
 public class Scheduler {
 
+    private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
+
     /** a list of NCs */
     private String[] NCs;
 
@@ -52,6 +58,8 @@
     /** a map from the NC name to the index */
     private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
 
+    private Map<String, NodeControllerInfo> ncNameToNcInfos;
+
     /**
      * The constructor of the scheduler.
      * 
@@ -61,7 +69,7 @@
     public Scheduler(String ipAddress, int port) throws HyracksException {
         try {
             IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+            this.ncNameToNcInfos = hcc.getNodeControllerInfos();
             loadIPAddressToNCMap(ncNameToNcInfos);
         } catch (Exception e) {
             throw new HyracksException(e);
@@ -76,6 +84,7 @@
      * @throws HyracksException
      */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+        this.ncNameToNcInfos = ncNameToNcInfos;
         loadIPAddressToNCMap(ncNameToNcInfos);
     }
 
@@ -87,48 +96,53 @@
      * @throws HyracksDataException
      */
     public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
-        int[] capacity = new int[NCs.length];
-        Arrays.fill(capacity, 0);
+        int[] workloads = new int[NCs.length];
+        Arrays.fill(workloads, 0);
         String[] locations = new String[splits.length];
         Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
         /**
          * upper bound number of slots that a machine can get
          */
-        int upperBoundSlots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
-                / capacity.length + 1);
+        int upperBoundSlots = splits.length % workloads.length == 0 ? (splits.length / workloads.length)
+                : (splits.length / workloads.length + 1);
         /**
          * lower bound number of slots that a machine can get
          */
-        int lowerBoundSlots = splits.length % capacity.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
+        int lowerBoundSlots = splits.length % workloads.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
 
         try {
             Random random = new Random(System.currentTimeMillis());
             boolean scheduled[] = new boolean[splits.length];
             Arrays.fill(scheduled, false);
-
             /**
              * scan the splits and build the popularity map
              * give the machines with less local splits more scheduling priority
              */
             buildPopularityMap(splits, locationToNumOfSplits);
-
             /**
              * push data-local lower-bounds slots to each machine
              */
-            scheduleLocalSlots(splits, capacity, locations, lowerBoundSlots, random, scheduled, locationToNumOfSplits);
+            scheduleLocalSlots(splits, workloads, locations, lowerBoundSlots, random, scheduled, locationToNumOfSplits);
             /**
              * push data-local upper-bounds slots to each machine
              */
-            scheduleLocalSlots(splits, capacity, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits);
+            scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits);
 
+            int dataLocalCount = 0;
+            for (int i = 0; i < scheduled.length; i++) {
+                if (scheduled[i] == true) {
+                    dataLocalCount++;
+                }
+            }
+            LOGGER.info("Data local rate: " + ((float) dataLocalCount / (float) (scheduled.length)));
             /**
              * push non-data-local lower-bounds slots to each machine
              */
-            scheduleNoLocalSlots(splits, capacity, locations, lowerBoundSlots, scheduled);
+            scheduleNonLocalSlots(splits, workloads, locations, lowerBoundSlots, scheduled);
             /**
              * push non-data-local upper-bounds slots to each machine
              */
-            scheduleNoLocalSlots(splits, capacity, locations, upperBoundSlots, scheduled);
+            scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled);
             return locations;
         } catch (IOException e) {
             throw new HyracksException(e);
@@ -140,7 +154,7 @@
      * 
      * @param splits
      *            The HDFS file splits.
-     * @param capacity
+     * @param workloads
      *            The current capacity of each machine.
      * @param locations
      *            The result schedule.
@@ -149,36 +163,100 @@
      * @param scheduled
      *            Indicate which slot is scheduled.
      */
-    private void scheduleNoLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots,
-            boolean[] scheduled) {
+    private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots,
+            boolean[] scheduled) throws IOException, UnknownHostException {
         /**
-         * find the lowest index the current available NCs
+         * build the map from each available NC IP address to its total number of free slots
          */
-        int currentAvailableNC = 0;
-        for (int i = 0; i < capacity.length; i++) {
-            if (capacity[i] < slots) {
-                currentAvailableNC = i;
-                break;
+        TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
+        for (int i = 0; i < workloads.length; i++) {
+            if (workloads[i] < slots) {
+                BytesWritable ip = new BytesWritable(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress());
+                IntWritable availableSlot = availableIpsToSlots.get(ip);
+                if (availableSlot == null) {
+                    availableSlot = new IntWritable(slots - workloads[i]);
+                    availableIpsToSlots.put(ip, availableSlot);
+                } else {
+                    availableSlot.set(slots - workloads[i] + availableSlot.get());
+                }
             }
         }
-
+        if (availableIpsToSlots.size() == 0) {
+            return;
+        }
         /**
          * schedule no-local file reads
          */
         for (int i = 0; i < splits.length; i++) {
             // if there is no data-local NC choice, choose a random one
             if (!scheduled[i]) {
-                locations[i] = NCs[currentAvailableNC];
-                capacity[currentAvailableNC]++;
-                scheduled[i] = true;
+                InputSplit split = splits[i];
+                String[] locs = split.getLocations();
+                int minDistance = Integer.MAX_VALUE;
+                BytesWritable currentCandidateIp = null;
+                if (locs == null || locs.length > 0) {
+                    for (int j = 0; j < locs.length; j++) {
+                        /**
+                         * get all the IP addresses from the name
+                         */
+                        InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+                        for (InetAddress ip : allIps) {
+                            BytesWritable splitIp = new BytesWritable(ip.getAddress());
+                            /**
+                             * find a nearby available NC IP: the floor key, or the ceiling key if no floor exists
+                             */
+                            BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
+                            if (candidateNcIp == null) {
+                                candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
+                            }
+                            if (candidateNcIp != null) {
+                                if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
+                                    byte[] candidateIP = candidateNcIp.getBytes();
+                                    byte[] splitIP = splitIp.getBytes();
+                                    int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
+                                            | (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
+                                    int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
+                                            | (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
+                                    int distance = Math.abs(candidateInt - splitInt);
+                                    if (minDistance > distance) {
+                                        minDistance = distance;
+                                        currentCandidateIp = candidateNcIp;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                } else {
+                    for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
+                        if (entry.getValue().get() > 0) {
+                            currentCandidateIp = entry.getKey();
+                            break;
+                        }
+                    }
+                }
 
-                /**
-                 * move the available NC cursor to the next one
-                 */
-                for (int j = currentAvailableNC; j < capacity.length; j++) {
-                    if (capacity[j] < slots) {
-                        currentAvailableNC = j;
-                        break;
+                if (currentCandidateIp != null) {
+                    /**
+                     * Update the entry of the selected IP
+                     */
+                    IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
+                    availableSlot.set(availableSlot.get() - 1);
+                    if (availableSlot.get() == 0) {
+                        availableIpsToSlots.remove(currentCandidateIp);
+                    }
+                    /**
+                     * Update the entry of the selected NC
+                     */
+                    List<String> dataLocations = ipToNcMapping.get(InetAddress.getByAddress(
+                            currentCandidateIp.getBytes()).getHostAddress());
+                    for (String nc : dataLocations) {
+                        int ncIndex = ncNameToIndex.get(nc);
+                        if (workloads[ncIndex] < slots) {
+                            locations[i] = nc;
+                            workloads[ncIndex]++;
+                            scheduled[i] = true;
+                            break;
+                        }
                     }
                 }
             }
@@ -190,7 +268,7 @@
      * 
      * @param splits
      *            The HDFS file splits.
-     * @param capacity
+     * @param workloads
      *            The current capacity of each machine.
      * @param locations
      *            The result schedule.
@@ -203,7 +281,7 @@
      * @throws IOException
      * @throws UnknownHostException
      */
-    private void scheduleLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots, Random random,
+    private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
             boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits) throws IOException,
             UnknownHostException {
         /** scheduling candidates will be ordered inversely according to their popularity */
@@ -216,6 +294,9 @@
 
         });
         for (int i = 0; i < splits.length; i++) {
+            if (scheduled[i]) {
+                continue;
+            }
             /**
              * get the location of all the splits
              */
@@ -249,10 +330,11 @@
                             /**
                              * check if the node is already full
                              */
-                            if (capacity[pos] < slots) {
+                            if (workloads[pos] < slots) {
                                 locations[i] = nc;
-                                capacity[pos]++;
+                                workloads[pos]++;
                                 scheduled[i] = true;
+                                break;
                             }
                         }
                     }
@@ -301,6 +383,8 @@
     private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
         try {
             NCs = new String[ncNameToNcInfos.size()];
+            ipToNcMapping.clear();
+            ncNameToIndex.clear();
             int i = 0;
 
             /**
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index e42e8a6..f392293 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -96,11 +96,11 @@
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
                 .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
                 .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+        ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.7").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
                 .getAddress(), 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+        ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.12").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
                 .getAddress(), 5098)));
 
         InputSplit[] fileSplits = new InputSplit[12];
@@ -112,17 +112,16 @@
         fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
         fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
         fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
-        fileSplits[8] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
+        fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.14", "10.0.0.11", "10.0.0.13" });
         fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
         fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.7" });
-        fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+        fileSplits[11] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
 
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc4", "nc6", "nc1", "nc4", "nc2", "nc2", "nc3", "nc6", "nc5",
-                "nc3", "nc5" };
-
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc12",
+                "nc7", "nc7", "nc12" };
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
         }
@@ -223,8 +222,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc6",
-                "nc5", "nc3", "nc5" };
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc1",
+                "nc5", "nc2", "nc4" };
 
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index b5d9d1a..442aeae 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -227,8 +227,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc6",
-                "nc5", "nc3", "nc5" };
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc1",
+                "nc5", "nc2", "nc4" };
 
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);