add tests for Hadoop new API code path

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2817 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
index 4fa0164..aa75019 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
@@ -11,7 +11,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 @SuppressWarnings("deprecation")
-public class ConfFactory implements Serializable {
+class ConfFactory implements Serializable {
     private static final long serialVersionUID = 1L;
     private byte[] confBytes;
 
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index bc44db8..a0c821a3 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -35,6 +35,11 @@
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
 
+/**
+ * The HDFS file read operator using the Hadoop old API.
+ * To use this operator, a user need to provide an IKeyValueParserFactory implementation which convert
+ * key-value pairs into tuples.
+ */
 @SuppressWarnings({ "deprecation", "rawtypes" })
 public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index 8fac758..e29848c 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -38,6 +38,10 @@
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 
+/**
+ * The HDFS file write operator using the Hadoop old API.
+ * To use this operator, a user need to provide an ITupleWriterFactory.
+ */
 @SuppressWarnings("deprecation")
 public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
index 9cc9ebc..e7227d2 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
@@ -28,7 +28,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 @SuppressWarnings({ "deprecation", "rawtypes" })
-public class InputSplitsFactory implements Serializable {
+class InputSplitsFactory implements Serializable {
 
     private static final long serialVersionUID = 1L;
     private byte[] splitBytes;
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index 9fa74fa..e7309d4 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -33,7 +33,8 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 
 /**
- * The scheduler conduct data-local scheduling for data on HDFS
+ * The scheduler conduct data-local scheduling for data reading on HDFS.
+ * This class works for Hadoop old API.
  */
 @SuppressWarnings("deprecation")
 public class Scheduler {
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java
new file mode 100644
index 0000000..01fc2f3
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+@SuppressWarnings("rawtypes")
+class FileSplitsFactory implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private byte[] splitBytes;
+    private String splitClassName;
+
+    public FileSplitsFactory(List<FileSplit> splits) throws HyracksDataException {
+        splitBytes = splitsToBytes(splits);
+        if (splits.size() > 0) {
+            splitClassName = splits.get(0).getClass().getName();
+        }
+    }
+
+    public List<FileSplit> getSplits() throws HyracksDataException {
+        return bytesToSplits(splitBytes);
+    }
+
+    /**
+     * Convert splits to bytes.
+     * 
+     * @param splits
+     *            input splits
+     * @return bytes which serialize the splits
+     * @throws IOException
+     */
+    private byte[] splitsToBytes(List<FileSplit> splits) throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+            dos.writeInt(splits.size());
+            int size = splits.size();
+            for (int i = 0; i < size; i++) {
+                splits.get(i).write(dos);
+            }
+            dos.close();
+            return bos.toByteArray();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Covert bytes to splits.
+     * 
+     * @param bytes
+     * @return
+     * @throws HyracksDataException
+     */
+    private List<FileSplit> bytesToSplits(byte[] bytes) throws HyracksDataException {
+        try {
+            Class splitClass = Class.forName(splitClassName);
+            Constructor[] constructors = splitClass.getDeclaredConstructors();
+            Constructor defaultConstructor = null;
+            for (Constructor constructor : constructors) {
+                if (constructor.getParameterTypes().length == 0) {
+                    constructor.setAccessible(true);
+                    defaultConstructor = constructor;
+                }
+            }
+            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+            DataInputStream dis = new DataInputStream(bis);
+            int size = dis.readInt();
+            List<FileSplit> splits = new ArrayList<FileSplit>();
+            for (int i = 0; i < size; i++) {
+                splits.add((FileSplit) defaultConstructor.newInstance());
+                splits.get(i).readFields(dis);
+            }
+            dis.close();
+            return splits;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 54d60c0..22559e6 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -15,7 +15,9 @@
 
 package edu.uci.ics.hyracks.hdfs2.dataflow;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -23,6 +25,7 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -37,12 +40,17 @@
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
 
+/**
+ * The HDFS file read operator using the Hadoop new API.
+ * To use this operator, a user need to provide an IKeyValueParserFactory implementation which convert
+ * key-value pairs into tuples.
+ */
 @SuppressWarnings("rawtypes")
 public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
     private final ConfFactory confFactory;
-    private final InputSplit[] inputSplits;
+    private final FileSplitsFactory splitsFactory;
     private final String[] scheduledLocations;
     private final IKeyValueParserFactory tupleParserFactory;
     private final boolean[] executed;
@@ -65,11 +73,15 @@
      *            the ITupleParserFactory implementation instance.
      * @throws HyracksException
      */
-    public HDFSReadOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, Job conf, InputSplit[] splits,
+    public HDFSReadOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, Job conf, List<InputSplit> splits,
             String[] scheduledLocations, IKeyValueParserFactory tupleParserFactory) throws HyracksException {
         super(spec, 0, 1);
         try {
-            this.inputSplits = splits;
+            List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+            for (int i = 0; i < splits.size(); i++) {
+                fileSplits.add((FileSplit) splits.get(i));
+            }
+            this.splitsFactory = new FileSplitsFactory(fileSplits);
             this.confFactory = new ConfFactory(conf);
         } catch (Exception e) {
             throw new HyracksException(e);
@@ -86,6 +98,7 @@
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
         final Job conf = confFactory.getConf();
+        final List<FileSplit> inputSplits = splitsFactory.getSplits();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
@@ -99,7 +112,8 @@
                     writer.open();
                     InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(),
                             conf.getConfiguration());
-                    for (int i = 0; i < inputSplits.length; i++) {
+                    int size = inputSplits.size();
+                    for (int i = 0; i < size; i++) {
                         /**
                          * read all the partitions scheduled to the current node
                          */
@@ -122,8 +136,8 @@
                              */
                             TaskAttemptContext context = new TaskAttemptContext(conf.getConfiguration(),
                                     new TaskAttemptID());
-                            RecordReader reader = inputFormat.createRecordReader(inputSplits[i], context);
-                            reader.initialize(inputSplits[i], context);
+                            RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
+                            reader.initialize(inputSplits.get(i), context);
                             while (reader.nextKeyValue() == true) {
                                 parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer);
                             }
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 441815b..32bb9dc 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -22,6 +22,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
@@ -37,12 +40,15 @@
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 
+/**
+ * The HDFS file write operator using the Hadoop new API.
+ * To use this operator, a user need to provide an ITupleWriterFactory.
+ */
 public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
     private ConfFactory confFactory;
     private ITupleWriterFactory tupleWriterFactory;
-    private String outputPath;
 
     /**
      * The constructor of HDFSWriteOperatorDescriptor.
@@ -55,12 +61,11 @@
      *            the ITupleWriterFactory implementation object
      * @throws HyracksException
      */
-    public HDFSWriteOperatorDescriptor(JobSpecification spec, Job conf, String outputPath,
-            ITupleWriterFactory tupleWriterFactory) throws HyracksException {
+    public HDFSWriteOperatorDescriptor(JobSpecification spec, Job conf, ITupleWriterFactory tupleWriterFactory)
+            throws HyracksException {
         super(spec, 1, 0);
         this.confFactory = new ConfFactory(conf);
         this.tupleWriterFactory = tupleWriterFactory;
-        this.outputPath = outputPath;
     }
 
     @Override
@@ -68,6 +73,8 @@
             final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
         final Job conf = confFactory.getConf();
+        final String outputPath = FileOutputFormat.getOutputPath(new JobContext(conf.getConfiguration(), new JobID()))
+                .toString();
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
 
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
index 5b70a4e..3445d68 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -32,7 +32,8 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 
 /**
- * The scheduler conduct data-local scheduling for data on HDFS
+ * The scheduler conduct data-local scheduling for data reading on HDFS.
+ * This class works for Hadoop new API.
  */
 public class Scheduler {
 
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
index 23e1344..2686077 100644
--- a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
@@ -57,7 +57,8 @@
 import edu.uci.ics.hyracks.hdfs.utils.TestUtils;
 
 /**
- * Test the edu.uci.ics.hyracks.hdfs.dataflow package
+ * Test the edu.uci.ics.hyracks.hdfs.dataflow package,
+ * the operators for the Hadoop old API.
  */
 @SuppressWarnings({ "deprecation" })
 public class DataflowTest extends TestCase {
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
new file mode 100644
index 0000000..508ba07
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.dataflow;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.hdfs.lib.TextKeyValueParserFactory;
+import edu.uci.ics.hyracks.hdfs.lib.TextTupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+import edu.uci.ics.hyracks.hdfs.utils.TestUtils;
+import edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler;
+
+/**
+ * Test the edu.uci.ics.hyracks.hdfs2.dataflow package,
+ * the operators for the Hadoop new API.
+ */
+public class DataflowTest extends TestCase {
+
+    private static final String ACTUAL_RESULT_DIR = "actual";
+    private static final String EXPECTED_RESULT_PATH = "src/test/resources/expected";
+    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+
+    private static final String DATA_PATH = "src/test/resources/data/customer.tbl";
+    private static final String HDFS_INPUT_PATH = "/customer/";
+    private static final String HDFS_OUTPUT_PATH = "/customer_result/";
+
+    private static final String HYRACKS_APP_NAME = "DataflowTest";
+    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+    private MiniDFSCluster dfsCluster;
+
+    private Job conf;
+    private int numberOfNC = 2;
+
+    @Override
+    public void setUp() throws Exception {
+        conf = new Job();
+        cleanupStores();
+        HyracksUtils.init();
+        HyracksUtils.createApp(HYRACKS_APP_NAME);
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        startHDFS();
+    }
+
+    private void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    /**
+     * Start the HDFS cluster and setup the data files
+     * 
+     * @throws IOException
+     */
+    private void startHDFS() throws IOException {
+        conf.getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+        conf.getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+        conf.getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf.getConfiguration(), numberOfNC, true, null);
+        FileSystem dfs = FileSystem.get(conf.getConfiguration());
+        Path src = new Path(DATA_PATH);
+        Path dest = new Path(HDFS_INPUT_PATH);
+        Path result = new Path(HDFS_OUTPUT_PATH);
+        dfs.mkdirs(dest);
+        dfs.mkdirs(result);
+        dfs.copyFromLocalFile(src, dest);
+
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+        conf.getConfiguration().writeXml(confOutput);
+        confOutput.flush();
+        confOutput.close();
+    }
+
+    /**
+     * Test a job with only HDFS read and writes.
+     * 
+     * @throws Exception
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public void testHDFSReadWriteOperators() throws Exception {
+        FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
+        FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
+        conf.setInputFormatClass(TextInputFormat.class);
+
+        Scheduler scheduler = new Scheduler(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+        InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(), conf.getConfiguration());
+        List<InputSplit> splits = inputFormat.getSplits(new JobContext(conf.getConfiguration(), new JobID()));
+
+        String[] readSchedule = scheduler.getLocationConstraints(splits);
+        JobSpecification jobSpec = new JobSpecification();
+        RecordDescriptor recordDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+        String[] locations = new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, HyracksUtils.NC2_ID,
+                HyracksUtils.NC2_ID };
+        HDFSReadOperatorDescriptor readOperator = new HDFSReadOperatorDescriptor(jobSpec, recordDesc, conf, splits,
+                readSchedule, new TextKeyValueParserFactory());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, locations);
+
+        ExternalSortOperatorDescriptor sortOperator = new ExternalSortOperatorDescriptor(jobSpec, 10, new int[] { 0 },
+                new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, recordDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, sortOperator, locations);
+
+        HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, conf,
+                new TextTupleWriterFactory());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, HyracksUtils.NC1_ID);
+
+        jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0);
+        jobSpec.connect(new MToNPartitioningMergingConnectorDescriptor(jobSpec, new FieldHashPartitionComputerFactory(
+                new int[] { 0 }, new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
+                new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }), sortOperator,
+                0, writeOperator, 0);
+        jobSpec.addRoot(writeOperator);
+
+        IHyracksClientConnection client = new HyracksConnection(HyracksUtils.CC_HOST,
+                HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+        JobId jobId = client.startJob(HYRACKS_APP_NAME, jobSpec);
+        client.waitForCompletion(jobId);
+
+        Assert.assertEquals(true, checkResults());
+    }
+
+    /**
+     * Check if the results are correct
+     * 
+     * @return true if correct
+     * @throws Exception
+     */
+    private boolean checkResults() throws Exception {
+        FileSystem dfs = FileSystem.get(conf.getConfiguration());
+        Path result = new Path(HDFS_OUTPUT_PATH);
+        Path actual = new Path(ACTUAL_RESULT_DIR);
+        dfs.copyToLocalFile(result, actual);
+
+        TestUtils.compareWithResult(new File(EXPECTED_RESULT_PATH + File.separator + "part-0"), new File(
+                ACTUAL_RESULT_DIR + File.separator + "customer_result" + File.separator + "part-0"));
+        return true;
+    }
+
+    /**
+     * cleanup hdfs cluster
+     */
+    private void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        HyracksUtils.destroyApp(HYRACKS_APP_NAME);
+        HyracksUtils.deinit();
+        cleanupHDFS();
+    }
+
+}
diff --git a/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
new file mode 100644
index 0000000..ea2af13
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.scheduler;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.client.NodeStatus;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+
+/**
+ * Test case for the new HDFS API scheduler
+ * 
+ */
+public class SchedulerTest extends TestCase {
+
+    /**
+     * Test the scheduler for the case when the Hyracks cluster is the HDFS cluster
+     * 
+     * @throws Exception
+     */
+    public void testSchedulerSimple() throws Exception {
+        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.1").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.2").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.3").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.4").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.5").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.6").getAddress(), 5099)));
+
+        List<InputSplit> fileSplits = new ArrayList<InputSplit>();
+        fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
+        fileSplits.add(new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" }));
+        fileSplits.add(new FileSplit(new Path("part-4"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" }));
+        fileSplits.add(new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" }));
+
+        Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+        String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+
+        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc5", "nc6" };
+
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
+    }
+
+    /**
+     * Test the case where the HDFS cluster is a larger than the Hyracks cluster
+     * 
+     * @throws Exception
+     */
+    public void testSchedulerLargerHDFS() throws Exception {
+        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.1").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.2").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.3").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.4").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.5").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.6").getAddress(), 5099)));
+
+        List<InputSplit> fileSplits = new ArrayList<InputSplit>();
+        fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
+        fileSplits.add(new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" }));
+        fileSplits.add(new FileSplit(new Path("part-4"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" }));
+        fileSplits.add(new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
+        fileSplits.add(new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" }));
+        fileSplits.add(new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" }));
+        fileSplits.add(new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.7" }));
+        fileSplits.add(new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" }));
+
+        Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+        String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+
+        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+                "nc6", "nc5" };
+
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
+    }
+
+    /**
+     * Test the case where the HDFS cluster is a larger than the Hyracks cluster
+     * 
+     * @throws Exception
+     */
+    public void testSchedulerSmallerHDFS() throws Exception {
+        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.1").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.2").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.3").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.4").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.5").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.6").getAddress(), 5099)));
+
+        List<InputSplit> fileSplits = new ArrayList<InputSplit>();
+        fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
+        fileSplits.add(new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.3" }));
+        fileSplits.add(new FileSplit(new Path("part-4"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.3" }));
+        fileSplits.add(new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
+        fileSplits.add(new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.1" }));
+        fileSplits.add(new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.2" }));
+        fileSplits.add(new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" }));
+
+        Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+        String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+
+        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+                "nc5", "nc6" };
+
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
+    }
+
+    /**
+     * Test the case where the HDFS cluster is a larger than the Hyracks cluster
+     * 
+     * @throws Exception
+     */
+    public void testSchedulerSmallerHDFSOdd() throws Exception {
+        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
+        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.1").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.2").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.3").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.4").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.5").getAddress(), 5099)));
+        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
+                .getByName("10.0.0.6").getAddress(), 5099)));
+
+        List<InputSplit> fileSplits = new ArrayList<InputSplit>();
+        fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
+        fileSplits.add(new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.3" }));
+        fileSplits.add(new FileSplit(new Path("part-4"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.3" }));
+        fileSplits.add(new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
+        fileSplits.add(new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.1" }));
+        fileSplits.add(new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.2" }));
+        fileSplits.add(new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" }));
+        fileSplits.add(new FileSplit(new Path("part-13"), 0, 0, new String[] { "10.0.0.2", "10.0.0.4", "10.0.0.5" }));
+
+        Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+        String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
+
+        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc3", "nc4", "nc2",
+                "nc4", "nc5", "nc5" };
+
+        for (int i = 0; i < locationConstraints.length; i++) {
+            Assert.assertEquals(locationConstraints[i], expectedResults[i]);
+        }
+    }
+
+}