Refactored Hadoop operators out of dataflow-std into the new hyracks-dataflow-hadoop module

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@59 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/.classpath b/hyracks-dataflow-hadoop/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks-dataflow-hadoop/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-dataflow-hadoop/.project b/hyracks-dataflow-hadoop/.project
new file mode 100644
index 0000000..d6edecf
--- /dev/null
+++ b/hyracks-dataflow-hadoop/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>hyracks-dataflow-hadoop</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.maven.ide.eclipse.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>
diff --git a/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..1a27838
--- /dev/null
+++ b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Fri Aug 13 04:07:04 PDT 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs b/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..9e0c887
--- /dev/null
+++ b/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Fri Aug 13 04:07:00 PDT 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-dataflow-hadoop/pom.xml b/hyracks-dataflow-hadoop/pom.xml
new file mode 100644
index 0000000..f277575
--- /dev/null
+++ b/hyracks-dataflow-hadoop/pom.xml
@@ -0,0 +1,54 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks</groupId>
+  <artifactId>hyracks-dataflow-hadoop</artifactId>
+  <version>0.1.0</version>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-api</artifactId>
+  		<version>0.1.0</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-common</artifactId>
+  		<version>0.1.0</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>org.apache.hadoop</groupId>
+  		<artifactId>hadoop-core</artifactId>
+  		<version>0.20.2</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.dcache</groupId>
+  		<artifactId>dcache-client</artifactId>
+  		<version>0.0.1</version>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-std</artifactId>
+  		<version>0.1.0</version>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+</project>
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
new file mode 100644
index 0000000..4acc046
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.HadoopFileSplit;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
+import edu.uci.ics.hyracks.dataflow.std.file.IRecordReader;
+import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
+
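+/**
+ * Base descriptor for source operators that scan Hadoop file splits. Each
+ * partition reads its {@link HadoopFileSplit} through an {@link IRecordReader}
+ * supplied by the concrete subclass and pushes the deserialized records to the
+ * operator's single output.
+ */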
+public abstract class AbstractHadoopFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    protected HadoopFileSplit[] splits;
+
+    public AbstractHadoopFileScanOperatorDescriptor(JobSpecification spec, HadoopFileSplit[] splits,
+            RecordDescriptor recordDescriptor) {
+        super(spec, 0, 1);
+        recordDescriptors[0] = recordDescriptor;
+        this.splits = splits;
+    }
+
+    protected abstract IRecordReader createRecordReader(HadoopFileSplit fileSplit, RecordDescriptor desc)
+            throws Exception;
+
+    protected class FileScanOperator implements IOpenableDataWriterOperator {
+        private IOpenableDataWriter<Object[]> writer;
+        private int index;
+
+        FileScanOperator(int index) {
+            this.index = index;
+        }
+
+        @Override
+        public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
+            if (index != 0) {
+                throw new IndexOutOfBoundsException("Invalid index: " + index);
+            }
+            this.writer = writer;
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            HadoopFileSplit split = splits[index];
+            RecordDescriptor desc = recordDescriptors[0];
+            try {
+                IRecordReader reader = createRecordReader(split, desc);
+                writer.open();
+                try {
+                    while (true) {
+                        Object[] record = new Object[desc.getFields().length];
+                        if (!reader.read(record)) {
+                            break;
+                        }
+                        writer.writeData(record);
+                    }
+                } finally {
+                    reader.close();
+                    writer.close();
+                }
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            // do nothing
+        }
+
+        @Override
+        public void writeData(Object[] data) throws HyracksDataException {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    protected Reporter createReporter() {
+        return new Reporter() {
+            @Override
+            public Counter getCounter(Enum<?> name) {
+                return null;
+            }
+
+            @Override
+            public Counter getCounter(String group, String name) {
+                return null;
+            }
+
+            @Override
+            public InputSplit getInputSplit() throws UnsupportedOperationException {
+                return null;
+            }
+
+            @Override
+            public void incrCounter(Enum<?> key, long amount) {
+
+            }
+
+            @Override
+            public void incrCounter(String group, String counter, long amount) {
+
+            }
+
+            @Override
+            public void progress() {
+
+            }
+
+            @Override
+            public void setStatus(String status) {
+
+            }
+        };
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition),
+                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
new file mode 100644
index 0000000..5744600
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
+
+import edu.uci.ics.dcache.client.DCacheClient;
+import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
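+/**
+ * Common base for the Hadoop mapper and reducer operator descriptors. The
+ * {@link JobConf} is flattened into a serializable map, a no-op
+ * {@link Reporter} is provided for the wrapped Hadoop code, and distributed
+ * cache entries can be resolved to local paths through the DCache client.
+ */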
+public abstract class AbstractHadoopOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    protected static class DataWritingOutputCollector<K, V> implements OutputCollector<K, V> {
+        private IDataWriter<Object[]> writer;
+
+        public DataWritingOutputCollector() {
+        }
+
+        public DataWritingOutputCollector(IDataWriter<Object[]> writer) {
+            this.writer = writer;
+        }
+
+        @Override
+        public void collect(Object key, Object value) throws IOException {
+           	writer.writeData(new Object[] { key, value });
+        }
+
+        public void setWriter(IDataWriter<Object[]> writer) {
+            this.writer = writer;
+        }
+    }
+
+    public static String MAPRED_CACHE_FILES = "mapred.cache.files";
+    public static String MAPRED_CACHE_LOCALFILES = "mapred.cache.localFiles";
+
+    private static final long serialVersionUID = 1L;
+    private final HashMap<String, String> jobConfMap;
+    private IHadoopClassFactory hadoopClassFactory;
+    
+    public abstract RecordDescriptor getRecordDescriptor(JobConf jobConf);
+    	
+    public AbstractHadoopOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor, JobConf jobConf,
+            IHadoopClassFactory hadoopOperatorFactory) {
+        super(spec, 1, 1);
+        jobConfMap = DatatypeHelper.jobConf2HashMap(jobConf);
+        this.hadoopClassFactory = hadoopOperatorFactory;
+        if (recordDescriptor != null) {
+            recordDescriptors[0] = recordDescriptor;
+        } else {
+            recordDescriptors[0] = getRecordDescriptor(jobConf);
+        }
+    }
+
+
+    public HashMap<String, String> getJobConfMap() {
+        return jobConfMap;
+    }
+
+    public IHadoopClassFactory getHadoopClassFactory() {
+        return hadoopClassFactory;
+    }
+
+    public void setHadoopClassFactory(IHadoopClassFactory hadoopClassFactory) {
+        this.hadoopClassFactory = hadoopClassFactory;
+    }
+
+    protected Reporter createReporter() {
+        return new Reporter() {
+            @Override
+            public Counter getCounter(Enum<?> name) {
+                return null;
+            }
+
+            @Override
+            public Counter getCounter(String group, String name) {
+                return null;
+            }    
+              
+
+            @Override
+            public InputSplit getInputSplit() throws UnsupportedOperationException {
+                return null;
+            }
+
+            @Override
+            public void incrCounter(Enum<?> key, long amount) {
+
+            }
+
+            @Override
+            public void incrCounter(String group, String counter, long amount) {
+
+            }
+
+            @Override
+            public void progress() {
+
+            }
+
+            @Override
+            public void setStatus(String status) {
+
+            }
+        };
+    }
+
+    public JobConf getJobConf() {
+        return DatatypeHelper.hashMap2JobConf(jobConfMap);
+    }
+    
+    public void populateCache(JobConf jobConf) {
+        String cache = jobConf.get(MAPRED_CACHE_FILES);
+        System.out.println("cache:" + cache);
+        if (cache == null) {
+            return;
+        }
+        String localCache = jobConf.get(MAPRED_CACHE_LOCALFILES);
+        System.out.println("localCache:" + localCache);
+        if (localCache != null) {
+            return;
+        }
+        localCache = "";
+        StringTokenizer cacheTokenizer = new StringTokenizer(cache, ",");
+        while (cacheTokenizer.hasMoreTokens()) {
+            if (!"".equals(localCache)) {
+                localCache += ",";
+            }
+            try {
+                localCache += DCacheClient.get().get(cacheTokenizer.nextToken());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        jobConf.set(MAPRED_CACHE_LOCALFILES, localCache);
+        System.out.println("localCache:" + localCache);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java
new file mode 100644
index 0000000..1e94264
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IRecordWriter;
+import edu.uci.ics.hyracks.dataflow.std.file.RecordWriter;
+
+
+
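+/**
+ * Writes incoming (key, value) pairs to HDFS, either as delimited text lines
+ * or as a Hadoop {@link SequenceFile} when sequence-file output is requested.
+ * The target paths must not already exist.
+ */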
+public class HDFSWriteOperatorDescriptor extends
+		AbstractFileWriteOperatorDescriptor {
+	
+	private static String nullWritableClassName = NullWritable.class.getName();
+	
+	private static class HDFSLineWriterImpl extends RecordWriter {
+		
+        HDFSLineWriterImpl(FileSystem fileSystem, String hdfsPath, int[] columns, char separator)
+                throws Exception {
+    		super(columns,separator,new Object[]{fileSystem,hdfsPath});
+        }
+
+		@Override
+		public OutputStream createOutputStream(Object[] args) throws Exception {
+			FSDataOutputStream fs = ((FileSystem)args[0]).create(new Path((String)args[1]));
+			return fs;
+		}
+
+		 @Override
+	     public void write(Object[] record) throws Exception {
+	         if(!nullWritableClassName.equals((record[0].getClass().getName()))){
+	             bufferedWriter.write(String.valueOf(record[0]));
+	         }
+	         if(!nullWritableClassName.equals((record[1].getClass().getName()))){
+	        	  bufferedWriter.write(separator);	 
+	        	  bufferedWriter.write(String.valueOf(record[1]));
+	         }	 
+	         bufferedWriter.write("\n");
+	     }
+    }
+
+	private static class HDFSSequenceWriterImpl extends RecordWriter {
+		
+		private Writer writer;
+		
+		HDFSSequenceWriterImpl(FileSystem fileSystem, String hdfsPath, Writer writer)
+                throws Exception {
+    		super(null,COMMA,new Object[]{fileSystem,hdfsPath});
+    		this.writer = writer;
+        }
+
+		@Override
+		public OutputStream createOutputStream(Object[] args) throws Exception {
+			return null;
+		}
+		
+		@Override
+	     public void close() {
+	         try {
+	             writer.close();
+	         } catch (IOException e) {
+	             e.printStackTrace();
+	         }
+	     }
+
+	     @Override
+	     public void write(Object[] record) throws Exception {
+	         Object key = record[0];
+	         Object value = record[1];
+	         writer.append(key, value);
+	     }
+
+    }
+	
+    private static final long serialVersionUID = 1L;
+    private static final char COMMA = ',';
+	private char separator;
+	private boolean sequenceFileOutput = false;
+	private String keyClassName;
+	private String valueClassName;
+	Map<String,String> jobConfMap;
+    
+
+    @Override
+    protected IRecordWriter createRecordWriter(File file,int index) throws Exception {
+    	JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap)jobConfMap);
+		System.out.println("replication:" + conf.get("dfs.replication"));
+    	FileSystem fileSystem = null;
+		try{
+			fileSystem = FileSystem.get(conf);
+		}catch(IOException ioe){
+			ioe.printStackTrace();
+		}
+		Path path = new Path(file.getAbsolutePath());
+		checkIfCanWriteToHDFS(new FileSplit[]{new FileSplit("localhost",file)});
+		FSDataOutputStream outputStream = fileSystem.create(path);
+		outputStream.close();
+		if(sequenceFileOutput){
+			Class  keyClass = Class.forName(keyClassName);  
+			Class valueClass= Class.forName(valueClassName);
+			conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
+			Writer writer = SequenceFile.createWriter(fileSystem, conf,path, keyClass, valueClass);
+			return new HDFSSequenceWriterImpl(fileSystem, file.getAbsolutePath(), writer);
+		}else{
+			return new HDFSLineWriterImpl(fileSystem, file.getAbsolutePath(), null, COMMA);
+		}	
+    }
+    
+    private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception{
+    	boolean canWrite = true;
+    	JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap)jobConfMap);
+		FileSystem fileSystem = null;
+		try{
+			fileSystem = FileSystem.get(conf);
+		    for(FileSplit fileSplit : fileSplits){
+				Path path = new Path(fileSplit.getLocalFile().getAbsolutePath());
+				canWrite = !fileSystem.exists(path);
+				if(!canWrite){
+					throw new Exception("Output path " + path + " already exists");
+				}	
+			}
+		}catch(IOException ioe){
+			ioe.printStackTrace();
+			throw ioe;
+		}
+	    return canWrite;
+    }
+		
+	
+	public HDFSWriteOperatorDescriptor(JobSpecification jobSpec, Map<String, String> jobConfMap, FileSplit[] fileSplits,
+			char separator, boolean sequenceFileOutput, String keyClassName, String valueClassName) throws Exception {
+		super(jobSpec, fileSplits);
+		this.jobConfMap = jobConfMap;
+		checkIfCanWriteToHDFS(fileSplits);
+		this.separator = separator;
+		this.sequenceFileOutput = sequenceFileOutput;
+		this.keyClassName = keyClassName;
+		this.valueClassName = valueClassName;
+	}
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
new file mode 100644
index 0000000..8397d86
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
+import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
+
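+/**
+ * Runs a Hadoop {@link Mapper} inside a Hyracks operator. The mapper class is
+ * either given explicitly or resolved from "mapred.mapper.class" via the
+ * configured {@link IHadoopClassFactory}; map output is pushed downstream as
+ * (key, value) tuples.
+ */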
+public class HadoopMapperOperatorDescriptor<K1, V1, K2, V2> extends AbstractHadoopOperatorDescriptor {
+    private class MapperOperator implements IOpenableDataWriterOperator {
+        private OutputCollector<K2, V2> output;
+        private Reporter reporter;
+        private Mapper<K1, V1, K2, V2> mapper;
+        private IOpenableDataWriter<Object[]> writer;
+
+        @Override
+        public void close() throws HyracksDataException {
+            try {
+                mapper.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            writer.close();
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            JobConf jobConf = getJobConf();
+            populateCache(jobConf);
+            try {
+                mapper = createMapper();
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+            // -- - configure - --
+            mapper.configure(jobConf);
+            writer.open();
+            output = new DataWritingOutputCollector<K2, V2>(writer);
+            reporter = createReporter();
+        }
+
+        @Override
+        public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
+            if (index != 0) {
+                throw new IllegalArgumentException();
+            }
+            this.writer = writer;
+        }
+
+        @Override
+        public void writeData(Object[] data) throws HyracksDataException {
+            try {
+                mapper.map((K1) data[0], (V1) data[1], output, reporter);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+    private static final String mapClassNameKey = "mapred.mapper.class";
+    private Class<? extends Mapper> mapperClass;
+
+    public HadoopMapperOperatorDescriptor(JobSpecification spec, Class<? extends Mapper> mapperClass,
+            RecordDescriptor recordDescriptor, JobConf jobConf) {
+        super(spec, recordDescriptor, jobConf, null);
+        this.mapperClass = mapperClass;
+    }
+
+    public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, IHadoopClassFactory hadoopClassFactory) {
+        super(spec, null, jobConf, hadoopClassFactory);
+    }
+
+    public RecordDescriptor getRecordDescriptor(JobConf conf) {
+        RecordDescriptor recordDescriptor = null;
+        String mapOutputKeyClassName = conf.getMapOutputKeyClass().getName();
+        String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
+        try {
+            if (getHadoopClassFactory() == null) {
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
+                        (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
+            } else {
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(mapOutputKeyClassName),
+                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(mapOutputValueClassName));
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return recordDescriptor;
+    }
+
+    private Mapper<K1, V1, K2, V2> createMapper() throws Exception {
+        if (mapperClass != null) {
+            return mapperClass.newInstance();
+        } else {
+            String mapperClassName = super.getJobConfMap().get(mapClassNameKey);
+            Object mapper = getHadoopClassFactory().createMapper(mapperClassName);
+            mapperClass = (Class<? extends Mapper>) mapper.getClass();
+            return (Mapper) mapper;
+        }
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new DeserializedOperatorNodePushable(ctx, new MapperOperator(),
+                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+    }
+
+    public Class<? extends Mapper> getMapperClass() {
+        return mapperClass;
+    }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
new file mode 100644
index 0000000..288fdbd
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.HadoopAdapter;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.HadoopFileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IRecordReader;
+
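+/**
+ * File scan operator that reads HDFS splits through an arbitrary Hadoop
+ * {@link InputFormat}. The input format is instantiated by reflection and its
+ * {@link RecordReader} is wrapped in an {@link IRecordReader}.
+ */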
+public class HadoopReadOperatorDescriptor extends AbstractHadoopFileScanOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private String inputFormatClassName;
+    private Map<String, String> jobConfMap;
+
+    private static class HDFSCustomReader implements IRecordReader {
+        private RecordReader hadoopRecordReader;
+        private Class inputKeyClass;
+        private Class inputValueClass;
+        private Object key;
+        private Object value;
+
+        public HDFSCustomReader(Map<String, String> jobConfMap, HadoopFileSplit inputSplit,
+                String inputFormatClassName, Reporter reporter) {
+            try {
+                JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap) jobConfMap);
+                FileSystem fileSystem = null;
+                try {
+                    fileSystem = FileSystem.get(conf);
+                } catch (IOException ioe) {
+                    ioe.printStackTrace();
+                }
+
+                Class inputFormatClass = Class.forName(inputFormatClassName);
+                InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
+                hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(getFileSplit(inputSplit), conf,
+                        reporter);
+                if (hadoopRecordReader instanceof SequenceFileRecordReader) {
+                    inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
+                    inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
+                } else {
+                    inputKeyClass = hadoopRecordReader.createKey().getClass();
+                    inputValueClass = hadoopRecordReader.createValue().getClass();
+                }
+                key = inputKeyClass.newInstance();
+                value = inputValueClass.newInstance();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        public Class getInputKeyClass() {
+            return inputKeyClass;
+        }
+
+        public void setInputKeyClass(Class inputKeyClass) {
+            this.inputKeyClass = inputKeyClass;
+        }
+
+        public Class getInputValueClass() {
+            return inputValueClass;
+        }
+
+        public void setInputValueClass(Class inputValueClass) {
+            this.inputValueClass = inputValueClass;
+        }
+
+        @Override
+        public void close() {
+            try {
+                hadoopRecordReader.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        @Override
+        public boolean read(Object[] record) throws Exception {
+            if (!hadoopRecordReader.next(key, value)) {
+                return false;
+            }
+            if (record.length == 1) {
+                record[0] = value;
+            } else {
+                record[0] = key;
+                record[1] = value;
+            }
+            return true;
+        }
+
+        private FileSplit getFileSplit(HadoopFileSplit hadoopFileSplit) {
+            FileSplit fileSplit = new FileSplit(new Path(hadoopFileSplit.getFile()), hadoopFileSplit.getStart(),
+                    hadoopFileSplit.getLength(), hadoopFileSplit.getHosts());
+            return fileSplit;
+        }
+    }
+
+    public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, JobSpecification spec,
+            HadoopFileSplit[] splits, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+        super(spec, splits, recordDescriptor);
+        this.inputFormatClassName = inputFormatClassName;
+        this.jobConfMap = jobConfMap;
+    }
+
+    public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, InetSocketAddress nameNode,
+            JobSpecification spec, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+        super(spec, null, recordDescriptor);
+        this.inputFormatClassName = inputFormatClassName;
+        this.jobConfMap = jobConfMap;
+    }
+
+    // public HadoopReadOperatorDescriptor(IClusterController clusterController, Map<String, String> jobConfMap,
+    // JobSpecification spec, String fileSystemURL, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+    // super(spec, null, recordDescriptor);
+    // HadoopAdapter hadoopAdapter = HadoopAdapter.getInstance(fileSystemURL);
+    // String inputPathString = jobConfMap.get("mapred.input.dir");
+    // String[] inputPaths = inputPathString.split(",");
+    // Map<String, List<HadoopFileSplit>> blocksToRead = hadoopAdapter.getInputSplits(inputPaths);
+    // List<HadoopFileSplit> hadoopFileSplits = new ArrayList<HadoopFileSplit>();
+    // for (String filePath : blocksToRead.keySet()) {
+    // hadoopFileSplits.addAll(blocksToRead.get(filePath));
+    // }
+    // for (HadoopFileSplit hadoopFileSplit : hadoopFileSplits) {
+    // System.out.println(" Hadoop File Split : " + hadoopFileSplit);
+    // }
+    // super.splits = hadoopFileSplits.toArray(new HadoopFileSplit[] {});
+    // configurePartitionConstraints(clusterController, blocksToRead);
+    // this.inputFormatClassName = inputFormatClassName;
+    // this.jobConfMap = jobConfMap;
+    // }
+
+    // private void configurePartitionConstraints(IClusterController clusterController,
+    // Map<String, List<HadoopFileSplit>> blocksToRead) {
+    // List<LocationConstraint> locationConstraints = new ArrayList<LocationConstraint>();
+    // Map<String, INodeController> registry = null;
+    // try {
+    // // registry = clusterController.getRegistry();
+    // // TODO
+    // } catch (Exception e) {
+    // e.printStackTrace();
+    // }
+    // Map<String, String> hostnameToNodeIdMap = new HashMap<String, String>();
+    // NCConfig ncConfig = null;
+    // for (String nodeId : registry.keySet()) {
+    // try {
+    // ncConfig = ((INodeController) registry.get(nodeId)).getConfiguration();
+    // String ipAddress = ncConfig.dataIPAddress;
+    // String hostname = InetAddress.getByName(ipAddress).getHostName();
+    // System.out.println(" hostname :" + hostname + " nodeid:" + nodeId);
+    // hostnameToNodeIdMap.put(hostname, nodeId);
+    // } catch (Exception e) {
+    // e.printStackTrace();
+    // }
+    // }
+    //
+    // for (String filePath : blocksToRead.keySet()) {
+    // List<HadoopFileSplit> hadoopFileSplits = blocksToRead.get(filePath);
+    // for (HadoopFileSplit hadoopFileSplit : hadoopFileSplits) {
+    // String hostname = hadoopFileSplit.getHosts()[0];
+    // System.out.println("host name is :" + hostname);
+    // InetAddress address = null;
+    // try {
+    // address = InetAddress.getByName(hostname);
+    // if (address.isLoopbackAddress()) {
+    // Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
+    // while (netInterfaces.hasMoreElements()) {
+    // NetworkInterface ni = netInterfaces.nextElement();
+    // InetAddress inetAddress = (InetAddress) ni.getInetAddresses().nextElement();
+    // if (!inetAddress.isLoopbackAddress()) {
+    // address = inetAddress;
+    // break;
+    // }
+    // }
+    // }
+    // hostname = address.getHostName();
+    // System.out.println("cannonical host name hyracks :" + hostname);
+    // } catch (Exception e) {
+    // e.printStackTrace();
+    // }
+    // String nodeId = hostnameToNodeIdMap.get(hostname);
+    // System.out.println(" corresponding node id is :" + nodeId);
+    // LocationConstraint locationConstraint = new AbsoluteLocationConstraint(nodeId);
+    // locationConstraints.add(locationConstraint);
+    // }
+    // }
+    //
+    // PartitionConstraint partitionConstraint = new ExplicitPartitionConstraint(locationConstraints
+    // .toArray(new LocationConstraint[] {}));
+    // this.setPartitionConstraint(partitionConstraint);
+    // }
+
+    @Override
+    protected IRecordReader createRecordReader(HadoopFileSplit fileSplit, RecordDescriptor desc) throws Exception {
+        Reporter reporter = createReporter();
+        IRecordReader recordReader = new HDFSCustomReader(jobConfMap, fileSplit, inputFormatClassName, reporter);
+        return recordReader;
+    }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
new file mode 100644
index 0000000..1f9cba8
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IDataReader;
+import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.KeyComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
+import edu.uci.ics.hyracks.dataflow.std.group.IGroupAggregator;
+import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperator;
+import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
+
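+/**
+ * Runs a Hadoop {@link Reducer} inside a Hyracks operator. The input must be
+ * pre-clustered on the key: a {@link PreclusteredGroupOperator} groups
+ * consecutive tuples with equal keys and hands each group to the reducer
+ * through a value iterator.
+ */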
+public class HadoopReducerOperatorDescriptor<K2, V2, K3, V3> extends AbstractHadoopOperatorDescriptor {
+    private class ReducerAggregator implements IGroupAggregator {
+        private Reducer<K2, V2, K3, V3> reducer;
+        private DataWritingOutputCollector<K3, V3> output;
+        private Reporter reporter;
+
+        public ReducerAggregator(Reducer<K2, V2, K3, V3> reducer) {
+            this.reducer = reducer;
+            reducer.configure(getJobConf());
+            output = new DataWritingOutputCollector<K3, V3>();
+            reporter = new Reporter() {
+
+                @Override
+                public void progress() {
+
+                }
+
+                @Override
+                public void setStatus(String arg0) {
+
+                }
+
+                @Override
+                public void incrCounter(String arg0, String arg1, long arg2) {
+
+                }
+
+                @Override
+                public void incrCounter(Enum<?> arg0, long arg1) {
+
+                }
+
+                @Override
+                public InputSplit getInputSplit() throws UnsupportedOperationException {
+                    return null;
+                }
+
+                @Override
+                public Counter getCounter(String arg0, String arg1) {
+                    return null;
+                }
+
+                @Override
+                public Counter getCounter(Enum<?> arg0) {
+                    return null;
+                }
+            };
+        }
+
+        @Override
+        public void aggregate(IDataReader<Object[]> reader, IDataWriter<Object[]> writer) throws HyracksDataException {
+
+            ValueIterator i = new ValueIterator();
+            i.reset(reader);
+            output.setWriter(writer);
+            try {
+
+                // -- - reduce - --
+                reducer.reduce(i.getKey(), i, output, reporter);
+
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            // -- - close - --
+            try {
+                reducer.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    private class ValueIterator implements Iterator<V2> {
+        private IDataReader<Object[]> reader;
+        private K2 key;
+        private V2 value;
+
+        public K2 getKey() {
+            return key;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (value == null) {
+                Object[] tuple;
+                try {
+                    tuple = reader.readData();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                if (tuple != null) {
+                    value = (V2) tuple[1];
+                }
+            }
+            return value != null;
+        }
+
+        @Override
+        public V2 next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            V2 v = value;
+            value = null;
+            return v;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        void reset(IDataReader<Object[]> reader) {
+            this.reader = reader;
+            try {
+                Object[] tuple = reader.readData();
+                key = (K2) tuple[0];
+                value = (V2) tuple[1];
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+    private Class<? extends Reducer> reducerClass;
+    private static final String reducerClassKey = "mapred.reducer.class";
+    private static final String comparatorClassKey = "mapred.output.value.groupfn.class";
+    private IComparatorFactory comparatorFactory;
+
+    public HadoopReducerOperatorDescriptor(JobSpecification spec, IComparatorFactory comparatorFactory,
+            Class<? extends Reducer> reducerClass, RecordDescriptor recordDescriptor, JobConf jobConf) {
+        super(spec, recordDescriptor, jobConf, new ClasspathBasedHadoopClassFactory());
+        this.comparatorFactory = comparatorFactory;
+        this.reducerClass = reducerClass;
+    }
+
+    public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IHadoopClassFactory classFactory) {
+        super(spec, null, conf, classFactory);
+    }
+
+    private Reducer<K2, V2, K3, V3> createReducer() throws Exception {
+        if (reducerClass != null) {
+            return reducerClass.newInstance();
+        } else {
+            Object reducer = getHadoopClassFactory().createReducer(getJobConfMap().get(reducerClassKey));
+            reducerClass = (Class<? extends Reducer>) reducer.getClass();
+            return (Reducer) reducer;
+        }
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        try {
+            if (this.comparatorFactory == null) {
+                String comparatorClassName = getJobConfMap().get(comparatorClassKey);
+                RawComparator rawComparator = null;
+                if (comparatorClassName != null) {
+                    Class comparatorClazz = getHadoopClassFactory().loadClass(comparatorClassName);
+                    this.comparatorFactory = new KeyComparatorFactory(comparatorClazz);
+
+                } else {
+                    String mapOutputKeyClass = getJobConfMap().get("mapred.mapoutput.key.class");
+                    if (getHadoopClassFactory() != null) {
+                        rawComparator = WritableComparator.get(getHadoopClassFactory().loadClass(mapOutputKeyClass));
+                    } else {
+                        rawComparator = WritableComparator.get((Class<? extends WritableComparable>) Class
+                                .forName(mapOutputKeyClass));
+                    }
+                    this.comparatorFactory = new WritableComparingComparatorFactory(rawComparator.getClass());
+                }
+            }
+            IOpenableDataWriterOperator op = new PreclusteredGroupOperator(new int[] { 0 },
+                    new IComparator[] { comparatorFactory.createComparator() }, new ReducerAggregator(createReducer()));
+            return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
+                    getOperatorId(), 0));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Class<? extends Reducer> getReducerClass() {
+        return reducerClass;
+    }
+
+    public void setReducerClass(Class<? extends Reducer> reducerClass) {
+        this.reducerClass = reducerClass;
+    }
+
+    @Override
+    public RecordDescriptor getRecordDescriptor(JobConf conf) {
+        String outputKeyClassName = conf.get("mapred.output.key.class");
+        String outputValueClassName = conf.get("mapred.output.value.class");
+        RecordDescriptor recordDescriptor = null;
+        try {
+            if (getHadoopClassFactory() == null) {
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) Class.forName(outputKeyClassName),
+                        (Class<? extends Writable>) Class.forName(outputValueClassName));
+            } else {
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(outputKeyClassName),
+                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(outputValueClassName));
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+        return recordDescriptor;
+    }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java
new file mode 100644
index 0000000..666cc21
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
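+/**
+ * Serializable holder for a class literal; the delegate instance is created
+ * reflectively at construction time and re-created after deserialization.
+ */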
+public class AbstractClassBasedDelegate<T> implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private Class<? extends T> klass;
+    protected transient T instance;
+
+    public AbstractClassBasedDelegate(Class<? extends T> klass) {
+        this.klass = klass;
+        init();
+    }
+
+    protected Object readResolve() throws ObjectStreamException {
+        init();
+        return this;
+    }
+
+    private void init() {
+        try {
+            instance = klass.newInstance();
+        } catch (InstantiationException e) {
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java
new file mode 100644
index 0000000..98da69c
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
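+/**
+ * Partitions tuples by deserializing the key field and hashing it modulo the
+ * number of target partitions.
+ */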
+public class HadoopHashTuplePartitionComputerFactory<K extends Writable> implements ITuplePartitionComputerFactory {
+    private static final long serialVersionUID = 1L;
+    private final ISerializerDeserializer<K> keyIO;
+
+    public HadoopHashTuplePartitionComputerFactory(ISerializerDeserializer<K> keyIO) {
+        this.keyIO = keyIO;
+    }
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        return new ITuplePartitionComputer() {
+            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+            private final DataInputStream dis = new DataInputStream(bbis);
+
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, 0);
+                bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+                K key = keyIO.deserialize(dis);
+                int h = key.hashCode();
+                if (h < 0) {
+                    h = -h;
+                }
+                return h % nParts;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java
new file mode 100644
index 0000000..81d7c6d
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Partitioner;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
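+/**
+ * Delegates partitioning to a user-supplied org.apache.hadoop.mapred.Partitioner:
+ * field 0 and field 1 are deserialized into the key and value before calling getPartition().
+ */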
+public class HadoopPartitionerTuplePartitionComputerFactory<K extends Writable, V extends Writable> extends
+        AbstractClassBasedDelegate<Partitioner<K, V>> implements ITuplePartitionComputerFactory {
+    private static final long serialVersionUID = 1L;
+    private final ISerializerDeserializer<K> keyIO;
+    private final ISerializerDeserializer<V> valueIO;
+
+    public HadoopPartitionerTuplePartitionComputerFactory(Class<? extends Partitioner<K, V>> klass,
+            ISerializerDeserializer<K> keyIO, ISerializerDeserializer<V> valueIO) {
+        super(klass);
+        this.keyIO = keyIO;
+        this.valueIO = valueIO;
+    }
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        return new ITuplePartitionComputer() {
+            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+            private final DataInputStream dis = new DataInputStream(bbis);
+
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, 0);
+                bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+                K key = keyIO.deserialize(dis);
+                int valueStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, 1);
+                bbis.setByteBuffer(accessor.getBuffer(), valueStart);
+                V value = valueIO.deserialize(dis);
+                return instance.getPartition(key, value, nParts);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java
new file mode 100644
index 0000000..ad8a9e8
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import org.apache.hadoop.io.RawComparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+
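+/**
+ * Adapts a Hadoop RawComparator (instantiated reflectively) to Hyracks'
+ * byte-level IBinaryComparator.
+ */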
+public class KeyBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private Class<? extends RawComparator<T>> cmpClass;
+
+    public KeyBinaryComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
+        this.cmpClass = cmpClass;
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
+        return new IBinaryComparator() {
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                return instance.compare(b1, s1, l1, b2, s2, l2);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java
new file mode 100644
index 0000000..cba86d7
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import org.apache.hadoop.io.RawComparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+
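+/**
+ * Object-level counterpart of KeyBinaryComparatorFactory: wraps a reflectively
+ * created RawComparator for comparing already-deserialized keys.
+ */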
+public class KeyComparatorFactory<T> implements IComparatorFactory<T> {
+    private static final long serialVersionUID = 1L;
+    private Class<? extends RawComparator<T>> cmpClass;
+
+    public KeyComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
+        this.cmpClass = cmpClass;
+    }
+
+    @Override
+    public IComparator<T> createComparator() {
+        final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
+        return new IComparator<T>() {
+            @Override
+            public int compare(T o1, T o2) {
+                return instance.compare(o1, o2);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
new file mode 100644
index 0000000..d06d6ff
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+
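+/**
+ * Adapts a Hadoop WritableComparator to IBinaryComparator, comparing keys
+ * directly in their serialized byte form.
+ */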
+public class WritableComparingBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private Class<? extends WritableComparator> cmpClass;
+
+    public WritableComparingBinaryComparatorFactory(Class<? extends WritableComparator> cmpClass) {
+        this.cmpClass = cmpClass;
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        final WritableComparator instance = ReflectionUtils.createInstance(cmpClass);
+        return new IBinaryComparator() {
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                return instance.compare(b1, s1, l1, b2, s2, l2);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingComparatorFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingComparatorFactory.java
new file mode 100644
index 0000000..c78648f
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingComparatorFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+
+public class WritableComparingComparatorFactory<T> implements IComparatorFactory<WritableComparable<T>> {
+    private static final long serialVersionUID = 1L;
+
+    private Class<? extends RawComparator> klass;
+
+    public WritableComparingComparatorFactory(Class<? extends WritableComparator> klass) {
+        this.klass = klass;
+    }
+
+    @Override
+    public IComparator<WritableComparable<T>> createComparator() {
+        final RawComparator<WritableComparable<T>> instance = ReflectionUtils.createInstance(klass);
+        return new IComparator<WritableComparable<T>>() {
+            @Override
+            public int compare(WritableComparable<T> o1, WritableComparable<T> o2) {
+                return instance.compare(o1, o2);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
new file mode 100644
index 0000000..56ddbe5
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+
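+/**
+ * Resolves Mapper and Reducer classes with Class.forName, i.e. the classes must
+ * already be present on the local classpath.
+ */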
+public class ClasspathBasedHadoopClassFactory implements IHadoopClassFactory {
+
+	@Override
+	public Mapper createMapper(String mapClassName) throws Exception {
+		return (Mapper) loadClass(mapClassName).newInstance();
+	}
+
+	@Override
+	public Reducer createReducer(String reduceClassName) throws Exception {
+		return (Reducer) loadClass(reduceClassName).newInstance();
+	}
+
+	@Override
+	public Class loadClass(String className) throws Exception {
+		return Class.forName(className);
+	}
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java
new file mode 100644
index 0000000..7eea853
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+@SuppressWarnings("deprecation")
+public class DatatypeHelper {
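+    /**
+     * Adapts a Hadoop Writable class to Hyracks' ISerializerDeserializer by
+     * instantiating the class reflectively and delegating to readFields()/write().
+     */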
+    private static final class WritableSerializerDeserializer<T extends Writable> implements ISerializerDeserializer<T> {
+        private static final long serialVersionUID = 1L;
+
+        private Class<T> clazz;
+
+        private WritableSerializerDeserializer(Class<T> clazz) {
+            this.clazz = clazz;
+        }
+
+        private T createInstance() throws HyracksDataException {
+            // TODO remove "if", create a new WritableInstanceOperations class
+            // that deals with Writables that don't have public constructors
+            if (NullWritable.class.equals(clazz)) {
+                return (T) NullWritable.get();
+            }
+            try {
+                return clazz.newInstance();
+            } catch (InstantiationException e) {
+                throw new HyracksDataException(e);
+            } catch (IllegalAccessException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public T deserialize(DataInput in) throws HyracksDataException {
+            T o = createInstance();
+            try {
+                o.readFields(in);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            return o;
+        }
+
+        @Override
+        public void serialize(T instance, DataOutput out) throws HyracksDataException {
+            try {
+                instance.write(out);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    public static ISerializerDeserializer<? extends Writable> createSerializerDeserializer(
+            Class<? extends Writable> fClass) {
+        return new WritableSerializerDeserializer(fClass);
+    }
+
+    public static RecordDescriptor createKeyValueRecordDescriptor(Class<? extends Writable> keyClass,
+            Class<? extends Writable> valueClass) {
+        ISerializerDeserializer[] fields = new ISerializerDeserializer[2];
+        fields[0] = createSerializerDeserializer(keyClass);
+        fields[1] = createSerializerDeserializer(valueClass);
+        return new RecordDescriptor(fields);
+    }
+
+    public static RecordDescriptor createOneFieldRecordDescriptor(Class<? extends Writable> fieldClass) {
+        ISerializerDeserializer[] fields = new ISerializerDeserializer[1];
+        fields[0] = createSerializerDeserializer(fieldClass);
+        return new RecordDescriptor(fields);
+    }
+
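+    // Rebuilds a JobConf from a serializable HashMap of key/value pairs
+    // (the inverse of jobConf2HashMap below).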
+    public static JobConf hashMap2JobConf(HashMap<String, String> jobConfMap) {
+        JobConf jobConf;
+        synchronized (Configuration.class) {
+            jobConf = new JobConf();
+            for (Entry<String, String> entry : jobConfMap.entrySet()) {
+                jobConf.set(entry.getKey(), entry.getValue());
+            }
+        }
+        return jobConf;
+    }
+
+    public static HashMap<String, String> jobConf2HashMap(JobConf jobConf) {
+        HashMap<String, String> jobConfMap = new HashMap<String, String>();
+        for (Entry<String, String> entry : jobConf) {
+            jobConfMap.put(entry.getKey(), entry.getValue());
+        }
+        return jobConfMap;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java
new file mode 100644
index 0000000..0003076
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.util.Properties;
+
+import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.map.IMapper;
+
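+/**
+ * Emits (key, value, key), duplicating the first field into a trailing third field.
+ */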
+public class DuplicateKeyMapper implements IMapper {
+
+    @Override
+    public void map(Object[] data, IDataWriter<Object[]> writer) throws HyracksDataException {
+        writer.writeData(new Object[] { data[0], data[1], data[0] });
+
+    }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopAdapter.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopAdapter.java
new file mode 100644
index 0000000..0c984bc
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopAdapter.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+
+public class HadoopAdapter {
+
+	private static ClientProtocol namenode;
+	private static FileSystem fileSystem;
+	private static JobConf jobConf;
+	private static HadoopAdapter instance;
+	
+	public static final String DFS_DATA_DIR = "dfs.data.dir";
+	public static final String FS_DEFAULT_NAME = "fs.default.name";
+	public static final String DFS_REPLICATION = "dfs.replication";
+	
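+	// Lazily creates a singleton adapter; the namenode host and port are parsed
+	// out of the file system URL (scheme://host:port).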
+	public static HadoopAdapter getInstance(String fileSystemURL){
+		if(instance == null){
+			jobConf = new JobConf(true);
+			String [] urlParts = parseURL(fileSystemURL);
+			jobConf.set(FS_DEFAULT_NAME, fileSystemURL);
+			instance = new HadoopAdapter(new InetSocketAddress(urlParts[1], Integer.parseInt(urlParts[2])));
+		}
+		return instance;
+	}
+	
+	public static JobConf getConf() {
+		return jobConf;
+	}
+
+	private HadoopAdapter (InetSocketAddress address){
+		try{
+			this.namenode = getNameNode(address);
+			fileSystem = FileSystem.get(jobConf);
+		}catch(IOException ioe){
+			ioe.printStackTrace();
+		}
+	}
+	
+	private static String[] parseURL(String urlString){
+		String[] urlComponents = urlString.split(":");
+		urlComponents[1] = urlComponents[1].substring(2);
+		return urlComponents;
+	}
+	
+	
+	public Map<String,List<HadoopFileSplit>> getInputSplits(String[] inputPaths){
+		Path[] paths = new Path[inputPaths.length];
+		int index = 0;
+		for(String inputPath : inputPaths){
+			// Use the current element, not inputPaths[0], when building each Path.
+			paths[index++] = new Path(StringUtils.unEscapeString(inputPath));
+		}
+		return getBlocksToRead(paths);
+	}
+	
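+	// Lists every file under the given paths via the namenode and collects the
+	// per-file HadoopFileSplits.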
+	private static Map<String,List<HadoopFileSplit>> getBlocksToRead(Path[] inputPaths){
+		Map<String,List<HadoopFileSplit>> hadoopFileSplitMap = new HashMap<String,List<HadoopFileSplit>>();
+		for (int i = 0; i < inputPaths.length; i++){
+			try{
+				String absolutePathPrint = getAbsolutePath(inputPaths[i]);
+				FileStatus[] fileStatuses = namenode.getListing(absolutePathPrint);
+				for(int j = 0; j < fileStatuses.length; j++){
+					Path filePath = fileStatuses[j].getPath();
+					String absolutePath = getAbsolutePath(filePath);
+					List<HadoopFileSplit> fileSplits = getFileBlocks(absolutePath, fileStatuses[j]);
+					if(fileSplits != null && fileSplits.size() > 0){
+						hadoopFileSplitMap.put(absolutePath, fileSplits);
+					}
+				}
+			}catch(IOException ioe){
+				ioe.printStackTrace();
+			}
+		}
+		return hadoopFileSplitMap;
+	}
+	
+	private static ClientProtocol getNameNode(InetSocketAddress address) throws IOException{
+		return (ClientProtocol)getProtocol(ClientProtocol.class, address, new JobConf());
+	}
+	
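+	// Rebuilds the absolute path string by walking the Path's parents from leaf to root.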
+	private static String getAbsolutePath(Path path){
+		StringBuffer absolutePath = new StringBuffer();
+		List<String> ancestorPath = new ArrayList<String>();
+		Path pathElement=path;
+		while(pathElement != null){
+			ancestorPath.add(0, pathElement.getName());
+			pathElement = pathElement.getParent();
+		}
+		ancestorPath.remove(0);
+		for(String s : ancestorPath){
+			absolutePath.append("/");
+			absolutePath.append(s);
+		}
+		return new String(absolutePath);
+	}
+	
+	private static VersionedProtocol getProtocol(Class protocolClass, InetSocketAddress inetAddress, JobConf jobConf) throws IOException{
+		VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass, ClientProtocol.versionID, inetAddress, jobConf);	
+		return versionedProtocol;
+	}
+	
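+	// Fetches the block locations of a single file from the namenode and turns each
+	// block into a HadoopFileSplit (offset = block index * block size).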
+	private static List<HadoopFileSplit> getFileBlocks(String absolutePath, FileStatus fileStatus){
+		List<HadoopFileSplit> hadoopFileSplits = new ArrayList<HadoopFileSplit>();
+		try{
+			LocatedBlocks locatedBlocks = namenode.getBlockLocations(absolutePath, 0, fileStatus.getLen());
+			long blockSize = fileSystem.getBlockSize(new Path(absolutePath));
+			if(locatedBlocks != null){
+				int index = 0;
+				for(LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()){
+					// All datanodes that hold a replica of this block.
+					DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
+					String[] hostnames = new String[datanodeInfos.length];
+					int datanodeCount = 0;
+					for(DatanodeInfo datanodeInfo : datanodeInfos){
+						hostnames[datanodeCount++] = datanodeInfo.getHostName();
+					}
+					hadoopFileSplits.add(new HadoopFileSplit(absolutePath, index * blockSize, blockSize, hostnames));
+					index++;
+				}
+			}
+		}catch(Exception e){
+			e.printStackTrace();
+		}
+		return hadoopFileSplits;
+	}
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopFileSplit.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopFileSplit.java
new file mode 100644
index 0000000..6e7b76c
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopFileSplit.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.fs.Path;
+
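+/**
+ * Serializable description of one HDFS block: file path, starting offset, length,
+ * and the hosts that store replicas of the block.
+ */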
+public class HadoopFileSplit implements Serializable{
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+	private String filePath;
+    private long start;
+	private long length;
+    private String[] hosts;
+    
+    public HadoopFileSplit(String filePath, long start, long length, String[] hosts){
+    	this.filePath = filePath;
+    	this.start = start;
+    	this.length = length;
+    	this.hosts = hosts;
+    }
+
+	public String getFile() {
+		return filePath;
+	}
+
+	public void setFile(String file) {
+		this.filePath = file;
+	}
+
+	public long getStart() {
+		return start;
+	}
+
+	public void setStart(long start) {
+		this.start = start;
+	}
+
+	public long getLength() {
+		return length;
+	}
+
+	public void setLength(long length) {
+		this.length = length;
+	}
+
+	public String[] getHosts() {
+		return hosts;
+	}
+
+	public void setHosts(String[] hosts) {
+		this.hosts = hosts;
+	}
+	
+	public String toString(){
+		StringBuilder stringBuilder = new StringBuilder();
+		stringBuilder.append(filePath + " " + start + " " + length +  "\n");
+		for(String host : hosts){
+			stringBuilder.append(host);
+			stringBuilder.append(",");
+		}
+		return stringBuilder.toString();
+	}
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
new file mode 100644
index 0000000..d465471
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+
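+/**
+ * Pluggable strategy for locating and instantiating the job's Mapper and Reducer classes.
+ */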
+public interface IHadoopClassFactory extends Serializable{
+
+	public Mapper createMapper(String mapClassName) throws Exception;
+	
+	public Reducer createReducer(String reduceClassName) throws Exception;
+	
+	public Class loadClass(String className) throws Exception;
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java
new file mode 100644
index 0000000..05be19e
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.util.Properties;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.map.IMapper;
+
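+/**
+ * Prepends a constant LongWritable(0) key and converts the input's first field to Text.
+ */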
+public class PreappendLongWritableMapper implements IMapper {
+
+    @Override
+    public void map(Object[] data, IDataWriter<Object[]> writer) throws HyracksDataException {
+        writer.writeData(new Object[] { new LongWritable(0), new Text(String.valueOf(data[0])) });
+    }
+}