refactoring hdfs-scheduler to support different policies

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3154 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java
new file mode 100644
index 0000000..c51c1dd
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.hyracks.hdfs.api;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+@SuppressWarnings("deprecation")
+public interface INcCollection {
+
+    public String findNearestAvailableSlot(InputSplit split);
+
+    public int numAvailableSlots();
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java
new file mode 100644
index 0000000..ef3ff23
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.hyracks.hdfs.api;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+
+/**
+ * NC collections
+ * 
+ * @author yingyib
+ */
+public interface INcCollectionBuilder {
+
+    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+            Map<String, List<String>> ipToNcMapping, Map<String, Integer> ncNameToIndex, String[] NCs, int[] workloads,
+            int slotLimit);
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
new file mode 100644
index 0000000..320b48b
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
@@ -0,0 +1,121 @@
+package edu.uci.ics.hyracks.hdfs.scheduler;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.hdfs.api.INcCollection;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
+
+@SuppressWarnings("deprecation")
+public class IPProximityNcCollectionBuilder implements INcCollectionBuilder {
+
+    @Override
+    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+            final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
+            final int[] workloads, final int slotLimit) {
+        final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
+        for (int i = 0; i < workloads.length; i++) {
+            if (workloads[i] < slotLimit) {
+                BytesWritable ip = new BytesWritable(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress());
+                IntWritable availableSlot = availableIpsToSlots.get(ip);
+                if (availableSlot == null) {
+                    availableSlot = new IntWritable(slotLimit - workloads[i]);
+                    availableIpsToSlots.put(ip, availableSlot);
+                } else {
+                    availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
+                }
+            }
+        }
+        return new INcCollection() {
+
+            @Override
+            public String findNearestAvailableSlot(InputSplit split) {
+                try {
+                    String[] locs = split.getLocations();
+                    int minDistance = Integer.MAX_VALUE;
+                    BytesWritable currentCandidateIp = null;
+                    if (locs == null || locs.length > 0) {
+                        for (int j = 0; j < locs.length; j++) {
+                            /**
+                             * get all the IP addresses from the name
+                             */
+                            InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+                            for (InetAddress ip : allIps) {
+                                BytesWritable splitIp = new BytesWritable(ip.getAddress());
+                                /**
+                                 * if the node controller exists
+                                 */
+                                BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
+                                if (candidateNcIp == null) {
+                                    candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
+                                }
+                                if (candidateNcIp != null) {
+                                    if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
+                                        byte[] candidateIP = candidateNcIp.getBytes();
+                                        byte[] splitIP = splitIp.getBytes();
+                                        int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
+                                                | (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
+                                        int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
+                                                | (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
+                                        int distance = Math.abs(candidateInt - splitInt);
+                                        if (minDistance > distance) {
+                                            minDistance = distance;
+                                            currentCandidateIp = candidateNcIp;
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    } else {
+                        for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
+                            if (entry.getValue().get() > 0) {
+                                currentCandidateIp = entry.getKey();
+                                break;
+                            }
+                        }
+                    }
+
+                    if (currentCandidateIp != null) {
+                        /**
+                         * Update the entry of the selected IP
+                         */
+                        IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
+                        availableSlot.set(availableSlot.get() - 1);
+                        if (availableSlot.get() == 0) {
+                            availableIpsToSlots.remove(currentCandidateIp);
+                        }
+                        /**
+                         * Update the entry of the selected NC
+                         */
+                        List<String> dataLocations = ipToNcMapping.get(InetAddress.getByAddress(
+                                currentCandidateIp.getBytes()).getHostAddress());
+                        for (String nc : dataLocations) {
+                            int ncIndex = ncNameToIndex.get(nc);
+                            if (workloads[ncIndex] < slotLimit) {
+                                return nc;
+                            }
+                        }
+                    }
+                    /** not scheduled */
+                    return null;
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+
+            @Override
+            public int numAvailableSlots() {
+                return availableIpsToSlots.size();
+            }
+
+        };
+    }
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index aaebc09..a6320e7 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -24,13 +24,10 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.PriorityQueue;
 import java.util.Random;
-import java.util.TreeMap;
 import java.util.logging.Logger;
 
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputSplit;
 
@@ -39,6 +36,9 @@
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.hdfs.api.INcCollection;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
 
 /**
  * The scheduler conduct data-local scheduling for data reading on HDFS. This
@@ -46,7 +46,6 @@
  */
 @SuppressWarnings("deprecation")
 public class Scheduler {
-
     private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
 
     /** a list of NCs */
@@ -58,18 +57,38 @@
     /** a map from the NC name to the index */
     private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
 
+    /** a map from NC name to the NodeControllerInfo */
     private Map<String, NodeControllerInfo> ncNameToNcInfos;
 
     /**
+     * the nc collection builder
+     */
+    private INcCollectionBuilder ncCollectionBuilder;
+    
+    private ClusterTopology topology;
+
+    /**
      * The constructor of the scheduler.
      * 
      * @param ncNameToNcInfos
      * @throws HyracksException
      */
     public Scheduler(String ipAddress, int port) throws HyracksException {
+        this(ipAddress, port, new IPProximityNcCollectionBuilder());
+    }
+
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     * @throws HyracksException
+     */
+    public Scheduler(String ipAddress, int port, INcCollectionBuilder ncCollectionBuilder) throws HyracksException {
         try {
             IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
             this.ncNameToNcInfos = hcc.getNodeControllerInfos();
+            this.topology = hcc.getClusterTopology();
+            this.ncCollectionBuilder = ncCollectionBuilder;
             loadIPAddressToNCMap(ncNameToNcInfos);
         } catch (Exception e) {
             throw new HyracksException(e);
@@ -81,10 +100,40 @@
      * 
      * @param ncNameToNcInfos
      *            the mapping from nc names to nc infos
+     * @param topology
+     *            the hyracks cluster toplogy
+     * @throws HyracksException
+     */
+    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder,
+            ClusterTopology topology) throws HyracksException {
+        this(ncNameToNcInfos, ncCollectionBuilder);
+        this.topology = topology;
+    }
+
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     *            the mapping from nc names to nc infos
      * @throws HyracksException
      */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
         this.ncNameToNcInfos = ncNameToNcInfos;
+        this.ncCollectionBuilder = new IPProximityNcCollectionBuilder();
+        loadIPAddressToNCMap(ncNameToNcInfos);
+    }
+
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     *            the mapping from nc names to nc infos
+     * @throws HyracksException
+     */
+    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder)
+            throws HyracksException {
+        this.ncNameToNcInfos = ncNameToNcInfos;
+        this.ncCollectionBuilder = ncCollectionBuilder;
         loadIPAddressToNCMap(ncNameToNcInfos);
     }
 
@@ -158,106 +207,34 @@
      *            The current capacity of each machine.
      * @param locations
      *            The result schedule.
-     * @param slots
+     * @param slotLimit
      *            The maximum slots of each machine.
      * @param scheduled
      *            Indicate which slot is scheduled.
      */
-    private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots,
+    private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slotLimit,
             boolean[] scheduled) throws IOException, UnknownHostException {
         /**
          * build the map from available ips to the number of available slots
          */
-        TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
-        for (int i = 0; i < workloads.length; i++) {
-            if (workloads[i] < slots) {
-                BytesWritable ip = new BytesWritable(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress());
-                IntWritable availableSlot = availableIpsToSlots.get(ip);
-                if (availableSlot == null) {
-                    availableSlot = new IntWritable(slots - workloads[i]);
-                    availableIpsToSlots.put(ip, availableSlot);
-                } else {
-                    availableSlot.set(slots - workloads[i] + availableSlot.get());
-                }
-            }
-        }
-        if (availableIpsToSlots.size() == 0) {
+        INcCollection ncCollection = this.ncCollectionBuilder.build(ncNameToNcInfos, ipToNcMapping, ncNameToIndex, NCs,
+                workloads, slotLimit);
+        if (ncCollection.numAvailableSlots() == 0) {
             return;
         }
         /**
          * schedule no-local file reads
          */
         for (int i = 0; i < splits.length; i++) {
-            // if there is no data-local NC choice, choose a random one
+            /** if there is no data-local NC choice, choose a random one */
             if (!scheduled[i]) {
                 InputSplit split = splits[i];
-                String[] locs = split.getLocations();
-                int minDistance = Integer.MAX_VALUE;
-                BytesWritable currentCandidateIp = null;
-                if (locs == null || locs.length > 0) {
-                    for (int j = 0; j < locs.length; j++) {
-                        /**
-                         * get all the IP addresses from the name
-                         */
-                        InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
-                        for (InetAddress ip : allIps) {
-                            BytesWritable splitIp = new BytesWritable(ip.getAddress());
-                            /**
-                             * if the node controller exists
-                             */
-                            BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
-                            if (candidateNcIp == null) {
-                                candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
-                            }
-                            if (candidateNcIp != null) {
-                                if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
-                                    byte[] candidateIP = candidateNcIp.getBytes();
-                                    byte[] splitIP = splitIp.getBytes();
-                                    int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
-                                            | (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
-                                    int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
-                                            | (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
-                                    int distance = Math.abs(candidateInt - splitInt);
-                                    if (minDistance > distance) {
-                                        minDistance = distance;
-                                        currentCandidateIp = candidateNcIp;
-                                    }
-                                }
-                            }
-                        }
-                    }
-                } else {
-                    for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
-                        if (entry.getValue().get() > 0) {
-                            currentCandidateIp = entry.getKey();
-                            break;
-                        }
-                    }
-                }
-
-                if (currentCandidateIp != null) {
-                    /**
-                     * Update the entry of the selected IP
-                     */
-                    IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
-                    availableSlot.set(availableSlot.get() - 1);
-                    if (availableSlot.get() == 0) {
-                        availableIpsToSlots.remove(currentCandidateIp);
-                    }
-                    /**
-                     * Update the entry of the selected NC
-                     */
-                    List<String> dataLocations = ipToNcMapping.get(InetAddress.getByAddress(
-                            currentCandidateIp.getBytes()).getHostAddress());
-                    for (String nc : dataLocations) {
-                        int ncIndex = ncNameToIndex.get(nc);
-                        if (workloads[ncIndex] < slots) {
-                            locations[i] = nc;
-                            workloads[ncIndex]++;
-                            scheduled[i] = true;
-                            break;
-                        }
-                    }
+                String selectedNcName = ncCollection.findNearestAvailableSlot(split);
+                if (selectedNcName != null) {
+                    int ncIndex = ncNameToIndex.get(selectedNcName);
+                    workloads[ncIndex]++;
+                    scheduled[i] = true;
+                    locations[i] = selectedNcName;
                 }
             }
         }