diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
index b33e8e2..9092655 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
@@ -20,6 +20,7 @@
 				<configuration>
 					<source>1.7</source>
 					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -63,6 +64,10 @@
 		<profile>
 			<activation>
 				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>1.0.4</value>
+				</property>
 			</activation>
 			<id>hadoop-1.0.4</id>
 			<dependencies>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
index a2b16c6..16ce76b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -1,7 +1,8 @@
 package edu.uci.ics.hyracks.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -14,12 +15,25 @@
 public class ContextFactory {
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+    public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid) throws HyracksDataException {
         try {
-            return new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null, split);
+            return new Mapper().new Context(conf, tid, null, null, null, null, null);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
     }
 
+    public TaskAttemptContext createContext(Configuration conf, int partition) throws HyracksDataException {
+        try {
+            TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
+            return new TaskAttemptContext(conf, tid);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public JobContext createJobContext(Configuration conf) {
+        return new JobContext(conf, new JobID("0", 0));
+    }
+
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
index 07b244f..8b7ecf0 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
@@ -19,6 +19,7 @@
 				<configuration>
 					<source>1.7</source>
 					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -40,6 +41,10 @@
 		<profile>
 			<activation>
 				<activeByDefault>true</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>0.23.1</value>
+				</property>
 			</activation>
 			<id>hadoop-0.23.1</id>
 			<dependencies>
@@ -77,6 +82,10 @@
 			<id>hadoop-0.23.6</id>
 			<activation>
 				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>0.23.6</value>
+				</property>
 			</activation>
 			<dependencies>
 				<dependency>
@@ -109,6 +118,86 @@
 				</dependency>
 			</dependencies>
 		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.2</value>
+				</property>
+			</activation>
+			<id>cdh-4.2</id>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.1</value>
+				</property>
+			</activation>
+			<id>cdh-4.1</id>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
 	</profiles>
 
 	<dependencies>
@@ -120,4 +209,11 @@
 			<scope>compile</scope>
 		</dependency>
 	</dependencies>
+
+	<repositories>
+		<repository>
+			<id>cloudera</id>
+			<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+		</repository>
+	</repositories>
 </project>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
index 60ae5d3..ddcce64 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -1,9 +1,12 @@
 package edu.uci.ics.hyracks.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -13,12 +16,25 @@
  */
 public class ContextFactory {
 
-    public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+    public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid) throws HyracksDataException {
         try {
-            return new TaskAttemptContextImpl(conf, new TaskAttemptID());
+            return new TaskAttemptContextImpl(conf, tid);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
     }
 
+    public TaskAttemptContext createContext(Configuration conf, int partition) throws HyracksDataException {
+        try {
+            TaskAttemptID tid = new TaskAttemptID("", 0, TaskType.REDUCE, partition, 0);
+            return new TaskAttemptContextImpl(conf, tid);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public JobContext createJobContext(Configuration conf) {
+        return new JobContextImpl(conf, new JobID("0", 0));
+    }
+
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index 6557b08..a28c698a 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -20,6 +20,7 @@
 				<configuration>
 					<source>1.7</source>
 					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -75,6 +76,10 @@
 		<profile>
 			<activation>
 				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>1.0.4</value>
+				</property>
 			</activation>
 			<id>hadoop-1.0.4</id>
 			<dependencies>
@@ -90,6 +95,10 @@
 		<profile>
 			<activation>
 				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>0.23.1</value>
+				</property>
 			</activation>
 			<id>hadoop-0.23.1</id>
 			<dependencies>
@@ -105,6 +114,10 @@
 		<profile>
 			<activation>
 				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>0.23.6</value>
+				</property>
 			</activation>
 			<id>hadoop-0.23.6</id>
 			<dependencies>
@@ -117,6 +130,44 @@
 				</dependency>
 			</dependencies>
 		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.1</value>
+				</property>
+			</activation>
+			<id>cdh-4.1</id>
+			<dependencies>
+				<dependency>
+					<groupId>edu.uci.ics.hyracks</groupId>
+					<artifactId>hyracks-hdfs-0.23.1</artifactId>
+					<version>0.2.3-SNAPSHOT</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.2</value>
+				</property>
+			</activation>
+			<id>cdh-4.2</id>
+			<dependencies>
+				<dependency>
+					<groupId>edu.uci.ics.hyracks</groupId>
+					<artifactId>hyracks-hdfs-0.23.1</artifactId>
+					<version>0.2.3-SNAPSHOT</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
 	</profiles>
 
 	<dependencies>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
index 5923e1e..5d35ec5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
@@ -29,12 +29,24 @@
 public interface IKeyValueParser<K, V> {
 
     /**
+     * Initialize the key value parser.
+     * 
+     * @param writer
+     *            The hyracks writer for outputting data.
+     * @throws HyracksDataException
+     */
+    public void open(IFrameWriter writer) throws HyracksDataException;
+
+    /**
      * Parse a key-value pair returned by HDFS record reader to a tuple.
      * when the parsers' internal buffer is full, it can flush the buffer to the writer
      * 
      * @param key
+     *            The key returned from Hadoop's InputReader.
      * @param value
+     *            The value returned from Hadoop's InputReader.
      * @param writer
+     *            The hyracks writer for outputting data.
      * @throws HyracksDataException
      */
     public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException;
@@ -44,7 +56,8 @@
      * This method is called in the close() of HDFSReadOperatorDescriptor.
      * 
      * @param writer
+     *            The hyracks writer for outputting data.
      * @throws HyracksDataException
      */
-    public void flush(IFrameWriter writer) throws HyracksDataException;
+    public void close(IFrameWriter writer) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
index 6e943ad..7d6f868 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
@@ -18,6 +18,7 @@
 import java.io.Serializable;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Users need to implement this interface to use the HDFSReadOperatorDescriptor.
@@ -36,6 +37,6 @@
      *            the IHyracksTaskContext
      * @return a key-value parser instance.
      */
-    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx);
+    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException;
 
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
index 25b9523..8e85627 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
@@ -26,6 +26,15 @@
 public interface ITupleWriter {
 
     /**
+     * Initialize the the tuple writer.
+     * 
+     * @param output
+     *            The channel for output data.
+     * @throws HyracksDataException
+     */
+    public void open(DataOutput output) throws HyracksDataException;
+
+    /**
      * Write the tuple to the DataOutput.
      * 
      * @param output
@@ -36,4 +45,13 @@
      */
     public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException;
 
+    /**
+     * Close the writer.
+     * 
+     * @param output
+     *            The channel for output data.
+     * @throws HyracksDataException
+     */
+    public void close(DataOutput output) throws HyracksDataException;
+
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
index 839de8f..9a025c2 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
@@ -17,14 +17,19 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
 /**
  * Users need to implement this interface to use the HDFSWriteOperatorDescriptor.
  */
 public interface ITupleWriterFactory extends Serializable {
 
     /**
+     * @param ctx
+     *            the IHyracksTaskContext
      * @return a tuple writer instance
      */
-    public ITupleWriter getTupleWriter();
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException;
 
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index e924650..f49688b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -102,6 +102,7 @@
                     JobConf conf = confFactory.getConf();
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
                     writer.open();
+                    parser.open(writer);
                     InputFormat inputFormat = conf.getInputFormat();
                     for (int i = 0; i < inputSplits.length; i++) {
                         /**
@@ -131,7 +132,7 @@
                             }
                         }
                     }
-                    parser.flush(writer);
+                    parser.close(writer);
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index ff97a29..3ce6b2a 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -89,10 +89,11 @@
                 String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
                 String fileName = outputDirPath + File.separator + "part-" + partition;
 
-                tupleWriter = tupleWriterFactory.getTupleWriter();
+                tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
                 try {
                     FileSystem dfs = FileSystem.get(conf);
                     dos = dfs.create(new Path(fileName), true);
+                    tupleWriter.open(dos);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
@@ -116,6 +117,7 @@
             @Override
             public void close() throws HyracksDataException {
                 try {
+                    tupleWriter.close(dos);
                     dos.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
index c691f5d..9574bb4 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -43,6 +43,11 @@
         return new IKeyValueParser<LongWritable, Text>() {
 
             @Override
+            public void open(IFrameWriter writer) {
+
+            }
+
+            @Override
             public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
                 tb.reset();
                 tb.addField(value.getBytes(), 0, value.getLength());
@@ -56,7 +61,7 @@
             }
 
             @Override
-            public void flush(IFrameWriter writer) throws HyracksDataException {
+            public void close(IFrameWriter writer) throws HyracksDataException {
                 FrameUtils.flushFrame(buffer, writer);
             }
 
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
index d26721d..0da14e5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
@@ -17,6 +17,7 @@
 
 import java.io.DataOutput;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
@@ -26,9 +27,14 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public ITupleWriter getTupleWriter() {
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) {
         return new ITupleWriter() {
-            byte newLine = "\n".getBytes()[0];
+            private byte newLine = "\n".getBytes()[0];
+
+            @Override
+            public void open(DataOutput output) {
+
+            }
 
             @Override
             public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
@@ -43,6 +49,11 @@
                 }
             }
 
+            @Override
+            public void close(DataOutput output) {
+
+            }
+
         };
     }
 
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index e7309d4..3f287cf 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -17,6 +17,7 @@
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -49,7 +50,7 @@
     private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
 
     /**
-     * The constructor of the scheduler
+     * The constructor of the scheduler.
      * 
      * @param ncNameToNcInfos
      * @throws HyracksException
@@ -64,12 +65,20 @@
         }
     }
 
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
+     * @throws HyracksException
+     */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
         loadIPAddressToNCMap(ncNameToNcInfos);
     }
 
     /**
-     * Set location constraints for a file scan operator with a list of file splits
+     * Set location constraints for a file scan operator with a list of file splits.
+     * It guarantees the maximum slots a machine can is at most one more than the minimum slots a
+     * machine can get.
      * 
      * @throws HyracksDataException
      */
@@ -77,93 +86,38 @@
         int[] capacity = new int[NCs.length];
         Arrays.fill(capacity, 0);
         String[] locations = new String[splits.length];
-        int slots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
+        /**
+         * upper bound number of slots that a machine can get
+         */
+        int upperBoundSlots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
                 / capacity.length + 1);
+        /**
+         * lower bound number of slots that a machine can get
+         */
+        int lowerBoundSlots = splits.length % capacity.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
 
         try {
             Random random = new Random(System.currentTimeMillis());
             boolean scheduled[] = new boolean[splits.length];
             Arrays.fill(scheduled, false);
 
-            for (int i = 0; i < splits.length; i++) {
-                /**
-                 * get the location of all the splits
-                 */
-                String[] loc = splits[i].getLocations();
-                if (loc.length > 0) {
-                    for (int j = 0; j < loc.length; j++) {
-                        /**
-                         * get all the IP addresses from the name
-                         */
-                        InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
-                        /**
-                         * iterate overa all ips
-                         */
-                        for (InetAddress ip : allIps) {
-                            /**
-                             * if the node controller exists
-                             */
-                            if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                                /**
-                                 * set the ncs
-                                 */
-                                List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
-                                int arrayPos = random.nextInt(dataLocations.size());
-                                String nc = dataLocations.get(arrayPos);
-                                int pos = ncNameToIndex.get(nc);
-                                /**
-                                 * check if the node is already full
-                                 */
-                                if (capacity[pos] < slots) {
-                                    locations[i] = nc;
-                                    capacity[pos]++;
-                                    scheduled[i] = true;
-                                }
-                            }
-                        }
-
-                        /**
-                         * break the loop for data-locations if the schedule has already been found
-                         */
-                        if (scheduled[i] == true) {
-                            break;
-                        }
-                    }
-                }
-            }
+            /**
+             * push data-local lower-bounds slots to each machine
+             */
+            scheduleLocalSlots(splits, capacity, locations, lowerBoundSlots, random, scheduled);
+            /**
+             * push data-local upper-bounds slots to each machine
+             */
+            scheduleLocalSlots(splits, capacity, locations, upperBoundSlots, random, scheduled);
 
             /**
-             * find the lowest index the current available NCs
+             * push non-data-local lower-bounds slots to each machine
              */
-            int currentAvailableNC = 0;
-            for (int i = 0; i < capacity.length; i++) {
-                if (capacity[i] < slots) {
-                    currentAvailableNC = i;
-                    break;
-                }
-            }
-
+            scheduleNoLocalSlots(splits, capacity, locations, lowerBoundSlots, scheduled);
             /**
-             * schedule no-local file reads
+             * push non-data-local upper-bounds slots to each machine
              */
-            for (int i = 0; i < splits.length; i++) {
-                // if there is no data-local NC choice, choose a random one
-                if (!scheduled[i]) {
-                    locations[i] = NCs[currentAvailableNC];
-                    capacity[currentAvailableNC]++;
-                    scheduled[i] = true;
-
-                    /**
-                     * move the available NC cursor to the next one
-                     */
-                    for (int j = currentAvailableNC; j < capacity.length; j++) {
-                        if (capacity[j] < slots) {
-                            currentAvailableNC = j;
-                            break;
-                        }
-                    }
-                }
-            }
+            scheduleNoLocalSlots(splits, capacity, locations, upperBoundSlots, scheduled);
             return locations;
         } catch (IOException e) {
             throw new HyracksException(e);
@@ -171,6 +125,124 @@
     }
 
     /**
+     * Schedule non-local slots to each machine
+     * 
+     * @param splits
+     *            The HDFS file splits.
+     * @param capacity
+     *            The current capacity of each machine.
+     * @param locations
+     *            The result schedule.
+     * @param slots
+     *            The maximum slots of each machine.
+     * @param scheduled
+     *            Indicate which slot is scheduled.
+     */
+    private void scheduleNoLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots,
+            boolean[] scheduled) {
+        /**
+         * find the lowest index the current available NCs
+         */
+        int currentAvailableNC = 0;
+        for (int i = 0; i < capacity.length; i++) {
+            if (capacity[i] < slots) {
+                currentAvailableNC = i;
+                break;
+            }
+        }
+
+        /**
+         * schedule no-local file reads
+         */
+        for (int i = 0; i < splits.length; i++) {
+            // if there is no data-local NC choice, choose a random one
+            if (!scheduled[i]) {
+                locations[i] = NCs[currentAvailableNC];
+                capacity[currentAvailableNC]++;
+                scheduled[i] = true;
+
+                /**
+                 * move the available NC cursor to the next one
+                 */
+                for (int j = currentAvailableNC; j < capacity.length; j++) {
+                    if (capacity[j] < slots) {
+                        currentAvailableNC = j;
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Schedule data-local slots to each machine.
+     * 
+     * @param splits
+     *            The HDFS file splits.
+     * @param capacity
+     *            The current capacity of each machine.
+     * @param locations
+     *            The result schedule.
+     * @param slots
+     *            The maximum slots of each machine.
+     * @param random
+     *            The random generator.
+     * @param scheduled
+     *            Indicate which slot is scheduled.
+     * @throws IOException
+     * @throws UnknownHostException
+     */
+    private void scheduleLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots, Random random,
+            boolean[] scheduled) throws IOException, UnknownHostException {
+        for (int i = 0; i < splits.length; i++) {
+            /**
+             * get the location of all the splits
+             */
+            String[] loc = splits[i].getLocations();
+            if (loc.length > 0) {
+                for (int j = 0; j < loc.length; j++) {
+                    /**
+                     * get all the IP addresses from the name
+                     */
+                    InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
+                    /**
+                     * iterate overa all ips
+                     */
+                    for (InetAddress ip : allIps) {
+                        /**
+                         * if the node controller exists
+                         */
+                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+                            /**
+                             * set the ncs
+                             */
+                            List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
+                            int arrayPos = random.nextInt(dataLocations.size());
+                            String nc = dataLocations.get(arrayPos);
+                            int pos = ncNameToIndex.get(nc);
+                            /**
+                             * check if the node is already full
+                             */
+                            if (capacity[pos] < slots) {
+                                locations[i] = nc;
+                                capacity[pos]++;
+                                scheduled[i] = true;
+                            }
+                        }
+                    }
+
+                    /**
+                     * break the loop for data-locations if the schedule has already been found
+                     */
+                    if (scheduled[i] == true) {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
      * Load the IP-address-to-NC map from the NCNameToNCInfoMap
      * 
      * @param ncNameToNcInfos
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 90f5603..9e9abdf 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -139,8 +139,7 @@
                             /**
                              * read the split
                              */
-                            TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(),
-                                    inputSplits.get(i));
+                            TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), i);
                             RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
                             reader.initialize(inputSplits.get(i), context);
                             while (reader.nextKeyValue() == true) {
@@ -148,7 +147,7 @@
                             }
                         }
                     }
-                    parser.flush(writer);
+                    parser.close(writer);
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 390a7b5..c1c227c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -39,8 +39,8 @@
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 
 /**
- * The HDFS file write operator using the Hadoop new API.
- * To use this operator, a user need to provide an ITupleWriterFactory.
+ * The HDFS file write operator using the Hadoop new API. To use this operator,
+ * a user need to provide an ITupleWriterFactory.
  */
 public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
@@ -88,10 +88,11 @@
                 String outputPath = FileOutputFormat.getOutputPath(conf).toString();
                 String fileName = outputPath + File.separator + "part-" + partition;
 
-                tupleWriter = tupleWriterFactory.getTupleWriter();
+                tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
                 try {
                     FileSystem dfs = FileSystem.get(conf.getConfiguration());
                     dos = dfs.create(new Path(fileName), true);
+                    tupleWriter.open(dos);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
@@ -115,6 +116,7 @@
             @Override
             public void close() throws HyracksDataException {
                 try {
+                    tupleWriter.close(dos);
                     dos.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
index 3445d68..cb97ca1 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -15,18 +15,11 @@
 
 package edu.uci.ics.hyracks.hdfs2.scheduler;
 
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -35,16 +28,10 @@
  * The scheduler conduct data-local scheduling for data reading on HDFS.
  * This class works for Hadoop new API.
  */
+@SuppressWarnings("deprecation")
 public class Scheduler {
 
-    /** a list of NCs */
-    private String[] NCs;
-
-    /** a map from ip to NCs */
-    private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
-
-    /** a map from the NC name to the index */
-    private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+    private edu.uci.ics.hyracks.hdfs.scheduler.Scheduler scheduler;
 
     /**
      * The constructor of the scheduler
@@ -53,17 +40,18 @@
      * @throws HyracksException
      */
     public Scheduler(String ipAddress, int port) throws HyracksException {
-        try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
-            loadIPAddressToNCMap(ncNameToNcInfos);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
+        scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ipAddress, port);
     }
 
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     *            the mapping from nc names to nc infos
+     * @throws HyracksException
+     */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        loadIPAddressToNCMap(ncNameToNcInfos);
+        scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
     }
 
     /**
@@ -72,135 +60,11 @@
      * @throws HyracksDataException
      */
     public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
-        int[] capacity = new int[NCs.length];
-        Arrays.fill(capacity, 0);
-        String[] locations = new String[splits.size()];
-        int slots = splits.size() % capacity.length == 0 ? (splits.size() / capacity.length) : (splits.size()
-                / capacity.length + 1);
-
         try {
-            Random random = new Random(System.currentTimeMillis());
-            boolean scheduled[] = new boolean[splits.size()];
-            Arrays.fill(scheduled, false);
-
-            for (int i = 0; i < splits.size(); i++) {
-                /**
-                 * get the location of all the splits
-                 */
-                String[] loc = splits.get(i).getLocations();
-                if (loc.length > 0) {
-                    for (int j = 0; j < loc.length; j++) {
-                        /**
-                         * get all the IP addresses from the name
-                         */
-                        InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
-                        /**
-                         * iterate overa all ips
-                         */
-                        for (InetAddress ip : allIps) {
-                            /**
-                             * if the node controller exists
-                             */
-                            if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                                /**
-                                 * set the ncs
-                                 */
-                                List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
-                                int arrayPos = random.nextInt(dataLocations.size());
-                                String nc = dataLocations.get(arrayPos);
-                                int pos = ncNameToIndex.get(nc);
-                                /**
-                                 * check if the node is already full
-                                 */
-                                if (capacity[pos] < slots) {
-                                    locations[i] = nc;
-                                    capacity[pos]++;
-                                    scheduled[i] = true;
-                                }
-                            }
-                        }
-
-                        /**
-                         * break the loop for data-locations if the schedule has already been found
-                         */
-                        if (scheduled[i] == true) {
-                            break;
-                        }
-                    }
-                }
-            }
-
-            /**
-             * find the lowest index the current available NCs
-             */
-            int currentAvailableNC = 0;
-            for (int i = 0; i < capacity.length; i++) {
-                if (capacity[i] < slots) {
-                    currentAvailableNC = i;
-                    break;
-                }
-            }
-
-            /**
-             * schedule no-local file reads
-             */
-            for (int i = 0; i < splits.size(); i++) {
-                // if there is no data-local NC choice, choose a random one
-                if (!scheduled[i]) {
-                    locations[i] = NCs[currentAvailableNC];
-                    capacity[currentAvailableNC]++;
-                    scheduled[i] = true;
-
-                    /**
-                     * move the available NC cursor to the next one
-                     */
-                    for (int j = currentAvailableNC; j < capacity.length; j++) {
-                        if (capacity[j] < slots) {
-                            currentAvailableNC = j;
-                            break;
-                        }
-                    }
-                }
-            }
-            return locations;
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    /**
-     * Load the IP-address-to-NC map from the NCNameToNCInfoMap
-     * 
-     * @param ncNameToNcInfos
-     * @throws HyracksException
-     */
-    private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        try {
-            NCs = new String[ncNameToNcInfos.size()];
-            int i = 0;
-
-            /**
-             * build the IP address to NC map
-             */
-            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
-                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
-                        .getHostAddress();
-                List<String> matchedNCs = ipToNcMapping.get(ipAddr);
-                if (matchedNCs == null) {
-                    matchedNCs = new ArrayList<String>();
-                    ipToNcMapping.put(ipAddr, matchedNCs);
-                }
-                matchedNCs.add(entry.getKey());
-                NCs[i] = entry.getKey();
-                i++;
-            }
-
-            /**
-             * set up the NC name to index mapping
-             */
-            for (i = 0; i < NCs.length; i++) {
-                ncNameToIndex.put(NCs[i], i);
-            }
+            org.apache.hadoop.mapred.InputSplit[] inputSplits = new org.apache.hadoop.mapred.InputSplit[splits.size()];
+            for (int i = 0; i < inputSplits.length; i++)
+                inputSplits[i] = new WrappedFileSplit(splits.get(i).getLocations(), splits.get(i).getLength());
+            return scheduler.getLocationConstraints(inputSplits);
         } catch (Exception e) {
             throw new HyracksException(e);
         }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java
new file mode 100644
index 0000000..1deb469
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/WrappedFileSplit.java
@@ -0,0 +1,51 @@
+package edu.uci.ics.hyracks.hdfs2.scheduler;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * The wrapped implementation of InputSplit, for the new API scheduler
+ * to reuse the old API scheduler
+ */
+@SuppressWarnings("deprecation")
+public class WrappedFileSplit implements InputSplit {
+
+    private String[] locations;
+    private long length;
+
+    public WrappedFileSplit(String[] locations, long length) {
+        this.locations = locations;
+        this.length = length;
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int len = input.readInt();
+        locations = new String[len];
+        for (int i = 0; i < len; i++)
+            locations[i] = input.readUTF();
+        length = input.readLong();
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.write(locations.length);
+        for (int i = 0; i < locations.length; i++)
+            output.writeUTF(locations[i]);
+        output.writeLong(length);
+    }
+
+    @Override
+    public long getLength() throws IOException {
+        return length;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+        return locations;
+    }
+
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index 0087307..eccd5ee 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -223,8 +223,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc3", "nc4", "nc2",
-                "nc4", "nc5", "nc5" };
+        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+                "nc5", "nc5", "nc6" };
 
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index 79f0874..df3feb2 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -34,7 +34,6 @@
 
 /**
  * Test case for the new HDFS API scheduler
- * 
  */
 public class SchedulerTest extends TestCase {
 
@@ -228,8 +227,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc3", "nc4", "nc2",
-                "nc4", "nc5", "nc5" };
+        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+                "nc5", "nc5", "nc6" };
 
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
