refactor hyracks-hdfs to unify the code path of new/old APIs and generate more balanced schedules

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_release_cleanup@3088 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index e7309d4..3f287cf 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -17,6 +17,7 @@
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -49,7 +50,7 @@
     private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
 
     /**
-     * The constructor of the scheduler
+     * The constructor of the scheduler.
      * 
      * @param ncNameToNcInfos
      * @throws HyracksException
@@ -64,12 +65,20 @@
         }
     }
 
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
+     * @throws HyracksException
+     */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
         loadIPAddressToNCMap(ncNameToNcInfos);
     }
 
     /**
-     * Set location constraints for a file scan operator with a list of file splits
+     * Set location constraints for a file scan operator with a list of file splits.
+     * It guarantees that the maximum number of slots a machine can get is at most one more than
+     * the minimum number of slots a machine can get.
      * 
      * @throws HyracksDataException
      */
@@ -77,93 +86,38 @@
         int[] capacity = new int[NCs.length];
         Arrays.fill(capacity, 0);
         String[] locations = new String[splits.length];
-        int slots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
+        /**
+         * upper bound number of slots that a machine can get
+         */
+        int upperBoundSlots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
                 / capacity.length + 1);
+        /**
+         * lower bound number of slots that a machine can get
+         */
+        int lowerBoundSlots = splits.length % capacity.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
 
         try {
             Random random = new Random(System.currentTimeMillis());
             boolean scheduled[] = new boolean[splits.length];
             Arrays.fill(scheduled, false);
 
-            for (int i = 0; i < splits.length; i++) {
-                /**
-                 * get the location of all the splits
-                 */
-                String[] loc = splits[i].getLocations();
-                if (loc.length > 0) {
-                    for (int j = 0; j < loc.length; j++) {
-                        /**
-                         * get all the IP addresses from the name
-                         */
-                        InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
-                        /**
-                         * iterate overa all ips
-                         */
-                        for (InetAddress ip : allIps) {
-                            /**
-                             * if the node controller exists
-                             */
-                            if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                                /**
-                                 * set the ncs
-                                 */
-                                List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
-                                int arrayPos = random.nextInt(dataLocations.size());
-                                String nc = dataLocations.get(arrayPos);
-                                int pos = ncNameToIndex.get(nc);
-                                /**
-                                 * check if the node is already full
-                                 */
-                                if (capacity[pos] < slots) {
-                                    locations[i] = nc;
-                                    capacity[pos]++;
-                                    scheduled[i] = true;
-                                }
-                            }
-                        }
-
-                        /**
-                         * break the loop for data-locations if the schedule has already been found
-                         */
-                        if (scheduled[i] == true) {
-                            break;
-                        }
-                    }
-                }
-            }
+            /**
+             * push data-local lower-bounds slots to each machine
+             */
+            scheduleLocalSlots(splits, capacity, locations, lowerBoundSlots, random, scheduled);
+            /**
+             * push data-local upper-bounds slots to each machine
+             */
+            scheduleLocalSlots(splits, capacity, locations, upperBoundSlots, random, scheduled);
 
             /**
-             * find the lowest index the current available NCs
+             * push non-data-local lower-bounds slots to each machine
              */
-            int currentAvailableNC = 0;
-            for (int i = 0; i < capacity.length; i++) {
-                if (capacity[i] < slots) {
-                    currentAvailableNC = i;
-                    break;
-                }
-            }
-
+            scheduleNoLocalSlots(splits, capacity, locations, lowerBoundSlots, scheduled);
             /**
-             * schedule no-local file reads
+             * push non-data-local upper-bounds slots to each machine
              */
-            for (int i = 0; i < splits.length; i++) {
-                // if there is no data-local NC choice, choose a random one
-                if (!scheduled[i]) {
-                    locations[i] = NCs[currentAvailableNC];
-                    capacity[currentAvailableNC]++;
-                    scheduled[i] = true;
-
-                    /**
-                     * move the available NC cursor to the next one
-                     */
-                    for (int j = currentAvailableNC; j < capacity.length; j++) {
-                        if (capacity[j] < slots) {
-                            currentAvailableNC = j;
-                            break;
-                        }
-                    }
-                }
-            }
+            scheduleNoLocalSlots(splits, capacity, locations, upperBoundSlots, scheduled);
             return locations;
         } catch (IOException e) {
             throw new HyracksException(e);
@@ -171,6 +125,124 @@
     }
 
     /**
+     * Schedule non-local slots to each machine
+     * 
+     * @param splits
+     *            The HDFS file splits.
+     * @param capacity
+     *            The current capacity of each machine.
+     * @param locations
+     *            The result schedule.
+     * @param slots
+     *            The maximum slots of each machine.
+     * @param scheduled
+     *            Indicate which slot is scheduled.
+     */
+    private void scheduleNoLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots,
+            boolean[] scheduled) {
+        /**
+         * find the lowest index of the currently available NCs
+         */
+        int currentAvailableNC = 0;
+        for (int i = 0; i < capacity.length; i++) {
+            if (capacity[i] < slots) {
+                currentAvailableNC = i;
+                break;
+            }
+        }
+
+        /**
+         * schedule no-local file reads
+         */
+        for (int i = 0; i < splits.length; i++) {
+            // if there is no data-local NC choice, choose a random one
+            if (!scheduled[i]) {
+                locations[i] = NCs[currentAvailableNC];
+                capacity[currentAvailableNC]++;
+                scheduled[i] = true;
+
+                /**
+                 * move the available NC cursor to the next one
+                 */
+                for (int j = currentAvailableNC; j < capacity.length; j++) {
+                    if (capacity[j] < slots) {
+                        currentAvailableNC = j;
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Schedule data-local slots to each machine.
+     * 
+     * @param splits
+     *            The HDFS file splits.
+     * @param capacity
+     *            The current capacity of each machine.
+     * @param locations
+     *            The result schedule.
+     * @param slots
+     *            The maximum slots of each machine.
+     * @param random
+     *            The random generator.
+     * @param scheduled
+     *            Indicate which slot is scheduled.
+     * @throws IOException
+     * @throws UnknownHostException
+     */
+    private void scheduleLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots, Random random,
+            boolean[] scheduled) throws IOException, UnknownHostException {
+        for (int i = 0; i < splits.length; i++) {
+            /**
+             * get the location of all the splits
+             */
+            String[] loc = splits[i].getLocations();
+            if (loc.length > 0) {
+                for (int j = 0; j < loc.length; j++) {
+                    /**
+                     * get all the IP addresses from the name
+                     */
+                    InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
+                    /**
+                     * iterate over all IPs
+                     */
+                    for (InetAddress ip : allIps) {
+                        /**
+                         * if the node controller exists
+                         */
+                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+                            /**
+                             * set the ncs
+                             */
+                            List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
+                            int arrayPos = random.nextInt(dataLocations.size());
+                            String nc = dataLocations.get(arrayPos);
+                            int pos = ncNameToIndex.get(nc);
+                            /**
+                             * check if the node is already full
+                             */
+                            if (capacity[pos] < slots) {
+                                locations[i] = nc;
+                                capacity[pos]++;
+                                scheduled[i] = true;
+                            }
+                        }
+                    }
+
+                    /**
+                     * break the loop for data-locations if the schedule has already been found
+                     */
+                    if (scheduled[i] == true) {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
      * Load the IP-address-to-NC map from the NCNameToNCInfoMap
      * 
      * @param ncNameToNcInfos
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
index 3445d68..cb97ca1 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -15,18 +15,11 @@
 
 package edu.uci.ics.hyracks.hdfs2.scheduler;
 
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -35,16 +28,10 @@
  * The scheduler conduct data-local scheduling for data reading on HDFS.
  * This class works for Hadoop new API.
  */
+@SuppressWarnings("deprecation")
 public class Scheduler {
 
-    /** a list of NCs */
-    private String[] NCs;
-
-    /** a map from ip to NCs */
-    private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
-
-    /** a map from the NC name to the index */
-    private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+    private edu.uci.ics.hyracks.hdfs.scheduler.Scheduler scheduler;
 
     /**
      * The constructor of the scheduler
@@ -53,17 +40,18 @@
      * @throws HyracksException
      */
     public Scheduler(String ipAddress, int port) throws HyracksException {
-        try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
-            loadIPAddressToNCMap(ncNameToNcInfos);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
+        scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ipAddress, port);
     }
 
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     *            the mapping from nc names to nc infos
+     * @throws HyracksException
+     */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        loadIPAddressToNCMap(ncNameToNcInfos);
+        scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
     }
 
     /**
@@ -72,135 +60,11 @@
      * @throws HyracksDataException
      */
     public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
-        int[] capacity = new int[NCs.length];
-        Arrays.fill(capacity, 0);
-        String[] locations = new String[splits.size()];
-        int slots = splits.size() % capacity.length == 0 ? (splits.size() / capacity.length) : (splits.size()
-                / capacity.length + 1);
-
         try {
-            Random random = new Random(System.currentTimeMillis());
-            boolean scheduled[] = new boolean[splits.size()];
-            Arrays.fill(scheduled, false);
-
-            for (int i = 0; i < splits.size(); i++) {
-                /**
-                 * get the location of all the splits
-                 */
-                String[] loc = splits.get(i).getLocations();
-                if (loc.length > 0) {
-                    for (int j = 0; j < loc.length; j++) {
-                        /**
-                         * get all the IP addresses from the name
-                         */
-                        InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
-                        /**
-                         * iterate overa all ips
-                         */
-                        for (InetAddress ip : allIps) {
-                            /**
-                             * if the node controller exists
-                             */
-                            if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                                /**
-                                 * set the ncs
-                                 */
-                                List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
-                                int arrayPos = random.nextInt(dataLocations.size());
-                                String nc = dataLocations.get(arrayPos);
-                                int pos = ncNameToIndex.get(nc);
-                                /**
-                                 * check if the node is already full
-                                 */
-                                if (capacity[pos] < slots) {
-                                    locations[i] = nc;
-                                    capacity[pos]++;
-                                    scheduled[i] = true;
-                                }
-                            }
-                        }
-
-                        /**
-                         * break the loop for data-locations if the schedule has already been found
-                         */
-                        if (scheduled[i] == true) {
-                            break;
-                        }
-                    }
-                }
-            }
-
-            /**
-             * find the lowest index the current available NCs
-             */
-            int currentAvailableNC = 0;
-            for (int i = 0; i < capacity.length; i++) {
-                if (capacity[i] < slots) {
-                    currentAvailableNC = i;
-                    break;
-                }
-            }
-
-            /**
-             * schedule no-local file reads
-             */
-            for (int i = 0; i < splits.size(); i++) {
-                // if there is no data-local NC choice, choose a random one
-                if (!scheduled[i]) {
-                    locations[i] = NCs[currentAvailableNC];
-                    capacity[currentAvailableNC]++;
-                    scheduled[i] = true;
-
-                    /**
-                     * move the available NC cursor to the next one
-                     */
-                    for (int j = currentAvailableNC; j < capacity.length; j++) {
-                        if (capacity[j] < slots) {
-                            currentAvailableNC = j;
-                            break;
-                        }
-                    }
-                }
-            }
-            return locations;
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    /**
-     * Load the IP-address-to-NC map from the NCNameToNCInfoMap
-     * 
-     * @param ncNameToNcInfos
-     * @throws HyracksException
-     */
-    private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        try {
-            NCs = new String[ncNameToNcInfos.size()];
-            int i = 0;
-
-            /**
-             * build the IP address to NC map
-             */
-            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
-                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
-                        .getHostAddress();
-                List<String> matchedNCs = ipToNcMapping.get(ipAddr);
-                if (matchedNCs == null) {
-                    matchedNCs = new ArrayList<String>();
-                    ipToNcMapping.put(ipAddr, matchedNCs);
-                }
-                matchedNCs.add(entry.getKey());
-                NCs[i] = entry.getKey();
-                i++;
-            }
-
-            /**
-             * set up the NC name to index mapping
-             */
-            for (i = 0; i < NCs.length; i++) {
-                ncNameToIndex.put(NCs[i], i);
-            }
+            org.apache.hadoop.mapred.InputSplit[] inputSplits = new org.apache.hadoop.mapred.InputSplit[splits.size()];
+            for (int i = 0; i < inputSplits.length; i++)
+                inputSplits[i] = new WrappedFileSplit(splits.get(i).getLocations(), splits.get(i).getLength());
+            return scheduler.getLocationConstraints(inputSplits);
         } catch (Exception e) {
             throw new HyracksException(e);
         }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java
new file mode 100644
index 0000000..1deb469
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java
@@ -0,0 +1,51 @@
+package edu.uci.ics.hyracks.hdfs2.scheduler;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * The wrapped implementation of InputSplit, for the new API scheduler
+ * to reuse the old API scheduler
+ */
+@SuppressWarnings("deprecation")
+public class WrappedFileSplit implements InputSplit {
+
+    private String[] locations;
+    private long length;
+
+    public WrappedFileSplit(String[] locations, long length) {
+        this.locations = locations;
+        this.length = length;
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int len = input.readInt();
+        locations = new String[len];
+        for (int i = 0; i < len; i++)
+            locations[i] = input.readUTF();
+        length = input.readLong();
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeInt(locations.length);
+        for (int i = 0; i < locations.length; i++)
+            output.writeUTF(locations[i]);
+        output.writeLong(length);
+    }
+
+    @Override
+    public long getLength() throws IOException {
+        return length;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+        return locations;
+    }
+
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index 4b8a278..c6b0416 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -199,8 +199,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc3", "nc4", "nc2",
-                "nc4", "nc5", "nc5" };
+        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+                "nc5", "nc5", "nc6" };
 
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index ea2af13..4b1e11c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -34,7 +34,6 @@
 
 /**
  * Test case for the new HDFS API scheduler
- * 
  */
 public class SchedulerTest extends TestCase {
 
@@ -204,8 +203,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc3", "nc4", "nc2",
-                "nc4", "nc5", "nc5" };
+        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+                "nc5", "nc5", "nc6" };
 
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);