Merged fullstack_asterix_stabilization -r 2933:3157

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@3164 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
index e74d610..9092655 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
@@ -18,8 +18,9 @@
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.7</source>
+					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -63,6 +64,10 @@
 		<profile>
 			<activation>
 				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>1.0.4</value>
+				</property>
 			</activation>
 			<id>hadoop-1.0.4</id>
 			<dependencies>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
index a2b16c6..16ce76b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -1,7 +1,8 @@
 package edu.uci.ics.hyracks.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -14,12 +15,25 @@
 public class ContextFactory {
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+    public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid) throws HyracksDataException {
         try {
-            return new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null, split);
+            return new Mapper().new Context(conf, tid, null, null, null, null, null);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
     }
 
+    public TaskAttemptContext createContext(Configuration conf, int partition) throws HyracksDataException {
+        try {
+            TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
+            return new TaskAttemptContext(conf, tid);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public JobContext createJobContext(Configuration conf) {
+        return new JobContext(conf, new JobID("0", 0));
+    }
+
 }
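
Usage sketch (illustration only, not part of the patch): the revised ContextFactory is driven with a partition number or an explicit TaskAttemptID instead of an InputSplit, and a JobContext can now be created directly. The class name and the partition value below are assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.hdfs.ContextFactory;

    public class ContextFactoryUsageSketch {
        public static void main(String[] args) throws HyracksDataException {
            Configuration conf = new Configuration();
            ContextFactory ctxFactory = new ContextFactory();
            // One JobContext per job, one TaskAttemptContext per partition.
            JobContext jobContext = ctxFactory.createJobContext(conf);
            TaskAttemptContext taskContext = ctxFactory.createContext(conf, 0 /* partition */);
        }
    }
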
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
index 27a1e33..8b7ecf0 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
@@ -17,8 +17,9 @@
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.7</source>
+					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -40,6 +41,10 @@
 		<profile>
 			<activation>
 				<activeByDefault>true</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>0.23.1</value>
+				</property>
 			</activation>
 			<id>hadoop-0.23.1</id>
 			<dependencies>
@@ -77,6 +82,10 @@
 			<id>hadoop-0.23.6</id>
 			<activation>
 				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>0.23.6</value>
+				</property>
 			</activation>
 			<dependencies>
 				<dependency>
@@ -109,6 +118,86 @@
 				</dependency>
 			</dependencies>
 		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.2</value>
+				</property>
+			</activation>
+			<id>cdh-4.2</id>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.1</value>
+				</property>
+			</activation>
+			<id>cdh-4.1</id>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
 	</profiles>
 
 	<dependencies>
@@ -120,4 +209,11 @@
 			<scope>compile</scope>
 		</dependency>
 	</dependencies>
+
+	<repositories>
+		<repository>
+			<id>cloudera</id>
+			<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+		</repository>
+	</repositories>
 </project>
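
Note (not part of the patch): the hadoop property added to each activation block means a build can presumably select the target Hadoop flavor from the command line, e.g. mvn clean package -Dhadoop=cdh-4.2, instead of editing activeByDefault; the Cloudera repository entry above supplies the CDH artifacts those profiles reference.
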
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
index 60ae5d3..ddcce64 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -1,9 +1,12 @@
 package edu.uci.ics.hyracks.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -13,12 +16,25 @@
  */
 public class ContextFactory {
 
-    public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+    public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid) throws HyracksDataException {
         try {
-            return new TaskAttemptContextImpl(conf, new TaskAttemptID());
+            return new TaskAttemptContextImpl(conf, tid);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
     }
 
+    public TaskAttemptContext createContext(Configuration conf, int partition) throws HyracksDataException {
+        try {
+            TaskAttemptID tid = new TaskAttemptID("", 0, TaskType.REDUCE, partition, 0);
+            return new TaskAttemptContextImpl(conf, tid);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public JobContext createJobContext(Configuration conf) {
+        return new JobContextImpl(conf, new JobID("0", 0));
+    }
+
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index fccfec4..a28c698a 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -18,8 +18,9 @@
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.7</source>
+					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -75,6 +76,10 @@
 		<profile>
 			<activation>
 				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>1.0.4</value>
+				</property>
 			</activation>
 			<id>hadoop-1.0.4</id>
 			<dependencies>
@@ -90,6 +95,10 @@
 		<profile>
 			<activation>
 				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>0.23.1</value>
+				</property>
 			</activation>
 			<id>hadoop-0.23.1</id>
 			<dependencies>
@@ -105,6 +114,10 @@
 		<profile>
 			<activation>
 				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>0.23.6</value>
+				</property>
 			</activation>
 			<id>hadoop-0.23.6</id>
 			<dependencies>
@@ -117,6 +130,44 @@
 				</dependency>
 			</dependencies>
 		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.1</value>
+				</property>
+			</activation>
+			<id>cdh-4.1</id>
+			<dependencies>
+				<dependency>
+					<groupId>edu.uci.ics.hyracks</groupId>
+					<artifactId>hyracks-hdfs-0.23.1</artifactId>
+					<version>0.2.3-SNAPSHOT</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.2</value>
+				</property>
+			</activation>
+			<id>cdh-4.2</id>
+			<dependencies>
+				<dependency>
+					<groupId>edu.uci.ics.hyracks</groupId>
+					<artifactId>hyracks-hdfs-0.23.1</artifactId>
+					<version>0.2.3-SNAPSHOT</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
 	</profiles>
 
 	<dependencies>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
index 5923e1e..5d35ec5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
@@ -29,12 +29,24 @@
 public interface IKeyValueParser<K, V> {
 
     /**
+     * Initialize the key value parser.
+     * 
+     * @param writer
+     *            The hyracks writer for outputting data.
+     * @throws HyracksDataException
+     */
+    public void open(IFrameWriter writer) throws HyracksDataException;
+
+    /**
      * Parse a key-value pair returned by HDFS record reader to a tuple.
      * when the parsers' internal buffer is full, it can flush the buffer to the writer
      * 
      * @param key
+     *            The key returned from Hadoop's InputReader.
      * @param value
+     *            The value returned from Hadoop's InputReader.
      * @param writer
+     *            The hyracks writer for outputting data.
      * @throws HyracksDataException
      */
     public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException;
@@ -44,7 +56,8 @@
      * This method is called in the close() of HDFSReadOperatorDescriptor.
      * 
      * @param writer
+     *            The hyracks writer for outputting data.
      * @throws HyracksDataException
      */
-    public void flush(IFrameWriter writer) throws HyracksDataException;
+    public void close(IFrameWriter writer) throws HyracksDataException;
 }
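
For orientation (a sketch, not part of the patch): the renamed lifecycle maps onto the operator changes later in this diff, where open() is called right after writer.open(), parse() once per record, and close() just before writer.close(). A skeletal implementation under those assumptions, with a hypothetical class name and the buffering left as comments:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;

    public class LineKeyValueParser implements IKeyValueParser<LongWritable, Text> {
        @Override
        public void open(IFrameWriter writer) throws HyracksDataException {
            // allocate the output frame and tuple appender here
        }

        @Override
        public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
            // append (key, value) as a tuple and flush the frame to the writer when it fills up
        }

        @Override
        public void close(IFrameWriter writer) throws HyracksDataException {
            // flush whatever is still buffered; the operator closes the writer afterwards
        }
    }
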
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
index 6e943ad..7d6f868 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
@@ -18,6 +18,7 @@
 import java.io.Serializable;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Users need to implement this interface to use the HDFSReadOperatorDescriptor.
@@ -36,6 +37,6 @@
      *            the IHyracksTaskContext
      * @return a key-value parser instance.
      */
-    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx);
+    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException;
 
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java
new file mode 100644
index 0000000..c51c1dd
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollection.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.hyracks.hdfs.api;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+@SuppressWarnings("deprecation")
+public interface INcCollection {
+
+    public String findNearestAvailableSlot(InputSplit split);
+
+    public int numAvailableSlots();
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java
new file mode 100644
index 0000000..ef3ff23
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/INcCollectionBuilder.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.hyracks.hdfs.api;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+
+/**
+ * NC collections
+ * 
+ * @author yingyib
+ */
+public interface INcCollectionBuilder {
+
+    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+            Map<String, List<String>> ipToNcMapping, Map<String, Integer> ncNameToIndex, String[] NCs, int[] workloads,
+            int slotLimit);
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
index 25b9523..8e85627 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
@@ -26,6 +26,15 @@
 public interface ITupleWriter {
 
     /**
+     * Initialize the tuple writer.
+     * 
+     * @param output
+     *            The channel for output data.
+     * @throws HyracksDataException
+     */
+    public void open(DataOutput output) throws HyracksDataException;
+
+    /**
      * Write the tuple to the DataOutput.
      * 
      * @param output
@@ -36,4 +45,13 @@
      */
     public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException;
 
+    /**
+     * Close the writer.
+     * 
+     * @param output
+     *            The channel for output data.
+     * @throws HyracksDataException
+     */
+    public void close(DataOutput output) throws HyracksDataException;
+
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
index 839de8f..9a025c2 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
@@ -17,14 +17,19 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
 /**
  * Users need to implement this interface to use the HDFSWriteOperatorDescriptor.
  */
 public interface ITupleWriterFactory extends Serializable {
 
     /**
+     * @param ctx
+     *            the IHyracksTaskContext
      * @return a tuple writer instance
      */
-    public ITupleWriter getTupleWriter();
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException;
 
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index e924650..f49688b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -102,6 +102,7 @@
                     JobConf conf = confFactory.getConf();
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
                     writer.open();
+                    parser.open(writer);
                     InputFormat inputFormat = conf.getInputFormat();
                     for (int i = 0; i < inputSplits.length; i++) {
                         /**
@@ -131,7 +132,7 @@
                             }
                         }
                     }
-                    parser.flush(writer);
+                    parser.close(writer);
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index ff97a29..3ce6b2a 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -89,10 +89,11 @@
                 String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
                 String fileName = outputDirPath + File.separator + "part-" + partition;
 
-                tupleWriter = tupleWriterFactory.getTupleWriter();
+                tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
                 try {
                     FileSystem dfs = FileSystem.get(conf);
                     dos = dfs.create(new Path(fileName), true);
+                    tupleWriter.open(dos);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
@@ -116,6 +117,7 @@
             @Override
             public void close() throws HyracksDataException {
                 try {
+                    tupleWriter.close(dos);
                     dos.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
index 9cc9ebc..147e872 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
@@ -23,6 +23,7 @@
 import java.io.Serializable;
 import java.lang.reflect.Constructor;
 
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -38,6 +39,8 @@
         splitBytes = splitsToBytes(splits);
         if (splits.length > 0) {
             splitClassName = splits[0].getClass().getName();
+        } else {
+            splitClassName = FileSplit.class.getName();
         }
     }
 
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
index c691f5d..9574bb4 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -43,6 +43,11 @@
         return new IKeyValueParser<LongWritable, Text>() {
 
             @Override
+            public void open(IFrameWriter writer) {
+
+            }
+
+            @Override
             public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
                 tb.reset();
                 tb.addField(value.getBytes(), 0, value.getLength());
@@ -56,7 +61,7 @@
             }
 
             @Override
-            public void flush(IFrameWriter writer) throws HyracksDataException {
+            public void close(IFrameWriter writer) throws HyracksDataException {
                 FrameUtils.flushFrame(buffer, writer);
             }
 
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
index d26721d..0da14e5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
@@ -17,6 +17,7 @@
 
 import java.io.DataOutput;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
@@ -26,9 +27,14 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public ITupleWriter getTupleWriter() {
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) {
         return new ITupleWriter() {
-            byte newLine = "\n".getBytes()[0];
+            private byte newLine = "\n".getBytes()[0];
+
+            @Override
+            public void open(DataOutput output) {
+
+            }
 
             @Override
             public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
@@ -43,6 +49,11 @@
                 }
             }
 
+            @Override
+            public void close(DataOutput output) {
+
+            }
+
         };
     }
 
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
new file mode 100644
index 0000000..320b48b
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java
@@ -0,0 +1,121 @@
+package edu.uci.ics.hyracks.hdfs.scheduler;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.hdfs.api.INcCollection;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
+
+@SuppressWarnings("deprecation")
+public class IPProximityNcCollectionBuilder implements INcCollectionBuilder {
+
+    @Override
+    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+            final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
+            final int[] workloads, final int slotLimit) {
+        final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
+        for (int i = 0; i < workloads.length; i++) {
+            if (workloads[i] < slotLimit) {
+                BytesWritable ip = new BytesWritable(ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress());
+                IntWritable availableSlot = availableIpsToSlots.get(ip);
+                if (availableSlot == null) {
+                    availableSlot = new IntWritable(slotLimit - workloads[i]);
+                    availableIpsToSlots.put(ip, availableSlot);
+                } else {
+                    availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
+                }
+            }
+        }
+        return new INcCollection() {
+
+            @Override
+            public String findNearestAvailableSlot(InputSplit split) {
+                try {
+                    String[] locs = split.getLocations();
+                    int minDistance = Integer.MAX_VALUE;
+                    BytesWritable currentCandidateIp = null;
+                    if (locs != null && locs.length > 0) {
+                        for (int j = 0; j < locs.length; j++) {
+                            /**
+                             * get all the IP addresses from the name
+                             */
+                            InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+                            for (InetAddress ip : allIps) {
+                                BytesWritable splitIp = new BytesWritable(ip.getAddress());
+                                /**
+                                 * if the node controller exists
+                                 */
+                                BytesWritable candidateNcIp = availableIpsToSlots.floorKey(splitIp);
+                                if (candidateNcIp == null) {
+                                    candidateNcIp = availableIpsToSlots.ceilingKey(splitIp);
+                                }
+                                if (candidateNcIp != null) {
+                                    if (availableIpsToSlots.get(candidateNcIp).get() > 0) {
+                                        byte[] candidateIP = candidateNcIp.getBytes();
+                                        byte[] splitIP = splitIp.getBytes();
+                                        int candidateInt = candidateIP[0] << 24 | (candidateIP[1] & 0xFF) << 16
+                                                | (candidateIP[2] & 0xFF) << 8 | (candidateIP[3] & 0xFF);
+                                        int splitInt = splitIP[0] << 24 | (splitIP[1] & 0xFF) << 16
+                                                | (splitIP[2] & 0xFF) << 8 | (splitIP[3] & 0xFF);
+                                        int distance = Math.abs(candidateInt - splitInt);
+                                        if (minDistance > distance) {
+                                            minDistance = distance;
+                                            currentCandidateIp = candidateNcIp;
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    } else {
+                        for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
+                            if (entry.getValue().get() > 0) {
+                                currentCandidateIp = entry.getKey();
+                                break;
+                            }
+                        }
+                    }
+
+                    if (currentCandidateIp != null) {
+                        /**
+                         * Update the entry of the selected IP
+                         */
+                        IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
+                        availableSlot.set(availableSlot.get() - 1);
+                        if (availableSlot.get() == 0) {
+                            availableIpsToSlots.remove(currentCandidateIp);
+                        }
+                        /**
+                         * Update the entry of the selected NC
+                         */
+                        List<String> dataLocations = ipToNcMapping.get(InetAddress.getByAddress(
+                                currentCandidateIp.getBytes()).getHostAddress());
+                        for (String nc : dataLocations) {
+                            int ncIndex = ncNameToIndex.get(nc);
+                            if (workloads[ncIndex] < slotLimit) {
+                                return nc;
+                            }
+                        }
+                    }
+                    /** not scheduled */
+                    return null;
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+
+            @Override
+            public int numAvailableSlots() {
+                return availableIpsToSlots.size();
+            }
+
+        };
+    }
+}
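
A worked example of the proximity metric above (addresses are hypothetical): each IPv4 address is packed into a 32-bit integer and the absolute difference is taken, so for a split hosted on 10.0.0.9 a candidate NC at 10.0.0.5 scores a distance of 4 while one at 10.0.1.5 scores 252, and the numerically closer address wins the slot.
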
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
new file mode 100644
index 0000000..5371c84
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java
@@ -0,0 +1,167 @@
+package edu.uci.ics.hyracks.hdfs.scheduler;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.hdfs.api.INcCollection;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
+
+@SuppressWarnings("deprecation")
+public class RackAwareNcCollectionBuilder implements INcCollectionBuilder {
+    private ClusterTopology topology;
+
+    public RackAwareNcCollectionBuilder(ClusterTopology topology) {
+        this.topology = topology;
+    }
+
+    @Override
+    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
+            final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
+            final int[] workloads, final int slotLimit) {
+        try {
+            final Map<List<Integer>, String> pathToNCs = new HashMap<List<Integer>, String>();
+            for (int i = 0; i < NCs.length; i++) {
+                List<Integer> path = new ArrayList<Integer>();
+                String ipAddress = InetAddress.getByAddress(
+                        ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
+                topology.lookupNetworkTerminal(ipAddress, path);
+                pathToNCs.put(path, NCs[i]);
+            }
+
+            final TreeMap<List<Integer>, IntWritable> availableIpsToSlots = new TreeMap<List<Integer>, IntWritable>(
+                    new Comparator<List<Integer>>() {
+
+                        @Override
+                        public int compare(List<Integer> l1, List<Integer> l2) {
+                            int commonLength = Math.min(l1.size(), l2.size());
+                            for (int i = 0; i < commonLength; i++) {
+                                Integer value1 = l1.get(i);
+                                Integer value2 = l2.get(i);
+                                int cmp = value1 > value2 ? 1 : (value1 < value2 ? -1 : 0);
+                                if (cmp != 0) {
+                                    return cmp;
+                                }
+                            }
+                            return l1.size() > l2.size() ? 1 : (l1.size() < l2.size() ? -1 : 0);
+                        }
+
+                    });
+            for (int i = 0; i < workloads.length; i++) {
+                if (workloads[i] < slotLimit) {
+                    List<Integer> path = new ArrayList<Integer>();
+                    String ipAddress = InetAddress.getByAddress(
+                            ncNameToNcInfos.get(NCs[i]).getNetworkAddress().getIpAddress()).getHostAddress();
+                    topology.lookupNetworkTerminal(ipAddress, path);
+                    IntWritable availableSlot = availableIpsToSlots.get(path);
+                    if (availableSlot == null) {
+                        availableSlot = new IntWritable(slotLimit - workloads[i]);
+                        availableIpsToSlots.put(path, availableSlot);
+                    } else {
+                        availableSlot.set(slotLimit - workloads[i] + availableSlot.get());
+                    }
+                }
+            }
+            return new INcCollection() {
+
+                @Override
+                public String findNearestAvailableSlot(InputSplit split) {
+                    try {
+                        String[] locs = split.getLocations();
+                        int minDistance = Integer.MAX_VALUE;
+                        List<Integer> currentCandidatePath = null;
+                        if (locs != null && locs.length > 0) {
+                            for (int j = 0; j < locs.length; j++) {
+                                /**
+                                 * get all the IP addresses from the name
+                                 */
+                                InetAddress[] allIps = InetAddress.getAllByName(locs[j]);
+                                for (InetAddress ip : allIps) {
+                                    List<Integer> splitPath = new ArrayList<Integer>();
+                                    boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
+                                    if (!inCluster) {
+                                        continue;
+                                    }
+                                    /**
+                                     * if the node controller exists
+                                     */
+                                    List<Integer> candidatePath = availableIpsToSlots.floorKey(splitPath);
+                                    if (candidatePath == null) {
+                                        candidatePath = availableIpsToSlots.ceilingKey(splitPath);
+                                    }
+                                    if (candidatePath != null) {
+                                        if (availableIpsToSlots.get(candidatePath).get() > 0) {
+                                            int distance = distance(splitPath, candidatePath);
+                                            if (minDistance > distance) {
+                                                minDistance = distance;
+                                                currentCandidatePath = candidatePath;
+                                            }
+                                        }
+
+                                    }
+                                }
+                            }
+                        } else {
+                            for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
+                                if (entry.getValue().get() > 0) {
+                                    currentCandidatePath = entry.getKey();
+                                    break;
+                                }
+                            }
+                        }
+
+                        if (currentCandidatePath != null && currentCandidatePath.size() > 0) {
+                            /**
+                             * Update the entry of the selected IP
+                             */
+                            IntWritable availableSlot = availableIpsToSlots.get(currentCandidatePath);
+                            availableSlot.set(availableSlot.get() - 1);
+                            if (availableSlot.get() == 0) {
+                                availableIpsToSlots.remove(currentCandidatePath);
+                            }
+                            /**
+                             * Update the entry of the selected NC
+                             */
+                            String candidateNc = pathToNCs.get(currentCandidatePath);
+                            int ncIndex = ncNameToIndex.get(candidateNc);
+                            if (workloads[ncIndex] < slotLimit) {
+                                return candidateNc;
+                            }
+                        }
+                        /** not scheduled */
+                        return null;
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+
+                @Override
+                public int numAvailableSlots() {
+                    return availableIpsToSlots.size();
+                }
+
+                private int distance(List<Integer> splitPath, List<Integer> candidatePath) {
+                    int commonLength = Math.min(splitPath.size(), candidatePath.size());
+                    int distance = 0;
+                    for (int i = 0; i < commonLength; i++) {
+                        distance = distance * 10 + Math.abs(splitPath.get(i) - candidatePath.get(i));
+                    }
+                    return distance;
+                }
+            };
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+}
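
A worked example of the rack-aware distance above (topology paths are hypothetical): comparing split path [0, 1, 3] with candidate [0, 1, 5] accumulates 0, 0, then 0 * 10 + |3 - 5| = 2, whereas comparing [0, 2, 3] with [0, 1, 3] accumulates 0, then 0 * 10 + |2 - 1| = 1, then 1 * 10 + 0 = 10; a mismatch closer to the root of the topology therefore always outweighs one at the leaves.
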
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index e7309d4..6d31855 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -17,13 +17,18 @@
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Random;
+import java.util.logging.Logger;
 
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputSplit;
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -31,13 +36,17 @@
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.hdfs.api.INcCollection;
+import edu.uci.ics.hyracks.hdfs.api.INcCollectionBuilder;
 
 /**
- * The scheduler conduct data-local scheduling for data reading on HDFS.
- * This class works for Hadoop old API.
+ * The scheduler conducts data-local scheduling for data reading on HDFS. This
+ * class works with the old Hadoop API.
  */
 @SuppressWarnings("deprecation")
 public class Scheduler {
+    private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
 
     /** a list of NCs */
     private String[] NCs;
@@ -48,8 +57,16 @@
     /** a map from the NC name to the index */
     private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
 
+    /** a map from NC name to the NodeControllerInfo */
+    private Map<String, NodeControllerInfo> ncNameToNcInfos;
+
     /**
-     * The constructor of the scheduler
+     * the nc collection builder
+     */
+    private INcCollectionBuilder ncCollectionBuilder;
+
+    /**
+     * The constructor of the scheduler.
      * 
      * @param ncNameToNcInfos
      * @throws HyracksException
@@ -57,113 +74,130 @@
     public Scheduler(String ipAddress, int port) throws HyracksException {
         try {
             IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+            this.ncNameToNcInfos = hcc.getNodeControllerInfos();
+            ClusterTopology topology = hcc.getClusterTopology();
+            this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
+                    : new RackAwareNcCollectionBuilder(topology);
             loadIPAddressToNCMap(ncNameToNcInfos);
         } catch (Exception e) {
             throw new HyracksException(e);
         }
     }
 
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ipAddress
+     * @param port
+     * @param ncCollectionBuilder
+     * @throws HyracksException
+     */
+    public Scheduler(String ipAddress, int port, INcCollectionBuilder ncCollectionBuilder) throws HyracksException {
+        try {
+            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+            this.ncNameToNcInfos = hcc.getNodeControllerInfos();
+            this.ncCollectionBuilder = ncCollectionBuilder;
+            loadIPAddressToNCMap(ncNameToNcInfos);
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     *            the mapping from nc names to nc infos
+     * @throws HyracksException
+     */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+        this.ncNameToNcInfos = ncNameToNcInfos;
+        this.ncCollectionBuilder = new IPProximityNcCollectionBuilder();
         loadIPAddressToNCMap(ncNameToNcInfos);
     }
 
     /**
-     * Set location constraints for a file scan operator with a list of file splits
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     *            the mapping from nc names to nc infos
+     * @param topology
+     *            the hyracks cluster topology
+     * @throws HyracksException
+     */
+    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology) throws HyracksException {
+        this(ncNameToNcInfos);
+        this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
+                : new RackAwareNcCollectionBuilder(topology);
+    }
+
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     *            the mapping from nc names to nc infos
+     * @throws HyracksException
+     */
+    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder)
+            throws HyracksException {
+        this.ncNameToNcInfos = ncNameToNcInfos;
+        this.ncCollectionBuilder = ncCollectionBuilder;
+        loadIPAddressToNCMap(ncNameToNcInfos);
+    }
+
+    /**
+     * Set location constraints for a file scan operator with a list of file
+     * splits. It guarantees that the maximum number of slots a machine gets is at
+     * most one more than the minimum number of slots a machine gets.
      * 
      * @throws HyracksDataException
      */
     public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
-        int[] capacity = new int[NCs.length];
-        Arrays.fill(capacity, 0);
+        int[] workloads = new int[NCs.length];
+        Arrays.fill(workloads, 0);
         String[] locations = new String[splits.length];
-        int slots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
-                / capacity.length + 1);
+        Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
+        /**
+         * upper bound on the number of slots that a machine can get
+         */
+        int upperBoundSlots = splits.length % workloads.length == 0 ? (splits.length / workloads.length)
+                : (splits.length / workloads.length + 1);
+        /**
+         * lower bound on the number of slots that a machine can get
+         */
+        int lowerBoundSlots = splits.length % workloads.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
 
         try {
             Random random = new Random(System.currentTimeMillis());
             boolean scheduled[] = new boolean[splits.length];
             Arrays.fill(scheduled, false);
-
-            for (int i = 0; i < splits.length; i++) {
-                /**
-                 * get the location of all the splits
-                 */
-                String[] loc = splits[i].getLocations();
-                if (loc.length > 0) {
-                    for (int j = 0; j < loc.length; j++) {
-                        /**
-                         * get all the IP addresses from the name
-                         */
-                        InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
-                        /**
-                         * iterate overa all ips
-                         */
-                        for (InetAddress ip : allIps) {
-                            /**
-                             * if the node controller exists
-                             */
-                            if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                                /**
-                                 * set the ncs
-                                 */
-                                List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
-                                int arrayPos = random.nextInt(dataLocations.size());
-                                String nc = dataLocations.get(arrayPos);
-                                int pos = ncNameToIndex.get(nc);
-                                /**
-                                 * check if the node is already full
-                                 */
-                                if (capacity[pos] < slots) {
-                                    locations[i] = nc;
-                                    capacity[pos]++;
-                                    scheduled[i] = true;
-                                }
-                            }
-                        }
-
-                        /**
-                         * break the loop for data-locations if the schedule has already been found
-                         */
-                        if (scheduled[i] == true) {
-                            break;
-                        }
-                    }
-                }
-            }
-
             /**
-             * find the lowest index the current available NCs
+             * scan the splits and build the popularity map
+             * giving the machines with fewer local splits more scheduling priority
              */
-            int currentAvailableNC = 0;
-            for (int i = 0; i < capacity.length; i++) {
-                if (capacity[i] < slots) {
-                    currentAvailableNC = i;
-                    break;
-                }
-            }
-
+            buildPopularityMap(splits, locationToNumOfSplits);
             /**
-             * schedule no-local file reads
+             * push data-local lower-bound slots to each machine
              */
-            for (int i = 0; i < splits.length; i++) {
-                // if there is no data-local NC choice, choose a random one
-                if (!scheduled[i]) {
-                    locations[i] = NCs[currentAvailableNC];
-                    capacity[currentAvailableNC]++;
-                    scheduled[i] = true;
+            scheduleLocalSlots(splits, workloads, locations, lowerBoundSlots, random, scheduled, locationToNumOfSplits);
+            /**
+             * push data-local upper-bound slots to each machine
+             */
+            scheduleLocalSlots(splits, workloads, locations, upperBoundSlots, random, scheduled, locationToNumOfSplits);
 
-                    /**
-                     * move the available NC cursor to the next one
-                     */
-                    for (int j = currentAvailableNC; j < capacity.length; j++) {
-                        if (capacity[j] < slots) {
-                            currentAvailableNC = j;
-                            break;
-                        }
-                    }
+            int dataLocalCount = 0;
+            for (int i = 0; i < scheduled.length; i++) {
+                if (scheduled[i] == true) {
+                    dataLocalCount++;
                 }
             }
+            LOGGER.info("Data local rate: " + ((float) dataLocalCount / (float) (scheduled.length)));
+            /**
+             * push non-data-local lower-bound slots to each machine
+             */
+            scheduleNonLocalSlots(splits, workloads, locations, lowerBoundSlots, scheduled);
+            /**
+             * push non-data-local upper-bound slots to each machine
+             */
+            scheduleNonLocalSlots(splits, workloads, locations, upperBoundSlots, scheduled);
             return locations;
         } catch (IOException e) {
             throw new HyracksException(e);
@@ -171,6 +205,159 @@
     }
 
     /**
+     * Schedule non-local slots to each machine
+     * 
+     * @param splits
+     *            The HDFS file splits.
+     * @param workloads
+     *            The current workload (number of assigned splits) of each machine.
+     * @param locations
+     *            The result schedule.
+     * @param slotLimit
+     *            The maximum slots of each machine.
+     * @param scheduled
+     *            Indicates which splits have already been scheduled.
+     */
+    private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slotLimit,
+            boolean[] scheduled) throws IOException, UnknownHostException {
+        /**
+         * build the map from available ips to the number of available slots
+         */
+        INcCollection ncCollection = this.ncCollectionBuilder.build(ncNameToNcInfos, ipToNcMapping, ncNameToIndex, NCs,
+                workloads, slotLimit);
+        if (ncCollection.numAvailableSlots() == 0) {
+            return;
+        }
+        /**
+         * schedule non-local file reads
+         */
+        for (int i = 0; i < splits.length; i++) {
+            /** if there is no data-local NC choice, pick the nearest NC that still has available slots */
+            if (!scheduled[i]) {
+                InputSplit split = splits[i];
+                String selectedNcName = ncCollection.findNearestAvailableSlot(split);
+                if (selectedNcName != null) {
+                    int ncIndex = ncNameToIndex.get(selectedNcName);
+                    workloads[ncIndex]++;
+                    scheduled[i] = true;
+                    locations[i] = selectedNcName;
+                }
+            }
+        }
+    }
+
+    /**
+     * Schedule data-local slots to each machine.
+     * 
+     * @param splits
+     *            The HDFS file splits.
+     * @param workloads
+     *            The current workload (number of assigned splits) of each machine.
+     * @param locations
+     *            The result schedule.
+     * @param slots
+     *            The maximum slots of each machine.
+     * @param random
+     *            The random generator.
+     * @param scheduled
+     *            Indicates which splits have already been scheduled.
+     * @throws IOException
+     * @throws UnknownHostException
+     */
+    private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
+            boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits) throws IOException,
+            UnknownHostException {
+    /** scheduling candidates are ordered by increasing popularity, i.e., locations hosting fewer splits come first */
+        PriorityQueue<String> scheduleCandidates = new PriorityQueue<String>(3, new Comparator<String>() {
+
+            @Override
+            public int compare(String s1, String s2) {
+                return locationToNumSplits.get(s1).compareTo(locationToNumSplits.get(s2));
+            }
+
+        });
+        for (int i = 0; i < splits.length; i++) {
+            if (scheduled[i]) {
+                continue;
+            }
+            /**
+             * get the location of all the splits
+             */
+            String[] locs = splits[i].getLocations();
+            if (locs.length > 0) {
+                scheduleCandidates.clear();
+                for (int j = 0; j < locs.length; j++) {
+                    scheduleCandidates.add(locs[j]);
+                }
+
+                for (String candidate : scheduleCandidates) {
+                    /**
+                     * get all the IP addresses from the name
+                     */
+                    InetAddress[] allIps = InetAddress.getAllByName(candidate);
+                    /**
+                     * iterate over all IPs
+                     */
+                    for (InetAddress ip : allIps) {
+                        /**
+                         * if the node controller exists
+                         */
+                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+                            /**
+                             * set the ncs
+                             */
+                            List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
+                            int arrayPos = random.nextInt(dataLocations.size());
+                            String nc = dataLocations.get(arrayPos);
+                            int pos = ncNameToIndex.get(nc);
+                            /**
+                             * check if the node is already full
+                             */
+                            if (workloads[pos] < slots) {
+                                locations[i] = nc;
+                                workloads[pos]++;
+                                scheduled[i] = true;
+                                break;
+                            }
+                        }
+                    }
+                    /**
+                     * break the loop for data-locations if the schedule has
+                     * already been found
+                     */
+                    if (scheduled[i] == true) {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Scan the splits once and build a popularity map
+     * 
+     * @param splits
+     *            the split array
+     * @param locationToNumOfSplits
+     *            the map to be built
+     * @throws IOException
+     */
+    private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
+            throws IOException {
+        for (InputSplit split : splits) {
+            String[] locations = split.getLocations();
+            for (String loc : locations) {
+                IntWritable locCount = locationToNumOfSplits.get(loc);
+                if (locCount == null) {
+                    locCount = new IntWritable(0);
+                    locationToNumOfSplits.put(loc, locCount);
+                }
+                locCount.set(locCount.get() + 1);
+            }
+        }
+    }
+
+    /**
      * Load the IP-address-to-NC map from the NCNameToNCInfoMap
      * 
      * @param ncNameToNcInfos
@@ -179,6 +366,8 @@
     private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
         try {
             NCs = new String[ncNameToNcInfos.size()];
+            ipToNcMapping.clear();
+            ncNameToIndex.clear();
             int i = 0;
 
             /**
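
For illustration, a minimal standalone sketch (not part of the patch) of the candidate-ordering idea used by scheduleLocalSlots above: build a popularity count per location, then poll candidate hosts least-popular-first. The host names and counts below are made up for the example.

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

import org.apache.hadoop.io.IntWritable;

public class PopularityOrderingSketch {
    public static void main(String[] args) {
        // popularity map: how many splits list each location (normally built by buildPopularityMap)
        final Map<String, IntWritable> locationToNumSplits = new HashMap<String, IntWritable>();
        locationToNumSplits.put("10.0.0.1", new IntWritable(3));
        locationToNumSplits.put("10.0.0.2", new IntWritable(1));
        locationToNumSplits.put("10.0.0.3", new IntWritable(2));

        // candidates ordered by increasing popularity, so rarely-replicated hosts are tried first
        PriorityQueue<String> candidates = new PriorityQueue<String>(3, new Comparator<String>() {
            @Override
            public int compare(String s1, String s2) {
                return locationToNumSplits.get(s1).compareTo(locationToNumSplits.get(s2));
            }
        });
        candidates.add("10.0.0.1");
        candidates.add("10.0.0.2");
        candidates.add("10.0.0.3");

        String candidate;
        while ((candidate = candidates.poll()) != null) {
            System.out.println(candidate); // prints 10.0.0.2, 10.0.0.3, 10.0.0.1
        }
    }
}
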
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 90f5603..9e9abdf 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -139,8 +139,7 @@
                             /**
                              * read the split
                              */
-                            TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(),
-                                    inputSplits.get(i));
+                            TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), i);
                             RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
                             reader.initialize(inputSplits.get(i), context);
                             while (reader.nextKeyValue() == true) {
@@ -148,7 +147,7 @@
                             }
                         }
                     }
-                    parser.flush(writer);
+                    parser.close(writer);
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 390a7b5..c1c227c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -39,8 +39,8 @@
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 
 /**
- * The HDFS file write operator using the Hadoop new API.
- * To use this operator, a user need to provide an ITupleWriterFactory.
+ * The HDFS file write operator using the Hadoop new API. To use this operator,
+ * a user needs to provide an ITupleWriterFactory.
  */
 public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
@@ -88,10 +88,11 @@
                 String outputPath = FileOutputFormat.getOutputPath(conf).toString();
                 String fileName = outputPath + File.separator + "part-" + partition;
 
-                tupleWriter = tupleWriterFactory.getTupleWriter();
+                tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
                 try {
                     FileSystem dfs = FileSystem.get(conf.getConfiguration());
                     dos = dfs.create(new Path(fileName), true);
+                    tupleWriter.open(dos);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
@@ -115,6 +116,7 @@
             @Override
             public void close() throws HyracksDataException {
                 try {
+                    tupleWriter.close(dos);
                     dos.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
index 3445d68..cb97ca1 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -15,18 +15,11 @@
 
 package edu.uci.ics.hyracks.hdfs2.scheduler;
 
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -35,16 +28,10 @@
  * The scheduler conduct data-local scheduling for data reading on HDFS.
  * This class works for Hadoop new API.
  */
+@SuppressWarnings("deprecation")
 public class Scheduler {
 
-    /** a list of NCs */
-    private String[] NCs;
-
-    /** a map from ip to NCs */
-    private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
-
-    /** a map from the NC name to the index */
-    private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+    private edu.uci.ics.hyracks.hdfs.scheduler.Scheduler scheduler;
 
     /**
      * The constructor of the scheduler
@@ -53,17 +40,18 @@
      * @throws HyracksException
      */
     public Scheduler(String ipAddress, int port) throws HyracksException {
-        try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
-            loadIPAddressToNCMap(ncNameToNcInfos);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
+        scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ipAddress, port);
     }
 
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     *            the mapping from NC names to NC infos
+     * @throws HyracksException
+     */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        loadIPAddressToNCMap(ncNameToNcInfos);
+        scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
     }
 
     /**
@@ -72,135 +60,11 @@
      * @throws HyracksDataException
      */
     public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
-        int[] capacity = new int[NCs.length];
-        Arrays.fill(capacity, 0);
-        String[] locations = new String[splits.size()];
-        int slots = splits.size() % capacity.length == 0 ? (splits.size() / capacity.length) : (splits.size()
-                / capacity.length + 1);
-
         try {
-            Random random = new Random(System.currentTimeMillis());
-            boolean scheduled[] = new boolean[splits.size()];
-            Arrays.fill(scheduled, false);
-
-            for (int i = 0; i < splits.size(); i++) {
-                /**
-                 * get the location of all the splits
-                 */
-                String[] loc = splits.get(i).getLocations();
-                if (loc.length > 0) {
-                    for (int j = 0; j < loc.length; j++) {
-                        /**
-                         * get all the IP addresses from the name
-                         */
-                        InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
-                        /**
-                         * iterate overa all ips
-                         */
-                        for (InetAddress ip : allIps) {
-                            /**
-                             * if the node controller exists
-                             */
-                            if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                                /**
-                                 * set the ncs
-                                 */
-                                List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
-                                int arrayPos = random.nextInt(dataLocations.size());
-                                String nc = dataLocations.get(arrayPos);
-                                int pos = ncNameToIndex.get(nc);
-                                /**
-                                 * check if the node is already full
-                                 */
-                                if (capacity[pos] < slots) {
-                                    locations[i] = nc;
-                                    capacity[pos]++;
-                                    scheduled[i] = true;
-                                }
-                            }
-                        }
-
-                        /**
-                         * break the loop for data-locations if the schedule has already been found
-                         */
-                        if (scheduled[i] == true) {
-                            break;
-                        }
-                    }
-                }
-            }
-
-            /**
-             * find the lowest index the current available NCs
-             */
-            int currentAvailableNC = 0;
-            for (int i = 0; i < capacity.length; i++) {
-                if (capacity[i] < slots) {
-                    currentAvailableNC = i;
-                    break;
-                }
-            }
-
-            /**
-             * schedule no-local file reads
-             */
-            for (int i = 0; i < splits.size(); i++) {
-                // if there is no data-local NC choice, choose a random one
-                if (!scheduled[i]) {
-                    locations[i] = NCs[currentAvailableNC];
-                    capacity[currentAvailableNC]++;
-                    scheduled[i] = true;
-
-                    /**
-                     * move the available NC cursor to the next one
-                     */
-                    for (int j = currentAvailableNC; j < capacity.length; j++) {
-                        if (capacity[j] < slots) {
-                            currentAvailableNC = j;
-                            break;
-                        }
-                    }
-                }
-            }
-            return locations;
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    /**
-     * Load the IP-address-to-NC map from the NCNameToNCInfoMap
-     * 
-     * @param ncNameToNcInfos
-     * @throws HyracksException
-     */
-    private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        try {
-            NCs = new String[ncNameToNcInfos.size()];
-            int i = 0;
-
-            /**
-             * build the IP address to NC map
-             */
-            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
-                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
-                        .getHostAddress();
-                List<String> matchedNCs = ipToNcMapping.get(ipAddr);
-                if (matchedNCs == null) {
-                    matchedNCs = new ArrayList<String>();
-                    ipToNcMapping.put(ipAddr, matchedNCs);
-                }
-                matchedNCs.add(entry.getKey());
-                NCs[i] = entry.getKey();
-                i++;
-            }
-
-            /**
-             * set up the NC name to index mapping
-             */
-            for (i = 0; i < NCs.length; i++) {
-                ncNameToIndex.put(NCs[i], i);
-            }
+            org.apache.hadoop.mapred.InputSplit[] inputSplits = new org.apache.hadoop.mapred.InputSplit[splits.size()];
+            for (int i = 0; i < inputSplits.length; i++)
+                inputSplits[i] = new WrappedFileSplit(splits.get(i).getLocations(), splits.get(i).getLength());
+            return scheduler.getLocationConstraints(inputSplits);
         } catch (Exception e) {
             throw new HyracksException(e);
         }
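
A short usage sketch (not part of the patch) for the delegating new-API Scheduler, mirroring the tests further below: build the NC map, create the scheduler, and get one NC name per split. The IP addresses, ports and file names are illustrative.

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.client.NodeStatus;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;

public class Hdfs2SchedulerUsageSketch {
    public static void main(String[] args) throws Exception {
        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(
                InetAddress.getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress
                .getByName("10.0.0.1").getAddress(), 5098)));
        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(
                InetAddress.getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress
                .getByName("10.0.0.2").getAddress(), 5098)));

        List<InputSplit> splits = new ArrayList<InputSplit>();
        splits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2" }));
        splits.add(new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1" }));

        // one NC name per split; the split at index i should be read on locations[i]
        edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler scheduler =
                new edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler(ncNameToNcInfos);
        String[] locations = scheduler.getLocationConstraints(splits);
        for (String location : locations) {
            System.out.println(location);
        }
    }
}
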
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java
new file mode 100644
index 0000000..1deb469
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java
@@ -0,0 +1,51 @@
+package edu.uci.ics.hyracks.hdfs2.scheduler;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A wrapper implementation of the old-API InputSplit that lets the new-API
+ * scheduler reuse the old-API scheduler
+ */
+@SuppressWarnings("deprecation")
+public class WrappedFileSplit implements InputSplit {
+
+    private String[] locations;
+    private long length;
+
+    public WrappedFileSplit(String[] locations, long length) {
+        this.locations = locations;
+        this.length = length;
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int len = input.readInt();
+        locations = new String[len];
+        for (int i = 0; i < len; i++)
+            locations[i] = input.readUTF();
+        length = input.readLong();
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeInt(locations.length);
+        for (int i = 0; i < locations.length; i++)
+            output.writeUTF(locations[i]);
+        output.writeLong(length);
+    }
+
+    @Override
+    public long getLength() throws IOException {
+        return length;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+        return locations;
+    }
+
+}
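
A small round-trip sketch (not part of the patch) showing that write and readFields are symmetric once the location count is written with writeInt; the locations and length below are illustrative.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import edu.uci.ics.hyracks.hdfs2.scheduler.WrappedFileSplit;

public class WrappedFileSplitRoundTrip {
    public static void main(String[] args) throws IOException {
        WrappedFileSplit split = new WrappedFileSplit(new String[] { "10.0.0.1", "10.0.0.2" }, 128L);

        // serialize
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        split.write(new DataOutputStream(bos));

        // deserialize into a fresh instance
        WrappedFileSplit copy = new WrappedFileSplit(null, 0);
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(copy.getLength());           // 128
        System.out.println(copy.getLocations().length); // 2
    }
}
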
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index 4b8a278..90967a0 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.hdfs.scheduler;
 
+import java.io.FileReader;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
@@ -25,13 +27,28 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
 
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.client.NodeStatus;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
 
 @SuppressWarnings("deprecation")
 public class SchedulerTest extends TestCase {
+    private static final String TOPOLOGY_PATH = "src/test/resources/topology.xml";
+
+    private ClusterTopology parseTopology() throws IOException, SAXException {
+        FileReader fr = new FileReader(TOPOLOGY_PATH);
+        InputSource in = new InputSource(fr);
+        try {
+            return TopologyDefinitionParser.parse(in);
+        } finally {
+            fr.close();
+        }
+    }
 
     /**
      * Test the scheduler for the case when the Hyracks cluster is the HDFS cluster
@@ -41,17 +58,23 @@
     public void testSchedulerSimple() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099)));
+                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099)));
+                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099)));
+                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099)));
+                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099)));
+                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099)));
+                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+                .getAddress(), 5098)));
 
         InputSplit[] fileSplits = new InputSplit[6];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -61,11 +84,17 @@
         fileSplits[4] = new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
         fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
 
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc6", "nc2", "nc3", "nc5" };
+
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc5", "nc6" };
-
+        ClusterTopology topology = parseTopology();
+        scheduler = new Scheduler(ncNameToNcInfos, topology);
+        locationConstraints = scheduler.getLocationConstraints(fileSplits);
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
         }
@@ -79,17 +108,23 @@
     public void testSchedulerLargerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099)));
+                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099)));
+                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099)));
+                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099)));
+                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
+                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.7").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+                .getAddress(), 5098)));
+        ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.12").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+                .getAddress(), 5098)));
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -100,17 +135,25 @@
         fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
         fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
         fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
-        fileSplits[8] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
+        fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.14", "10.0.0.11", "10.0.0.13" });
         fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
         fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.7" });
-        fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
+        fileSplits[11] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
 
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
-                "nc6", "nc5" };
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc12",
+                "nc7", "nc7", "nc12" };
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
 
+        expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc7", "nc12", "nc7",
+                "nc12" };
+        ClusterTopology topology = parseTopology();
+        scheduler = new Scheduler(ncNameToNcInfos, topology);
+        locationConstraints = scheduler.getLocationConstraints(fileSplits);
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
         }
@@ -124,17 +167,23 @@
     public void testSchedulerSmallerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099)));
+                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099)));
+                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099)));
+                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099)));
+                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099)));
+                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099)));
+                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+                .getAddress(), 5098)));
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -150,12 +199,19 @@
         fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
         fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
 
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc6",
+                "nc5", "nc6" };
+
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
-                "nc5", "nc6" };
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
 
+        ClusterTopology topology = parseTopology();
+        scheduler = new Scheduler(ncNameToNcInfos, topology);
+        locationConstraints = scheduler.getLocationConstraints(fileSplits);
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
         }
@@ -169,17 +225,23 @@
     public void testSchedulerSmallerHDFSOdd() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099)));
+                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099)));
+                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099)));
+                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099)));
+                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099)));
+                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099)));
+                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+                .getAddress(), 5098)));
 
         InputSplit[] fileSplits = new InputSplit[13];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -196,15 +258,23 @@
         fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
         fileSplits[12] = new FileSplit(new Path("part-13"), 0, 0, new String[] { "10.0.0.2", "10.0.0.4", "10.0.0.5" });
 
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc1",
+                "nc5", "nc2", "nc4" };
+
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc3", "nc4", "nc2",
-                "nc4", "nc5", "nc5" };
-
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
         }
+
+        ClusterTopology topology = parseTopology();
+        scheduler = new Scheduler(ncNameToNcInfos, topology);
+        locationConstraints = scheduler.getLocationConstraints(fileSplits);
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
+
     }
 
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
index 30dbb80..bdff2fd 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/HyracksUtils.java
@@ -64,6 +64,7 @@
         ncConfig1.clusterNetIPAddress = "localhost";
         ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.datasetIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -73,6 +74,7 @@
         ncConfig2.clusterNetIPAddress = "localhost";
         ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
         ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index ea2af13..442aeae0 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -34,7 +34,6 @@
 
 /**
  * Test case for the new HDFS API scheduler
- * 
  */
 public class SchedulerTest extends TestCase {
 
@@ -46,17 +45,23 @@
     public void testSchedulerSimple() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099)));
+                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099)));
+                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099)));
+                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099)));
+                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099)));
+                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099)));
+                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+                .getAddress(), 5098)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -69,7 +74,7 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc5", "nc6" };
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc6", "nc2", "nc3", "nc5" };
 
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
@@ -84,17 +89,23 @@
     public void testSchedulerLargerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099)));
+                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099)));
+                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099)));
+                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099)));
+                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099)));
+                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099)));
+                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+                .getAddress(), 5098)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -113,8 +124,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
-                "nc6", "nc5" };
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc6", "nc1", "nc4", "nc2", "nc2", "nc3", "nc6", "nc5",
+                "nc3", "nc5" };
 
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
@@ -129,17 +140,23 @@
     public void testSchedulerSmallerHDFS() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099)));
+                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099)));
+                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099)));
+                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099)));
+                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099)));
+                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099)));
+                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+                .getAddress(), 5098)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -158,7 +175,7 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc6",
                 "nc5", "nc6" };
 
         for (int i = 0; i < locationConstraints.length; i++) {
@@ -174,17 +191,23 @@
     public void testSchedulerSmallerHDFSOdd() throws Exception {
         Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
         ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.1").getAddress(), 5099)));
+                .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.2").getAddress(), 5099)));
+                .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.3").getAddress(), 5099)));
+                .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.4").getAddress(), 5099)));
+                .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.5").getAddress(), 5099)));
+                .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
+                .getAddress(), 5098)));
         ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
-                .getByName("10.0.0.6").getAddress(), 5099)));
+                .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
+                .getAddress(), 5098)));
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -204,8 +227,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc3", "nc4", "nc2",
-                "nc4", "nc5", "nc5" };
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc5", "nc1",
+                "nc5", "nc2", "nc4" };
 
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml
new file mode 100644
index 0000000..3a0ac7e
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/resources/topology.xml
@@ -0,0 +1,32 @@
+<cluster-topology>
+	<network-switch name="all">
+		<network-switch name="rack1">
+			<terminal name="10.0.0.1" />
+			<terminal name="10.0.0.5" />
+			<terminal name="10.0.0.9" />
+			<terminal name="10.0.0.13" />
+			<terminal name="10.0.0.17" />
+		</network-switch>
+		<network-switch name="rack2">
+			<terminal name="10.0.0.2" />
+			<terminal name="10.0.0.6" />
+			<terminal name="10.0.0.10" />
+			<terminal name="10.0.0.14" />
+			<terminal name="10.0.0.18" />
+		</network-switch>
+		<network-switch name="rack3">
+			<terminal name="10.0.0.3" />
+			<terminal name="10.0.0.7" />
+			<terminal name="10.0.0.11" />
+			<terminal name="10.0.0.15" />
+			<terminal name="10.0.0.19" />
+		</network-switch>
+		<network-switch name="rack4">
+			<terminal name="10.0.0.4" />
+			<terminal name="10.0.0.8" />
+			<terminal name="10.0.0.12" />
+			<terminal name="10.0.0.16" />
+			<terminal name="10.0.0.20" />
+		</network-switch>
+	</network-switch>
+</cluster-topology>
\ No newline at end of file
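
For reference, a minimal sketch (not part of the patch) of how the tests above consume this topology file: parse it with TopologyDefinitionParser and hand the result to the topology-aware Scheduler constructor. The file path is the test resource above; the empty ncNameToNcInfos map is a placeholder for the live NC infos a real caller would pass.

import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

import org.xml.sax.InputSource;

import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;

public class TopologySchedulerSketch {
    public static void main(String[] args) throws Exception {
        FileReader fr = new FileReader("src/test/resources/topology.xml");
        ClusterTopology topology;
        try {
            topology = TopologyDefinitionParser.parse(new InputSource(fr));
        } finally {
            fr.close();
        }

        // placeholder map; a real caller passes the cluster's node controller infos
        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
        Scheduler scheduler = new Scheduler(ncNameToNcInfos, topology);
        // scheduler.getLocationConstraints(splits) then prefers rack-local NCs for each split
    }
}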