Refactored the Hadoop operators out of hyracks-dataflow-std into a new hyracks-dataflow-hadoop module
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@59 123451ca-8445-de46-9d55-352943316053
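
A minimal, hypothetical sketch of how a client job might assemble the relocated
operators after this refactoring. Only constructors introduced in this patch are
used; the connector wiring, the file-scan/HDFS-write endpoints, and the JobConf
setup are assumed and omitted:

    import org.apache.hadoop.mapred.JobConf;

    import edu.uci.ics.hyracks.api.job.JobSpecification;
    import edu.uci.ics.hyracks.dataflow.hadoop.HadoopMapperOperatorDescriptor;
    import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReducerOperatorDescriptor;
    import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;

    public class ExampleHadoopJobAssembly {
        // Builds a bare JobSpecification containing one map and one reduce operator.
        // Both descriptors read the mapper/reducer classes and the output key/value
        // classes from the supplied JobConf, as in a plain Hadoop job.
        public static JobSpecification buildSpec(JobConf conf) {
            JobSpecification spec = new JobSpecification();
            HadoopMapperOperatorDescriptor mapper =
                    new HadoopMapperOperatorDescriptor(spec, conf, new ClasspathBasedHadoopClassFactory());
            HadoopReducerOperatorDescriptor reducer =
                    new HadoopReducerOperatorDescriptor(spec, conf, new ClasspathBasedHadoopClassFactory());
            // A hash-partitioning connector between mapper and reducer would use
            // HadoopHashTuplePartitionComputerFactory (added in this patch); it is
            // intentionally left out of this sketch.
            return spec;
        }
    }

Clients that previously pulled these operators in through hyracks-dataflow-std now
need an explicit dependency on edu.uci.ics.hyracks:hyracks-dataflow-hadoop:0.1.0.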
diff --git a/hyracks-dataflow-hadoop/.classpath b/hyracks-dataflow-hadoop/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks-dataflow-hadoop/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" output="target/classes" path="src/main/java"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-dataflow-hadoop/.project b/hyracks-dataflow-hadoop/.project
new file mode 100644
index 0000000..d6edecf
--- /dev/null
+++ b/hyracks-dataflow-hadoop/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>hyracks-dataflow-hadoop</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
diff --git a/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..1a27838
--- /dev/null
+++ b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Fri Aug 13 04:07:04 PDT 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs b/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..9e0c887
--- /dev/null
+++ b/hyracks-dataflow-hadoop/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Fri Aug 13 04:07:00 PDT 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-dataflow-hadoop/pom.xml b/hyracks-dataflow-hadoop/pom.xml
new file mode 100644
index 0000000..f277575
--- /dev/null
+++ b/hyracks-dataflow-hadoop/pom.xml
@@ -0,0 +1,54 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-hadoop</artifactId>
+ <version>0.1.0</version>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.1.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.1.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.dcache</groupId>
+ <artifactId>dcache-client</artifactId>
+ <version>0.0.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.1.0</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
new file mode 100644
index 0000000..4acc046
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.HadoopFileSplit;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
+import edu.uci.ics.hyracks.dataflow.std.file.IRecordReader;
+import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
+
+public abstract class AbstractHadoopFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ protected HadoopFileSplit[] splits;
+
+ public AbstractHadoopFileScanOperatorDescriptor(JobSpecification spec, HadoopFileSplit[] splits,
+ RecordDescriptor recordDescriptor) {
+ super(spec, 0, 1);
+ recordDescriptors[0] = recordDescriptor;
+ this.splits = splits;
+ }
+
+ protected abstract IRecordReader createRecordReader(HadoopFileSplit fileSplit, RecordDescriptor desc)
+ throws Exception;
+
+ protected class FileScanOperator implements IOpenableDataWriterOperator {
+ private IOpenableDataWriter<Object[]> writer;
+ private int index;
+
+ FileScanOperator(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
+ if (index != 0) {
+ throw new IndexOutOfBoundsException("Invalid index: " + index);
+ }
+ this.writer = writer;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ HadoopFileSplit split = splits[index];
+ RecordDescriptor desc = recordDescriptors[0];
+ try {
+ IRecordReader reader = createRecordReader(split, desc);
+ // Deserialize each record of the split into one Object per field and push it
+ // to the downstream writer until the reader is exhausted; the reader and the
+ // writer are both closed in the finally block below.
+ writer.open();
+ try {
+ while (true) {
+ Object[] record = new Object[desc.getFields().length];
+ if (!reader.read(record)) {
+ break;
+ }
+ writer.writeData(record);
+ }
+ } finally {
+ reader.close();
+ writer.close();
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // do nothing
+ }
+
+ @Override
+ public void writeData(Object[] data) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ protected Reporter createReporter() {
+ return new Reporter() {
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return null;
+ }
+
+ @Override
+ public Counter getCounter(String group, String name) {
+ return null;
+ }
+
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
+
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
+
+ }
+
+ @Override
+ public void incrCounter(String group, String counter, long amount) {
+
+ }
+
+ @Override
+ public void progress() {
+
+ }
+
+ @Override
+ public void setStatus(String status) {
+
+ }
+ };
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
new file mode 100644
index 0000000..5744600
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
+
+import edu.uci.ics.dcache.client.DCacheClient;
+import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public abstract class AbstractHadoopOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+
+ protected static class DataWritingOutputCollector<K, V> implements OutputCollector<K, V> {
+ private IDataWriter<Object[]> writer;
+
+ public DataWritingOutputCollector() {
+ }
+
+ public DataWritingOutputCollector(IDataWriter<Object[]> writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void collect(Object key, Object value) throws IOException {
+ writer.writeData(new Object[] { key, value });
+ }
+
+ public void setWriter(IDataWriter<Object[]> writer) {
+ this.writer = writer;
+ }
+ }
+
+ public static final String MAPRED_CACHE_FILES = "mapred.cache.files";
+ public static final String MAPRED_CACHE_LOCALFILES = "mapred.cache.localFiles";
+
+ private static final long serialVersionUID = 1L;
+ private final HashMap<String, String> jobConfMap;
+ private IHadoopClassFactory hadoopClassFactory;
+
+ public abstract RecordDescriptor getRecordDescriptor(JobConf jobConf);
+
+ public AbstractHadoopOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor, JobConf jobConf,
+ IHadoopClassFactory hadoopOperatorFactory) {
+ super(spec, 1, 1);
+ jobConfMap = DatatypeHelper.jobConf2HashMap(jobConf);
+ this.hadoopClassFactory = hadoopOperatorFactory;
+ if (recordDescriptor != null) {
+ recordDescriptors[0] = recordDescriptor;
+ } else {
+ recordDescriptors[0] = getRecordDescriptor(jobConf);
+ }
+ }
+
+
+ public HashMap<String, String> getJobConfMap() {
+ return jobConfMap;
+ }
+
+
+ public IHadoopClassFactory getHadoopClassFactory() {
+ return hadoopClassFactory;
+ }
+
+
+ public void setHadoopClassFactory(IHadoopClassFactory hadoopClassFactory) {
+ this.hadoopClassFactory = hadoopClassFactory;
+ }
+
+
+ protected Reporter createReporter() {
+ return new Reporter() {
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return null;
+ }
+
+ @Override
+ public Counter getCounter(String group, String name) {
+ return null;
+ }
+
+
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
+
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
+
+ }
+
+ @Override
+ public void incrCounter(String group, String counter, long amount) {
+
+ }
+
+ @Override
+ public void progress() {
+
+ }
+
+ @Override
+ public void setStatus(String status) {
+
+ }
+ };
+ }
+
+ public JobConf getJobConf() {
+ return DatatypeHelper.hashMap2JobConf(jobConfMap);
+ }
+
+ public void populateCache(JobConf jobConf) {
+ String cache = jobConf.get(MAPRED_CACHE_FILES);
+ System.out.println("cache:" + cache);
+ if (cache == null) {
+ return;
+ }
+ String localCache = jobConf.get(MAPRED_CACHE_LOCALFILES);
+ System.out.println("localCache:" + localCache);
+ if (localCache != null) {
+ return;
+ }
+ localCache = "";
+ StringTokenizer cacheTokenizer = new StringTokenizer(cache, ",");
+ while (cacheTokenizer.hasMoreTokens()) {
+ if (!"".equals(localCache)) {
+ localCache += ",";
+ }
+ try {
+ localCache += DCacheClient.get().get(cacheTokenizer.nextToken());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ jobConf.set(MAPRED_CACHE_LOCALFILES, localCache);
+ System.out.println("localCache:" + localCache);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java
new file mode 100644
index 0000000..1e94264
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IRecordWriter;
+import edu.uci.ics.hyracks.dataflow.std.file.RecordWriter;
+
+
+
+public class HDFSWriteOperatorDescriptor extends
+ AbstractFileWriteOperatorDescriptor {
+
+ private static final String nullWritableClassName = NullWritable.class.getName();
+
+ private static class HDFSLineWriterImpl extends RecordWriter {
+
+ HDFSLineWriterImpl(FileSystem fileSystem, String hdfsPath, int[] columns, char separator)
+ throws Exception {
+ super(columns,separator,new Object[]{fileSystem,hdfsPath});
+ }
+
+ @Override
+ public OutputStream createOutputStream(Object[] args) throws Exception {
+ FSDataOutputStream fs = ((FileSystem)args[0]).create(new Path((String)args[1]));
+ return fs;
+ }
+
+ @Override
+ public void write(Object[] record) throws Exception {
+ if (!nullWritableClassName.equals(record[0].getClass().getName())) {
+ bufferedWriter.write(String.valueOf(record[0]));
+ }
+ if (!nullWritableClassName.equals(record[1].getClass().getName())) {
+ bufferedWriter.write(separator);
+ bufferedWriter.write(String.valueOf(record[1]));
+ }
+ bufferedWriter.write("\n");
+ }
+ }
+
+ private static class HDFSSequenceWriterImpl extends RecordWriter {
+
+ private Writer writer;
+
+ HDFSSequenceWriterImpl(FileSystem fileSystem, String hdfsPath, Writer writer)
+ throws Exception {
+ super(null,COMMA,new Object[]{fileSystem,hdfsPath});
+ this.writer = writer;
+ }
+
+ @Override
+ public OutputStream createOutputStream(Object[] args) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void write(Object[] record) throws Exception {
+ Object key = record[0];
+ Object value = record[1];
+ writer.append(key, value);
+ }
+
+ }
+
+ private static final long serialVersionUID = 1L;
+ private static final char COMMA = ',';
+ private char separator;
+ private boolean sequenceFileOutput = false;
+ private String keyClassName;
+ private String valueClassName;
+ Map<String,String> jobConfMap;
+
+
+ @Override
+ protected IRecordWriter createRecordWriter(File file, int index) throws Exception {
+ JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap) jobConfMap);
+ System.out.println("replication:" + conf.get("dfs.replication"));
+ FileSystem fileSystem = null;
+ try {
+ fileSystem = FileSystem.get(conf);
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ Path path = new Path(file.getAbsolutePath());
+ checkIfCanWriteToHDFS(new FileSplit[] { new FileSplit("localhost", file) });
+ FSDataOutputStream outputStream = fileSystem.create(path);
+ outputStream.close();
+ if (sequenceFileOutput) {
+ Class keyClass = Class.forName(keyClassName);
+ Class valueClass = Class.forName(valueClassName);
+ conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
+ Writer writer = SequenceFile.createWriter(fileSystem, conf, path, keyClass, valueClass);
+ return new HDFSSequenceWriterImpl(fileSystem, file.getAbsolutePath(), writer);
+ } else {
+ return new HDFSLineWriterImpl(fileSystem, file.getAbsolutePath(), null, COMMA);
+ }
+ }
+
+ private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
+ boolean canWrite = true;
+ JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap) jobConfMap);
+ FileSystem fileSystem = null;
+ try {
+ fileSystem = FileSystem.get(conf);
+ for (FileSplit fileSplit : fileSplits) {
+ Path path = new Path(fileSplit.getLocalFile().getAbsolutePath());
+ canWrite = !fileSystem.exists(path);
+ if (!canWrite) {
+ throw new Exception("Output path already exists: " + path);
+ }
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ throw ioe;
+ }
+ return canWrite;
+ }
+
+
+ public HDFSWriteOperatorDescriptor(JobSpecification jobSpec, Map<String, String> jobConfMap, FileSplit[] fileSplits, char separator, boolean sequenceFileOutput, String keyClassName, String valueClassName) throws Exception {
+ super(jobSpec, fileSplits);
+ this.jobConfMap = jobConfMap;
+ checkIfCanWriteToHDFS(fileSplits);
+ this.separator = separator;
+ this.sequenceFileOutput = sequenceFileOutput;
+ this.keyClassName = keyClassName;
+ this.valueClassName = valueClassName;
+ }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
new file mode 100644
index 0000000..8397d86
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
+import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
+
+public class HadoopMapperOperatorDescriptor<K1, V1, K2, V2> extends AbstractHadoopOperatorDescriptor {
+ private class MapperOperator implements IOpenableDataWriterOperator {
+ private OutputCollector<K2, V2> output;
+ private Reporter reporter;
+ private Mapper<K1, V1, K2, V2> mapper;
+ private IOpenableDataWriter<Object[]> writer;
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ mapper.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ writer.close();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ JobConf jobConf = getJobConf();
+ populateCache(jobConf);
+ try {
+ mapper = createMapper();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ // -- - configure - --
+ mapper.configure(jobConf);
+ writer.open();
+ output = new DataWritingOutputCollector<K2, V2>(writer);
+ reporter = createReporter();
+ }
+
+ @Override
+ public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
+ if (index != 0) {
+ throw new IllegalArgumentException();
+ }
+ this.writer = writer;
+ }
+
+ @Override
+ public void writeData(Object[] data) throws HyracksDataException {
+ try {
+ mapper.map((K1) data[0], (V1) data[1], output, reporter);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ private static final long serialVersionUID = 1L;
+ private static final String mapClassNameKey = "mapred.mapper.class";
+ private Class<? extends Mapper> mapperClass;
+
+ public HadoopMapperOperatorDescriptor(JobSpecification spec, Class<? extends Mapper> mapperClass,
+ RecordDescriptor recordDescriptor, JobConf jobConf) {
+ super(spec, recordDescriptor, jobConf, null);
+ this.mapperClass = mapperClass;
+ }
+
+ public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, IHadoopClassFactory hadoopClassFactory) {
+ super(spec, null, jobConf, hadoopClassFactory);
+ }
+
+ public RecordDescriptor getRecordDescriptor(JobConf conf) {
+ RecordDescriptor recordDescriptor = null;
+ String mapOutputKeyClassName = conf.getMapOutputKeyClass().getName();
+ String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
+ try {
+ if (getHadoopClassFactory() == null) {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
+ (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
+ } else {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) getHadoopClassFactory().loadClass(mapOutputKeyClassName),
+ (Class<? extends Writable>) getHadoopClassFactory().loadClass(mapOutputValueClassName));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return recordDescriptor;
+ }
+
+ private Mapper<K1, V1, K2, V2> createMapper() throws Exception {
+ if (mapperClass != null) {
+ return mapperClass.newInstance();
+ } else {
+ String mapperClassName = super.getJobConfMap().get(mapClassNameKey);
+ Object mapper = getHadoopClassFactory().createMapper(mapperClassName);
+ mapperClass = (Class<? extends Mapper>) mapper.getClass();
+ return (Mapper) mapper;
+ }
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new DeserializedOperatorNodePushable(ctx, new MapperOperator(),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ }
+
+ public Class<? extends Mapper> getMapperClass() {
+ return mapperClass;
+ }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
new file mode 100644
index 0000000..288fdbd
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.HadoopAdapter;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.HadoopFileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IRecordReader;
+
+public class HadoopReadOperatorDescriptor extends AbstractHadoopFileScanOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private String inputFormatClassName;
+ private Map<String, String> jobConfMap;
+
+ private static class HDFSCustomReader implements IRecordReader {
+ private RecordReader hadoopRecordReader;
+ private Class inputKeyClass;
+ private Class inputValueClass;
+ private Object key;
+ private Object value;
+
+ public HDFSCustomReader(Map<String, String> jobConfMap, HadoopFileSplit inputSplit,
+ String inputFormatClassName, Reporter reporter) {
+ try {
+ JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap) jobConfMap);
+ FileSystem fileSystem = null;
+ try {
+ fileSystem = FileSystem.get(conf);
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+
+ Class inputFormatClass = Class.forName(inputFormatClassName);
+ InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
+ hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(getFileSplit(inputSplit), conf,
+ reporter);
+ if (hadoopRecordReader instanceof SequenceFileRecordReader) {
+ inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
+ inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
+ } else {
+ inputKeyClass = hadoopRecordReader.createKey().getClass();
+ inputValueClass = hadoopRecordReader.createValue().getClass();
+ }
+ key = inputKeyClass.newInstance();
+ value = inputValueClass.newInstance();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public Class getInputKeyClass() {
+ return inputKeyClass;
+ }
+
+ public void setInputKeyClass(Class inputKeyClass) {
+ this.inputKeyClass = inputKeyClass;
+ }
+
+ public Class getInputValueClass() {
+ return inputValueClass;
+ }
+
+ public void setInputValueClass(Class inputValueClass) {
+ this.inputValueClass = inputValueClass;
+ }
+
+ @Override
+ public void close() {
+ try {
+ hadoopRecordReader.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public boolean read(Object[] record) throws Exception {
+ if (!hadoopRecordReader.next(key, value)) {
+ return false;
+ }
+ if (record.length == 1) {
+ record[0] = value;
+ } else {
+ record[0] = key;
+ record[1] = value;
+ }
+ return true;
+ }
+
+ private FileSplit getFileSplit(HadoopFileSplit hadoopFileSplit) {
+ FileSplit fileSplit = new FileSplit(new Path(hadoopFileSplit.getFile()), hadoopFileSplit.getStart(),
+ hadoopFileSplit.getLength(), hadoopFileSplit.getHosts());
+ return fileSplit;
+ }
+ }
+
+ public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, JobSpecification spec,
+ HadoopFileSplit[] splits, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+ super(spec, splits, recordDescriptor);
+ this.inputFormatClassName = inputFormatClassName;
+ this.jobConfMap = jobConfMap;
+ }
+
+ public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, InetSocketAddress nameNode,
+ JobSpecification spec, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+ super(spec, null, recordDescriptor);
+ this.inputFormatClassName = inputFormatClassName;
+ this.jobConfMap = jobConfMap;
+ }
+
+ // public HadoopReadOperatorDescriptor(IClusterController clusterController, Map<String, String> jobConfMap,
+ // JobSpecification spec, String fileSystemURL, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+ // super(spec, null, recordDescriptor);
+ // HadoopAdapter hadoopAdapter = HadoopAdapter.getInstance(fileSystemURL);
+ // String inputPathString = jobConfMap.get("mapred.input.dir");
+ // String[] inputPaths = inputPathString.split(",");
+ // Map<String, List<HadoopFileSplit>> blocksToRead = hadoopAdapter.getInputSplits(inputPaths);
+ // List<HadoopFileSplit> hadoopFileSplits = new ArrayList<HadoopFileSplit>();
+ // for (String filePath : blocksToRead.keySet()) {
+ // hadoopFileSplits.addAll(blocksToRead.get(filePath));
+ // }
+ // for (HadoopFileSplit hadoopFileSplit : hadoopFileSplits) {
+ // System.out.println(" Hadoop File Split : " + hadoopFileSplit);
+ // }
+ // super.splits = hadoopFileSplits.toArray(new HadoopFileSplit[] {});
+ // configurePartitionConstraints(clusterController, blocksToRead);
+ // this.inputFormatClassName = inputFormatClassName;
+ // this.jobConfMap = jobConfMap;
+ // }
+
+ // private void configurePartitionConstraints(IClusterController clusterController,
+ // Map<String, List<HadoopFileSplit>> blocksToRead) {
+ // List<LocationConstraint> locationConstraints = new ArrayList<LocationConstraint>();
+ // Map<String, INodeController> registry = null;
+ // try {
+ // // registry = clusterController.getRegistry();
+ // // TODO
+ // } catch (Exception e) {
+ // e.printStackTrace();
+ // }
+ // Map<String, String> hostnameToNodeIdMap = new HashMap<String, String>();
+ // NCConfig ncConfig = null;
+ // for (String nodeId : registry.keySet()) {
+ // try {
+ // ncConfig = ((INodeController) registry.get(nodeId)).getConfiguration();
+ // String ipAddress = ncConfig.dataIPAddress;
+ // String hostname = InetAddress.getByName(ipAddress).getHostName();
+ // System.out.println(" hostname :" + hostname + " nodeid:" + nodeId);
+ // hostnameToNodeIdMap.put(hostname, nodeId);
+ // } catch (Exception e) {
+ // e.printStackTrace();
+ // }
+ // }
+ //
+ // for (String filePath : blocksToRead.keySet()) {
+ // List<HadoopFileSplit> hadoopFileSplits = blocksToRead.get(filePath);
+ // for (HadoopFileSplit hadoopFileSplit : hadoopFileSplits) {
+ // String hostname = hadoopFileSplit.getHosts()[0];
+ // System.out.println("host name is :" + hostname);
+ // InetAddress address = null;
+ // try {
+ // address = InetAddress.getByName(hostname);
+ // if (address.isLoopbackAddress()) {
+ // Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
+ // while (netInterfaces.hasMoreElements()) {
+ // NetworkInterface ni = netInterfaces.nextElement();
+ // InetAddress inetAddress = (InetAddress) ni.getInetAddresses().nextElement();
+ // if (!inetAddress.isLoopbackAddress()) {
+ // address = inetAddress;
+ // break;
+ // }
+ // }
+ // }
+ // hostname = address.getHostName();
+ // System.out.println("cannonical host name hyracks :" + hostname);
+ // } catch (Exception e) {
+ // e.printStackTrace();
+ // }
+ // String nodeId = hostnameToNodeIdMap.get(hostname);
+ // System.out.println(" corresponding node id is :" + nodeId);
+ // LocationConstraint locationConstraint = new AbsoluteLocationConstraint(nodeId);
+ // locationConstraints.add(locationConstraint);
+ // }
+ // }
+ //
+ // PartitionConstraint partitionConstraint = new ExplicitPartitionConstraint(locationConstraints
+ // .toArray(new LocationConstraint[] {}));
+ // this.setPartitionConstraint(partitionConstraint);
+ // }
+
+ @Override
+ protected IRecordReader createRecordReader(HadoopFileSplit fileSplit, RecordDescriptor desc) throws Exception {
+ Reporter reporter = createReporter();
+ IRecordReader recordReader = new HDFSCustomReader(jobConfMap, fileSplit, inputFormatClassName, reporter);
+ return recordReader;
+ }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
new file mode 100644
index 0000000..1f9cba8
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IDataReader;
+import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.KeyComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
+import edu.uci.ics.hyracks.dataflow.std.group.IGroupAggregator;
+import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperator;
+import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
+
+public class HadoopReducerOperatorDescriptor<K2, V2, K3, V3> extends AbstractHadoopOperatorDescriptor {
+ private class ReducerAggregator implements IGroupAggregator {
+ private Reducer<K2, V2, K3, V3> reducer;
+ private DataWritingOutputCollector<K3, V3> output;
+ private Reporter reporter;
+
+ public ReducerAggregator(Reducer<K2, V2, K3, V3> reducer) {
+ this.reducer = reducer;
+ reducer.configure(getJobConf());
+ output = new DataWritingOutputCollector<K3, V3>();
+ reporter = new Reporter() {
+
+ @Override
+ public void progress() {
+
+ }
+
+ @Override
+ public void setStatus(String arg0) {
+
+ }
+
+ @Override
+ public void incrCounter(String arg0, String arg1, long arg2) {
+
+ }
+
+ @Override
+ public void incrCounter(Enum<?> arg0, long arg1) {
+
+ }
+
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
+
+ @Override
+ public Counter getCounter(String arg0, String arg1) {
+ return null;
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> arg0) {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public void aggregate(IDataReader<Object[]> reader, IDataWriter<Object[]> writer) throws HyracksDataException {
+
+ ValueIterator i = new ValueIterator();
+ i.reset(reader);
+ output.setWriter(writer);
+ try {
+
+ // -- - reduce - --
+ reducer.reduce(i.getKey(), i, output, reporter);
+
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // -- - close - --
+ try {
+ reducer.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ private class ValueIterator implements Iterator<V2> {
+ private IDataReader<Object[]> reader;
+ private K2 key;
+ private V2 value;
+
+ public K2 getKey() {
+ return key;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (value == null) {
+ Object[] tuple;
+ try {
+ tuple = reader.readData();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (tuple != null) {
+ value = (V2) tuple[1];
+ }
+ }
+ return value != null;
+ }
+
+ @Override
+ public V2 next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ V2 v = value;
+ value = null;
+ return v;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ void reset(IDataReader<Object[]> reader) {
+ this.reader = reader;
+ try {
+ Object[] tuple = reader.readData();
+ key = (K2) tuple[0];
+ value = (V2) tuple[1];
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static final long serialVersionUID = 1L;
+ private Class<? extends Reducer> reducerClass;
+ private static final String reducerClassKey = "mapred.reducer.class";
+ private static final String comparatorClassKey = "mapred.output.value.groupfn.class";
+ private IComparatorFactory comparatorFactory;
+
+ public HadoopReducerOperatorDescriptor(JobSpecification spec, IComparatorFactory comparatorFactory,
+ Class<? extends Reducer> reducerClass, RecordDescriptor recordDescriptor, JobConf jobConf) {
+ super(spec, recordDescriptor, jobConf, new ClasspathBasedHadoopClassFactory());
+ this.comparatorFactory = comparatorFactory;
+ this.reducerClass = reducerClass;
+ }
+
+ public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IHadoopClassFactory classFactory) {
+ super(spec, null, conf, classFactory);
+ }
+
+ private Reducer<K2, V2, K3, V3> createReducer() throws Exception {
+ if (reducerClass != null) {
+ return reducerClass.newInstance();
+ } else {
+ Object reducer = getHadoopClassFactory().createReducer(getJobConfMap().get(reducerClassKey));
+ reducerClass = (Class<? extends Reducer>) reducer.getClass();
+ return (Reducer) reducer;
+ }
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ try {
+ if (this.comparatorFactory == null) {
+ String comparatorClassName = getJobConfMap().get(comparatorClassKey);
+ RawComparator rawComparator = null;
+ if (comparatorClassName != null) {
+ Class comparatorClazz = getHadoopClassFactory().loadClass(comparatorClassName);
+ this.comparatorFactory = new KeyComparatorFactory(comparatorClazz);
+
+ } else {
+ String mapOutputKeyClass = getJobConfMap().get("mapred.mapoutput.key.class");
+ if (getHadoopClassFactory() != null) {
+ rawComparator = WritableComparator.get(getHadoopClassFactory().loadClass(mapOutputKeyClass));
+ } else {
+ rawComparator = WritableComparator.get((Class<? extends WritableComparable>) Class
+ .forName(mapOutputKeyClass));
+ }
+ this.comparatorFactory = new WritableComparingComparatorFactory(rawComparator.getClass());
+ }
+ }
+ IOpenableDataWriterOperator op = new PreclusteredGroupOperator(new int[] { 0 },
+ new IComparator[] { comparatorFactory.createComparator() }, new ReducerAggregator(createReducer()));
+ return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Class<? extends Reducer> getReducerClass() {
+ return reducerClass;
+ }
+
+ public void setReducerClass(Class<? extends Reducer> reducerClass) {
+ this.reducerClass = reducerClass;
+ }
+
+ @Override
+ public RecordDescriptor getRecordDescriptor(JobConf conf) {
+ String outputKeyClassName = conf.get("mapred.output.key.class");
+ String outputValueClassName = conf.get("mapred.output.value.class");
+ RecordDescriptor recordDescriptor = null;
+ try {
+ if (getHadoopClassFactory() == null) {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) Class.forName(outputKeyClassName),
+ (Class<? extends Writable>) Class.forName(outputValueClassName));
+ } else {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) getHadoopClassFactory().loadClass(outputKeyClassName),
+ (Class<? extends Writable>) getHadoopClassFactory().loadClass(outputValueClassName));
+ }
+ } catch (Exception e) {
+ // Fail fast rather than handing a null RecordDescriptor downstream.
+ throw new RuntimeException(e);
+ }
+ return recordDescriptor;
+ }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java
new file mode 100644
index 0000000..666cc21
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public class AbstractClassBasedDelegate<T> implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private Class<? extends T> klass;
+ protected transient T instance;
+
+ public AbstractClassBasedDelegate(Class<? extends T> klass) {
+ this.klass = klass;
+ init();
+ }
+
+ protected Object readResolve() throws ObjectStreamException {
+ init();
+ return this;
+ }
+
+ private void init() {
+ try {
+ instance = klass.newInstance();
+ } catch (InstantiationException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java
new file mode 100644
index 0000000..98da69c
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
+public class HadoopHashTuplePartitionComputerFactory<K extends Writable> implements ITuplePartitionComputerFactory {
+ private static final long serialVersionUID = 1L;
+ private final ISerializerDeserializer<K> keyIO;
+
+ public HadoopHashTuplePartitionComputerFactory(ISerializerDeserializer<K> keyIO) {
+ this.keyIO = keyIO;
+ }
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
+ private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private final DataInputStream dis = new DataInputStream(bbis);
+
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, 0);
+ bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+ K key = keyIO.deserialize(dis);
+ int h = key.hashCode();
+ if (h < 0) {
+ h = -h;
+ }
+ return h % nParts;
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java
new file mode 100644
index 0000000..81d7c6d
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Partitioner;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
+public class HadoopPartitionerTuplePartitionComputerFactory<K extends Writable, V extends Writable> extends
+ AbstractClassBasedDelegate<Partitioner<K, V>> implements ITuplePartitionComputerFactory {
+ private static final long serialVersionUID = 1L;
+ private final ISerializerDeserializer<K> keyIO;
+ private final ISerializerDeserializer<V> valueIO;
+
+ public HadoopPartitionerTuplePartitionComputerFactory(Class<? extends Partitioner<K, V>> klass,
+ ISerializerDeserializer<K> keyIO, ISerializerDeserializer<V> valueIO) {
+ super(klass);
+ this.keyIO = keyIO;
+ this.valueIO = valueIO;
+ }
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
+ private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private final DataInputStream dis = new DataInputStream(bbis);
+
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, 0);
+ bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+ K key = keyIO.deserialize(dis);
+ int valueStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, 1);
+ bbis.setByteBuffer(accessor.getBuffer(), valueStart);
+ V value = valueIO.deserialize(dis);
+ return instance.getPartition(key, value, nParts);
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java
new file mode 100644
index 0000000..ad8a9e8
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import org.apache.hadoop.io.RawComparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+
+public class KeyBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private Class<? extends RawComparator<T>> cmpClass;
+
+ public KeyBinaryComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
+ this.cmpClass = cmpClass;
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
+ return new IBinaryComparator() {
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return instance.compare(b1, s1, l1, b2, s2, l2);
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java
new file mode 100644
index 0000000..cba86d7
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import org.apache.hadoop.io.RawComparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+
+public class KeyComparatorFactory<T> implements IComparatorFactory<T> {
+ private static final long serialVersionUID = 1L;
+ private Class<? extends RawComparator<T>> cmpClass;
+
+ public KeyComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
+ this.cmpClass = cmpClass;
+ }
+
+ @Override
+ public IComparator<T> createComparator() {
+ final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
+ return new IComparator<T>() {
+ @Override
+ public int compare(T o1, T o2) {
+ return instance.compare(o1, o2);
+ }
+ };
+ }
+}
\ No newline at end of file
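The object-level counterpart works the same way; a short sketch with a hypothetical RawComparator<IntWritable> (illustrative only, not part of this patch):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;

    import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
    import edu.uci.ics.hyracks.dataflow.hadoop.data.KeyComparatorFactory;

    public class KeyComparatorFactoryExample {
        // Hypothetical comparator; only the deserialized compare() path is exercised below.
        public static class IntRawComparator implements RawComparator<IntWritable> {
            @Override
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
            }

            @Override
            public int compare(IntWritable o1, IntWritable o2) {
                return o1.compareTo(o2);
            }
        }

        public static void main(String[] args) {
            IComparator<IntWritable> cmp = new KeyComparatorFactory<IntWritable>(
                    IntRawComparator.class).createComparator();
            // Prints a negative number: 1 orders before 2.
            System.out.println(cmp.compare(new IntWritable(1), new IntWritable(2)));
        }
    }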
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
new file mode 100644
index 0000000..d06d6ff
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+
+public class WritableComparingBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private Class<? extends WritableComparator> cmpClass;
+
+ public WritableComparingBinaryComparatorFactory(Class<? extends WritableComparator> cmpClass) {
+ this.cmpClass = cmpClass;
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ final WritableComparator instance = ReflectionUtils.createInstance(cmpClass);
+ return new IBinaryComparator() {
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return instance.compare(b1, s1, l1, b2, s2, l2);
+ }
+ };
+ }
+}
\ No newline at end of file
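A sketch of this factory used with Hadoop's built-in Text.Comparator to compare keys in their serialized form (illustrative only, not part of this patch):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;

    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
    import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;

    public class WritableComparingExample {
        // Serialize a Text the way Hadoop writes it: a vint length followed by UTF-8 bytes.
        private static byte[] serialize(Text t) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            t.write(new DataOutputStream(baos));
            return baos.toByteArray();
        }

        public static void main(String[] args) throws IOException {
            IBinaryComparator cmp = new WritableComparingBinaryComparatorFactory<Text>(
                    Text.Comparator.class).createBinaryComparator();
            byte[] a = serialize(new Text("apple"));
            byte[] b = serialize(new Text("banana"));
            // Prints a negative number: "apple" sorts before "banana".
            System.out.println(cmp.compare(a, 0, a.length, b, 0, b.length));
        }
    }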
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingComparatorFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingComparatorFactory.java
new file mode 100644
index 0000000..c78648f
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingComparatorFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+
+public class WritableComparingComparatorFactory<T> implements IComparatorFactory<WritableComparable<T>> {
+ private Class<? extends RawComparator> klass;
+
+ public WritableComparingComparatorFactory(Class<? extends WritableComparator> klass) {
+ this.klass = klass;
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IComparator<WritableComparable<T>> createComparator() {
+ final RawComparator<WritableComparable<T>> instance = ReflectionUtils.createInstance(klass);
+ return new IComparator<WritableComparable<T>>() {
+ @Override
+ public int compare(WritableComparable<T> o1, WritableComparable<T> o2) {
+ return instance.compare(o1, o2);
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
new file mode 100644
index 0000000..56ddbe5
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+
+public class ClasspathBasedHadoopClassFactory implements IHadoopClassFactory {
+
+ @Override
+ public Mapper createMapper(String mapClassName) throws Exception {
+ Class clazz = loadClass(mapClassName);
+ return (Mapper)clazz.newInstance();
+ }
+
+ @Override
+ public Reducer createReducer(String reduceClassName) throws Exception {
+ Class clazz = loadClass(reduceClassName);
+ return (Reducer)clazz.newInstance();
+ }
+
+ @Override
+ public Class loadClass(String className) throws Exception {
+ Class clazz = Class.forName(className);
+ return clazz;
+ }
+
+
+}
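A minimal sketch of the class factory in use, resolving Hadoop's identity mapper and reducer by name from the classpath (illustrative only, not part of this patch):

    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.Reducer;

    import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
    import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;

    public class ClassFactoryExample {
        public static void main(String[] args) throws Exception {
            IHadoopClassFactory factory = new ClasspathBasedHadoopClassFactory();
            // Both identity classes ship with Hadoop and have public no-arg constructors.
            Mapper mapper = factory.createMapper("org.apache.hadoop.mapred.lib.IdentityMapper");
            Reducer reducer = factory.createReducer("org.apache.hadoop.mapred.lib.IdentityReducer");
            System.out.println(mapper.getClass().getName() + ", " + reducer.getClass().getName());
        }
    }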
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java
new file mode 100644
index 0000000..7eea853
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+@SuppressWarnings("deprecation")
+public class DatatypeHelper {
+ private static final class WritableSerializerDeserializer<T extends Writable> implements ISerializerDeserializer<T> {
+ private static final long serialVersionUID = 1L;
+
+ private Class<T> clazz;
+
+ private WritableSerializerDeserializer(Class<T> clazz) {
+ this.clazz = clazz;
+ }
+
+ private T createInstance() throws HyracksDataException {
+ // TODO remove "if", create a new WritableInstanceOperations class
+ // that deals with Writables that don't have public constructors
+ if (NullWritable.class.equals(clazz)) {
+ return (T) NullWritable.get();
+ }
+ try {
+ return clazz.newInstance();
+ } catch (InstantiationException e) {
+ throw new HyracksDataException(e);
+ } catch (IllegalAccessException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public T deserialize(DataInput in) throws HyracksDataException {
+ T o = createInstance();
+ try {
+ o.readFields(in);
+ } catch (IOException e) {
+                // Surface the failure instead of swallowing it.
+                throw new HyracksDataException(e);
+ }
+ return o;
+ }
+
+ @Override
+ public void serialize(T instance, DataOutput out) throws HyracksDataException {
+ try {
+ instance.write(out);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ public static ISerializerDeserializer<? extends Writable> createSerializerDeserializer(
+ Class<? extends Writable> fClass) {
+ return new WritableSerializerDeserializer(fClass);
+ }
+
+ public static RecordDescriptor createKeyValueRecordDescriptor(Class<? extends Writable> keyClass,
+ Class<? extends Writable> valueClass) {
+ ISerializerDeserializer[] fields = new ISerializerDeserializer[2];
+ fields[0] = createSerializerDeserializer(keyClass);
+ fields[1] = createSerializerDeserializer(valueClass);
+ return new RecordDescriptor(fields);
+ }
+
+ public static RecordDescriptor createOneFieldRecordDescriptor(Class<? extends Writable> fieldClass) {
+ ISerializerDeserializer[] fields = new ISerializerDeserializer[1];
+ fields[0] = createSerializerDeserializer(fieldClass);
+ return new RecordDescriptor(fields);
+ }
+
+ public static JobConf hashMap2JobConf(HashMap<String, String> jobConfMap) {
+ JobConf jobConf;
+ synchronized (Configuration.class) {
+ jobConf = new JobConf();
+ for (Entry<String, String> entry : jobConfMap.entrySet()) {
+ jobConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+ return jobConf;
+ }
+
+ public static HashMap<String, String> jobConf2HashMap(JobConf jobConf) {
+ HashMap<String, String> jobConfMap = new HashMap<String, String>();
+ for (Entry<String, String> entry : jobConf) {
+ jobConfMap.put(entry.getKey(), entry.getValue());
+ }
+ return jobConfMap;
+ }
+}
\ No newline at end of file
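A small sketch of the helper in use (illustrative only, not part of this patch): building a (Text, IntWritable) key/value record descriptor, and round-tripping a JobConf through a HashMap, which is how job configuration can be shipped as a plain serializable map:

    import java.util.HashMap;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;

    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;

    public class DatatypeHelperExample {
        public static void main(String[] args) {
            // Two fields: a Writable-backed serde for the key and one for the value.
            RecordDescriptor keyValueDesc =
                    DatatypeHelper.createKeyValueRecordDescriptor(Text.class, IntWritable.class);

            // JobConf -> HashMap -> JobConf preserves the configured entries.
            JobConf conf = new JobConf();
            conf.set("mapred.reduce.tasks", "4");
            HashMap<String, String> asMap = DatatypeHelper.jobConf2HashMap(conf);
            JobConf restored = DatatypeHelper.hashMap2JobConf(asMap);
            System.out.println(restored.get("mapred.reduce.tasks")); // prints 4
        }
    }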
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java
new file mode 100644
index 0000000..0003076
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.util.Properties;
+
+import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.map.IMapper;
+
+public class DuplicateKeyMapper implements IMapper {
+
+ @Override
+ public void map(Object[] data, IDataWriter<Object[]> writer) throws HyracksDataException {
+ writer.writeData(new Object[] { data[0], data[1], data[0] });
+
+ }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopAdapter.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopAdapter.java
new file mode 100644
index 0000000..0c984bc
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopAdapter.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+
+public class HadoopAdapter {
+
+ private static ClientProtocol namenode;
+ private static FileSystem fileSystem;
+ private static JobConf jobConf;
+ private static HadoopAdapter instance;
+
+ public static final String DFS_DATA_DIR = "dfs.data.dir";
+ public static final String FS_DEFAULT_NAME = "fs.default.name";
+ public static final String DFS_REPLICATION = "dfs.replication";
+
+ public static HadoopAdapter getInstance(String fileSystemURL){
+ if(instance == null){
+ jobConf = new JobConf(true);
+ String [] urlParts = parseURL(fileSystemURL);
+ jobConf.set(FS_DEFAULT_NAME, fileSystemURL);
+ instance = new HadoopAdapter(new InetSocketAddress(urlParts[1], Integer.parseInt(urlParts[2])));
+ }
+ return instance;
+ }
+
+ public static JobConf getConf() {
+ return jobConf;
+ }
+
+ private HadoopAdapter (InetSocketAddress address){
+ try{
+            namenode = getNameNode(address);
+ fileSystem = FileSystem.get(jobConf);
+ }catch(IOException ioe){
+ ioe.printStackTrace();
+ }
+ }
+
+ private static String[] parseURL(String urlString){
+ String[] urlComponents = urlString.split(":");
+ urlComponents[1] = urlComponents[1].substring(2);
+ return urlComponents;
+ }
+
+
+ public Map<String,List<HadoopFileSplit>> getInputSplits(String[] inputPaths){
+ List<HadoopFileSplit> hadoopFileSplits = new ArrayList<HadoopFileSplit>();
+ Path[] paths = new Path[inputPaths.length];
+ int index =0;
+ for(String inputPath : inputPaths){
+            paths[index++] = new Path(StringUtils.unEscapeString(inputPath));
+ }
+ Map<String,List<HadoopFileSplit>> fileSplitInfo = getBlocksToRead(paths);
+ return fileSplitInfo;
+ }
+
+ private static Map<String,List<HadoopFileSplit>> getBlocksToRead(Path[] inputPaths){
+ Map<String,List<HadoopFileSplit>> hadoopFileSplitMap = new HashMap<String,List<HadoopFileSplit>>();
+ for (int i=0;i<inputPaths.length;i++){
+ try{
+ String absolutePathPrint = getAbsolutePath(inputPaths[i]);
+ FileStatus[] fileStatuses = namenode.getListing(absolutePathPrint);
+ for(int j=0;j<fileStatuses.length;j++){
+ Path filePath = fileStatuses[j].getPath();
+ String absolutePath = getAbsolutePath(filePath);
+ List<HadoopFileSplit> fileSplits = getFileBlocks(absolutePath,fileStatuses[j]);
+ if(fileSplits!=null && fileSplits.size() > 0){
+ hadoopFileSplitMap.put(absolutePath, fileSplits);
+ }
+ }
+ }catch(IOException ioe){
+ ioe.printStackTrace();
+ }
+
+ }
+ return hadoopFileSplitMap;
+ }
+
+ private static ClientProtocol getNameNode(InetSocketAddress address) throws IOException{
+ return (ClientProtocol)getProtocol(ClientProtocol.class, address, new JobConf());
+ }
+
+ private static String getAbsolutePath(Path path){
+ StringBuffer absolutePath = new StringBuffer();
+ List<String> ancestorPath = new ArrayList<String>();
+ Path pathElement=path;
+ while(pathElement != null){
+ ancestorPath.add(0, pathElement.getName());
+ pathElement = pathElement.getParent();
+ }
+ ancestorPath.remove(0);
+ for(String s : ancestorPath){
+ absolutePath.append("/");
+ absolutePath.append(s);
+ }
+        return absolutePath.toString();
+ }
+
+ private static VersionedProtocol getProtocol(Class protocolClass, InetSocketAddress inetAddress, JobConf jobConf) throws IOException{
+ VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass, ClientProtocol.versionID, inetAddress, jobConf);
+ return versionedProtocol;
+ }
+
+ private static List<HadoopFileSplit> getFileBlocks(String absolutePath,FileStatus fileStatus){
+ List<HadoopFileSplit> hadoopFileSplits = new ArrayList<HadoopFileSplit>();
+ try{
+ LocatedBlocks locatedBlocks = namenode.getBlockLocations(absolutePath, 0, fileStatus.getLen());
+ long blockSize = fileSystem.getBlockSize(new Path(absolutePath));
+ if(locatedBlocks !=null){
+ int index = 0;
+ for(LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()){
+ DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); // all datanodes having this block
+ String [] hostnames = new String[datanodeInfos.length];
+ int datanodeCount =0;
+ for(DatanodeInfo datanodeInfo : datanodeInfos){
+ hostnames[datanodeCount++] = datanodeInfo.getHostName();
+ }
+                    HadoopFileSplit hadoopFileSplit = new HadoopFileSplit(absolutePath, index * blockSize, blockSize, hostnames);
+ hadoopFileSplits.add(hadoopFileSplit);
+ index++;
+ }
+ }
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+ return hadoopFileSplits;
+ }
+}
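A usage sketch for the adapter (illustrative only, not part of this patch; the HDFS URL and input path are placeholders, and a reachable NameNode is assumed):

    import java.util.List;
    import java.util.Map;

    import edu.uci.ics.hyracks.dataflow.hadoop.util.HadoopAdapter;
    import edu.uci.ics.hyracks.dataflow.hadoop.util.HadoopFileSplit;

    public class HadoopAdapterExample {
        public static void main(String[] args) {
            // The adapter is created once per JVM; the URL must look like
            // hdfs://<host>:<port> so parseURL can extract the NameNode address.
            HadoopAdapter adapter = HadoopAdapter.getInstance("hdfs://localhost:9000");
            Map<String, List<HadoopFileSplit>> splits =
                    adapter.getInputSplits(new String[] { "/user/hyracks/input" });
            for (Map.Entry<String, List<HadoopFileSplit>> entry : splits.entrySet()) {
                System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " split(s)");
            }
        }
    }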
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopFileSplit.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopFileSplit.java
new file mode 100644
index 0000000..6e7b76c
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopFileSplit.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.fs.Path;
+
+public class HadoopFileSplit implements Serializable{
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private String filePath;
+ private long start;
+ private long length;
+ private String[] hosts;
+
+ public HadoopFileSplit(String filePath, long start, long length, String[] hosts){
+ this.filePath = filePath;
+ this.start = start;
+ this.length = length;
+ this.hosts = hosts;
+ }
+
+ public String getFile() {
+ return filePath;
+ }
+
+ public void setFile(String file) {
+ this.filePath = file;
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public void setStart(long start) {
+ this.start = start;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public void setLength(long length) {
+ this.length = length;
+ }
+
+ public String[] getHosts() {
+ return hosts;
+ }
+
+ public void setHosts(String[] hosts) {
+ this.hosts = hosts;
+ }
+
+ public String toString(){
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(filePath + " " + start + " " + length + "\n");
+ for(String host : hosts){
+ stringBuilder.append(host);
+ stringBuilder.append(",");
+ }
+        return stringBuilder.toString();
+ }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
new file mode 100644
index 0000000..d465471
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+
+public interface IHadoopClassFactory extends Serializable{
+
+ public Mapper createMapper(String mapClassName) throws Exception;
+
+ public Reducer createReducer(String reduceClassName) throws Exception;
+
+ public Class loadClass(String className) throws Exception;
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java
new file mode 100644
index 0000000..05be19e
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.util.Properties;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.map.IMapper;
+
+public class PreappendLongWritableMapper implements IMapper {
+
+ @Override
+ public void map(Object[] data, IDataWriter<Object[]> writer) throws HyracksDataException {
+ writer.writeData(new Object[] { new LongWritable(0), new Text(String.valueOf(data[0])) });
+ }
+}