add generalized HDFS read/write operators
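
Usage sketch: the read operator scans the splits scheduled to each
partition's node and the write operator emits one part-file per partition.
recordDesc, parserFactory, tupleWriterFactory, and the Scheduler instance
are application-specific and assumed below:

    // hypothetical wiring; exception handling omitted
    JobConf conf = new JobConf();
    conf.setInputFormat(TextInputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path("/user/foo/input"));

    JobSpecification spec = new JobSpecification();
    String[] locations = scheduler.getLocationConstraints(conf);

    HDFSReadOperatorDescriptor reader = new HDFSReadOperatorDescriptor(spec,
            recordDesc, conf, locations, parserFactory);
    PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, reader, locations);

    HDFSWriteOperatorDescriptor writer = new HDFSWriteOperatorDescriptor(spec,
            "/tmp/output", tupleWriterFactory);

    spec.connect(new OneToOneConnectorDescriptor(spec), reader, 0, writer, 0);
    spec.addRoot(writer);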

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2790 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs-scheduler/pom.xml b/hyracks/hyracks-hdfs-scheduler/pom.xml
index 09de5f7..061f430 100644
--- a/hyracks/hyracks-hdfs-scheduler/pom.xml
+++ b/hyracks/hyracks-hdfs-scheduler/pom.xml
@@ -50,7 +50,19 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-api</artifactId>
-			<version>0.2.1</version>
+			<version>0.2.3-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-std</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-common</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
diff --git a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
new file mode 100644
index 0000000..56e6a36
--- /dev/null
+++ b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs.api;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
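+/**
+ * Converts HDFS key-value pairs into Hyracks tuples and pushes them to the
+ * given IFrameWriter; the HDFS read operator calls parse() once per record.
+ */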
+public interface IKeyValueParser<K, V> {
+
+    public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
new file mode 100644
index 0000000..d935f1b
--- /dev/null
+++ b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs.api;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
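+/**
+ * Creates IKeyValueParser instances; one parser is created per read task.
+ */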
+public interface IKeyValueParserFactory<K, V> {
+
+    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx);
+
+}
diff --git a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
new file mode 100644
index 0000000..8a4226b
--- /dev/null
+++ b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs.api;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
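+/**
+ * Serializes one tuple at a time to a DataOutput; implementations define the
+ * record format of the files produced by the HDFS write operator.
+ */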
+public interface ITupleWriter {
+
+    public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException;
+
+}
diff --git a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
new file mode 100644
index 0000000..dc0cffb
--- /dev/null
+++ b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs.api;
+
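+/**
+ * Creates ITupleWriter instances for the HDFS write operator.
+ */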
+public interface ITupleWriterFactory {
+
+    public ITupleWriter getTupleWriter();
+
+}
diff --git a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
new file mode 100644
index 0000000..4fa0164
--- /dev/null
+++ b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.hyracks.hdfs.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
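+/**
+ * A Serializable wrapper for JobConf: JobConf itself is not Serializable, so
+ * it is shipped as the byte image produced by its Writable write() method.
+ */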
+@SuppressWarnings("deprecation")
+public class ConfFactory implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private byte[] confBytes;
+
+    public ConfFactory(JobConf conf) throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+            conf.write(dos);
+            confBytes = bos.toByteArray();
+            dos.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public JobConf getConf() throws HyracksDataException {
+        try {
+            JobConf conf = new JobConf();
+            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(confBytes));
+            conf.readFields(dis);
+            dis.close();
+            return conf;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
diff --git a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
new file mode 100644
index 0000000..b951efa
--- /dev/null
+++ b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs.dataflow;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
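+/**
+ * The HDFS file read operator (Hadoop old 'mapred' API). Each partition reads
+ * the splits scheduled to its node name; the JobConf must be the same one used
+ * to compute scheduledLocations, so that the split order matches on each node.
+ */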
+@SuppressWarnings({ "deprecation", "rawtypes" })
+public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private ConfFactory confFactory;
+    private InputSplitsFactory splitsFactory;
+    private String[] scheduledLocations;
+    private IKeyValueParserFactory tupleParserFactory;
+
+    public HDFSReadOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, JobConf conf,
+            String[] scheduledLocations, IKeyValueParserFactory tupleParserFactory) throws HyracksException {
+        super(spec, 0, 1);
+        InputFormat inputFormat = conf.getInputFormat();
+        try {
+            InputSplit[] splits = inputFormat.getSplits(conf, 10);
+            this.splitsFactory = new InputSplitsFactory(splits);
+            this.confFactory = new ConfFactory(conf);
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+        this.scheduledLocations = scheduledLocations;
+        this.tupleParserFactory = tupleParserFactory;
+        this.recordDescriptors[0] = rd;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        final InputSplit[] inputSplits = splitsFactory.getSplits();
+        final JobConf conf = confFactory.getConf();
+
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void initialize() throws HyracksDataException {
+                try {
+                    IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
+                    writer.open();
+                    InputFormat inputFormat = conf.getInputFormat();
+                    for (int i = 0; i < inputSplits.length; i++) {
+                        /**
+                         * read all the input splits scheduled to the current node
+                         */
+                        if (scheduledLocations[i].equals(nodeName)) {
+                            RecordReader reader = inputFormat.getRecordReader(inputSplits[i], conf, null);
+                            Object key = reader.createKey();
+                            Object value = reader.createValue();
+                            while (reader.next(key, value)) {
+                                parser.parse(key, value, writer);
+                            }
+                            reader.close();
+                        }
+                    }
+                } catch (Exception e) {
+                    writer.fail();
+                    throw new HyracksDataException(e);
+                } finally {
+                    writer.close();
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
new file mode 100644
index 0000000..63b96f4
--- /dev/null
+++ b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs.dataflow;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
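+/**
+ * The HDFS file write operator: each partition serializes its input tuples
+ * through an ITupleWriter into one file, part-<partition>, under outputDirPath.
+ */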
+public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private String outputDirPath;
+    private ITupleWriterFactory tupleWriterFactory;
+
+    public HDFSWriteOperatorDescriptor(JobSpecification spec, String outputDirPath,
+            ITupleWriterFactory tupleWriterFactory) throws HyracksException {
+        super(spec, 1, 0);
+        this.outputDirPath = outputDirPath;
+        this.tupleWriterFactory = tupleWriterFactory;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+            private String fileName = outputDirPath + File.separator + "part-" + partition;
+            private DataOutputStream dos;
+            private RecordDescriptor inputRd;
+            private FrameTupleAccessor accessor;
+            private FrameTupleReference tuple = new FrameTupleReference();
+            private ITupleWriter tupleWriter;
+
+            @Override
+            public void open() throws HyracksDataException {
+                inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+                accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
+                tupleWriter = tupleWriterFactory.getTupleWriter();
+                try {
+                    dos = new DataOutputStream(new FileOutputStream(fileName));
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                accessor.reset(buffer);
+                int tupleCount = accessor.getTupleCount();
+                for (int i = 0; i < tupleCount; i++) {
+                    tuple.reset(accessor, i);
+                    tupleWriter.write(dos, tuple);
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                close();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    dos.close();
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+        };
+    }
+}
diff --git a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
new file mode 100644
index 0000000..8c7fea1
--- /dev/null
+++ b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+@SuppressWarnings({ "deprecation", "rawtypes" })
+public class InputSplitsFactory implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private byte[] splitBytes;
+    private String splitClassName;
+
+    public InputSplitsFactory(InputSplit[] splits) throws HyracksDataException {
+        splitBytes = splitsToBytes(splits);
+        if (splits.length > 0) {
+            splitClassName = splits[0].getClass().getName();
+        }
+    }
+
+    public InputSplit[] getSplits() throws HyracksDataException {
+        return bytesToSplits(splitBytes);
+    }
+
+    /**
+     * Convert splits to bytes.
+     * 
+     * @param splits
+     *            input splits
+     * @return bytes which serialize the splits
+     * @throws HyracksDataException
+     */
+    private byte[] splitsToBytes(InputSplit[] splits) throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+            dos.writeInt(splits.length);
+            for (int i = 0; i < splits.length; i++) {
+                splits[i].write(dos);
+            }
+            dos.close();
+            return bos.toByteArray();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Convert bytes to splits.
+     * 
+     * @param bytes
+     *            the serialized bytes of the splits
+     * @return the deserialized input splits
+     * @throws HyracksDataException
+     */
+    private InputSplit[] bytesToSplits(byte[] bytes) throws HyracksDataException {
+        try {
+            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+            DataInputStream dis = new DataInputStream(bis);
+            int size = dis.readInt();
+            Class splitNameClass = Class.forName(splitClassName);
+            InputSplit[] splits = new InputSplit[size];
+            for (int i = 0; i < size; i++) {
+                splits[i] = (InputSplit) splitNameClass.newInstance();
+                splits[i].readFields(dis);
+            }
+            dis.close();
+            return splits;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
diff --git a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uc/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
similarity index 97%
rename from hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uc/ics/hyracks/hdfs/scheduler/Scheduler.java
rename to hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
index 83e4269..ca075d3 100644
--- a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uc/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/Scheduler.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uc.ics.hyracks.hdfs.scheduler;
+package edu.uci.ics.hyracks.hdfs.scheduler;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -76,9 +76,8 @@
-     * @param inputFormatClass
      * @throws HyracksException
      */
-    public String[] getLocationConstraints(JobConf conf, Class<InputFormat> inputFormatClass) throws HyracksException {
+    public String[] getLocationConstraints(JobConf conf) throws HyracksException {
         try {
-            InputFormat inputFormat = inputFormatClass.newInstance();
+            InputFormat inputFormat = conf.getInputFormat();
             InputSplit[] splits = inputFormat.getSplits(conf, NCs.length);
             return getLocationConstraints(splits);
         } catch (Exception e) {
diff --git a/hyracks/hyracks-hdfs-scheduler/src/test/java/edu/uc/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs-scheduler/src/test/java/edu/uc/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index ffc6e05..dd0daff 100644
--- a/hyracks/hyracks-hdfs-scheduler/src/test/java/edu/uc/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs-scheduler/src/test/java/edu/uc/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.client.NodeStatus;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 @SuppressWarnings("deprecation")
 public class SchedulerTest extends TestCase {