Remove Unused / Historical Hyracks Modules

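Delete modules that are unused and retained only for historical
reasons. Among them is hyracks-dataflow-hadoop: its POM and the legacy
operator descriptors (AbstractHadoopOperatorDescriptor,
HadoopMapperOperatorDescriptor, HadoopReadOperatorDescriptor,
HadoopReducerOperatorDescriptor) that bridged Hyracks dataflows to the
old org.apache.hadoop.mapred and org.apache.hadoop.mapreduce APIs.
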
Change-Id: Iaa058eb7c73696e1ead2c05c1ee34dbe9055ce52
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1714
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/pom.xml
deleted file mode 100644
index cd0b30a..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/pom.xml
+++ /dev/null
@@ -1,85 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>hyracks-dataflow-hadoop</artifactId>
-  <name>hyracks-dataflow-hadoop</name>
-
-  <parent>
-    <groupId>org.apache.hyracks</groupId>
-    <artifactId>hyracks</artifactId>
-    <version>0.2.18-SNAPSHOT</version>
-  </parent>
-
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-      <comments>A business-friendly OSS license</comments>
-    </license>
-  </licenses>
-
-  <properties>
-    <root.dir>${basedir}/../..</root.dir>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-api</artifactId>
-      <version>${project.version}</version>
-      <type>jar</type>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-dataflow-common</artifactId>
-      <version>${project.version}</version>
-      <type>jar</type>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-hdfs-2.x</artifactId>
-      <version>${project.version}</version>
-      <type>jar</type>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <type>jar</type>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>edu.uci.ics.dcache</groupId>
-      <artifactId>dcache-client</artifactId>
-      <version>0.0.1</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-dataflow-std</artifactId>
-      <version>${project.version}</version>
-      <scope>compile</scope>
-    </dependency>
-  </dependencies>
-</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
deleted file mode 100644
index 226250e..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import edu.uci.ics.dcache.client.DCacheClient;
-import org.apache.hyracks.api.dataflow.IDataWriter;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-public abstract class AbstractHadoopOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    protected transient JobConf jobConf;
-
-    protected static class DataWritingOutputCollector<K, V> implements OutputCollector<K, V> {
-        private IDataWriter<Object[]> writer;
-
-        public DataWritingOutputCollector() {
-        }
-
-        public DataWritingOutputCollector(IDataWriter<Object[]> writer) {
-            this.writer = writer;
-        }
-
-        @Override
-        public void collect(Object key, Object value) throws IOException {
-            writer.writeData(new Object[] { key, value });
-        }
-
-        public void setWriter(IDataWriter<Object[]> writer) {
-            this.writer = writer;
-        }
-    }
-
-    public static String MAPRED_CACHE_FILES = "mapred.cache.files";
-    public static String MAPRED_CACHE_LOCALFILES = "mapred.cache.localFiles";
-
-    private static final long serialVersionUID = 1L;
-    private final Map<String, String> jobConfMap;
-    private IHadoopClassFactory hadoopClassFactory;
-
-    public AbstractHadoopOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity,
-            RecordDescriptor recordDescriptor, JobConf jobConf, IHadoopClassFactory hadoopOperatorFactory) {
-        super(spec, inputArity, 1);
-        jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
-        this.hadoopClassFactory = hadoopOperatorFactory;
-        recordDescriptors[0] = recordDescriptor;
-    }
-
-    public Map<String, String> getJobConfMap() {
-        return jobConfMap;
-    }
-
-    public IHadoopClassFactory getHadoopClassFactory() {
-        return hadoopClassFactory;
-    }
-
-    public void setHadoopClassFactory(IHadoopClassFactory hadoopClassFactory) {
-        this.hadoopClassFactory = hadoopClassFactory;
-    }
-
-    protected Reporter createReporter() {
-        return new Reporter() {
-            @Override
-            public Counter getCounter(Enum<?> name) {
-                return null;
-            }
-
-            @Override
-            public Counter getCounter(String group, String name) {
-                return null;
-            }
-
-            @Override
-            public InputSplit getInputSplit() throws UnsupportedOperationException {
-                return null;
-            }
-
-            @Override
-            public void incrCounter(Enum<?> key, long amount) {
-
-            }
-
-            @Override
-            public void incrCounter(String group, String counter, long amount) {
-
-            }
-
-            @Override
-            public void progress() {
-
-            }
-
-            @Override
-            public void setStatus(String status) {
-
-            }
-
-            @Override
-            public float getProgress() {
-                return 0.0f;
-            }
-        };
-    }
-
-    public JobConf getJobConf() {
-        if (jobConf == null) {
-            jobConf = DatatypeHelper.map2JobConf(jobConfMap);
-            jobConf.setClassLoader(this.getClass().getClassLoader());
-        }
-        return jobConf;
-    }
-
-    public void populateCache(JobConf jobConf) {
-        try {
-            String cache = jobConf.get(MAPRED_CACHE_FILES);
-            System.out.println("cache:" + cache);
-            if (cache == null) {
-                return;
-            }
-            String localCache = jobConf.get(MAPRED_CACHE_LOCALFILES);
-            System.out.println("localCache:" + localCache);
-            if (localCache != null) {
-                return;
-            }
-            localCache = "";
-            StringTokenizer cacheTokenizer = new StringTokenizer(cache, ",");
-            while (cacheTokenizer.hasMoreTokens()) {
-                if (!"".equals(localCache)) {
-                    localCache += ",";
-                }
-                try {
-                    localCache += DCacheClient.get().get(cacheTokenizer.nextToken());
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-            jobConf.set(MAPRED_CACHE_LOCALFILES, localCache);
-            System.out.println("localCache:" + localCache);
-        } catch (Exception e) {
-
-        }
-    }
-}
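
One pattern in the descriptor above is worth noting for anyone porting
jobs off this module: JobConf is not Serializable, so the descriptor
shipped its entries as a Map<String, String> (DatatypeHelper.jobConf2Map)
and lazily rebuilt the JobConf on the remote side (map2JobConf). Below is
a minimal sketch of that round trip using only the plain Hadoop API; the
JobConfHolder class name is invented for illustration:

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.mapred.JobConf;

    // Sketch: ship a JobConf across the wire as a Map and rebuild it
    // lazily, mirroring AbstractHadoopOperatorDescriptor.getJobConf().
    public class JobConfHolder implements Serializable {
        private static final long serialVersionUID = 1L;

        private final Map<String, String> entries = new HashMap<>();
        private transient JobConf jobConf; // rebuilt after deserialization

        public JobConfHolder(JobConf conf) {
            // Configuration implements Iterable<Map.Entry<String, String>>.
            for (Map.Entry<String, String> e : conf) {
                entries.put(e.getKey(), e.getValue());
            }
        }

        public JobConf getJobConf() {
            if (jobConf == null) {
                jobConf = new JobConf();
                for (Map.Entry<String, String> e : entries.entrySet()) {
                    jobConf.set(e.getKey(), e.getValue());
                }
            }
            return jobConf;
        }
    }
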
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
deleted file mode 100644
index b328f29..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOpenableDataWriter;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.comm.io.SerializingDataWriter;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
-import org.apache.hyracks.dataflow.hadoop.util.InputSplitsProxy;
-import org.apache.hyracks.dataflow.hadoop.util.MRContextUtil;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-import org.apache.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
-import org.apache.hyracks.hdfs.ContextFactory;
-
-public class HadoopMapperOperatorDescriptor<K1, V1, K2, V2> extends AbstractHadoopOperatorDescriptor {
-
-    private class MapperBaseOperator {
-        protected OutputCollector<K2, V2> output;
-        protected Reporter reporter;
-        protected Object mapper;
-        // protected Mapper<K1, V1, K2, V2> mapper;
-        protected int partition;
-        protected JobConf conf;
-        protected IOpenableDataWriter<Object[]> writer;
-        protected boolean newMapreduceLib = false;
-        org.apache.hadoop.mapreduce.Mapper.Context context;
-
-        public MapperBaseOperator(int partition) {
-            this.partition = partition;
-        }
-
-        protected void initializeMapper() throws HyracksDataException {
-            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-            jobConf = getJobConf();
-            populateCache(jobConf);
-            conf = new JobConf(jobConf);
-            conf.setClassLoader(jobConf.getClassLoader());
-            reporter = createReporter();
-        }
-
-        protected void map(Object[] data) throws HyracksDataException {
-            try {
-                if (!conf.getUseNewMapper()) {
-                    ((org.apache.hadoop.mapred.Mapper) mapper).map((K1) data[0], (V1) data[1], output, reporter);
-                } else
-                    throw new IllegalStateException(
-                            " Incorrect map method called for MapReduce code written using mapreduce package");
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            } catch (RuntimeException re) {
-                System.out.println("Runtime exception encountered for row: " + data[0] + ": " + data[1]);
-                re.printStackTrace();
-            }
-        }
-
-        protected void closeMapper() throws HyracksDataException {
-            try {
-                if (!conf.getUseNewMapper()) {
-                    ((org.apache.hadoop.mapred.Mapper) mapper).close();
-                } else {
-                    // Do nothing: closing the mapper is handled internally by
-                    // the run method on the context.
-                }
-            } catch (IOException ioe) {
-                throw new HyracksDataException(ioe);
-            }
-        }
-
-    }
-
-    private class MapperOperator extends MapperBaseOperator implements IOpenableDataWriterOperator {
-
-        public MapperOperator(int partition) {
-            super(partition);
-        }
-
-        @Override
-        public void close() throws HyracksDataException {
-            super.closeMapper();
-            writer.close();
-        }
-
-        @Override
-        public void fail() throws HyracksDataException {
-            writer.fail();
-        }
-
-        @Override
-        public void open() throws HyracksDataException {
-            initializeMapper();
-            writer.open();
-            output = new DataWritingOutputCollector<K2, V2>(writer);
-        }
-
-        @Override
-        public void writeData(Object[] data) throws HyracksDataException {
-            super.map(data);
-        }
-
-        public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
-            if (index != 0) {
-                throw new IllegalArgumentException();
-            }
-            this.writer = writer;
-        }
-
-        protected void initializeMapper() throws HyracksDataException {
-            super.initializeMapper();
-            try {
-                mapper = createMapper(conf);
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
-            if (!conf.getUseNewMapper()) {
-                ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
-            }
-        }
-
-        @Override
-        public void flush() throws HyracksDataException {
-        }
-
-    }
-
-    private class ReaderMapperOperator extends MapperBaseOperator {
-
-        public ReaderMapperOperator(int partition, IOpenableDataWriter writer) throws HyracksDataException {
-            super(partition);
-            output = new DataWritingOutputCollector<K2, V2>(writer);
-            this.writer = writer;
-            this.writer.open();
-        }
-
-        protected void updateConfWithSplit(JobConf conf) {
-            try {
-                if (inputSplits == null) {
-                    inputSplits = inputSplitsProxy.toInputSplits(conf);
-                }
-                Object splitRead = inputSplits[partition];
-                if (splitRead instanceof FileSplit) {
-                    conf.set("map.input.file", ((FileSplit) splitRead).getPath().toString());
-                    conf.setLong("map.input.start", ((FileSplit) splitRead).getStart());
-                    conf.setLong("map.input.length", ((FileSplit) splitRead).getLength());
-                } else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
-                    conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath()
-                            .toString());
-                    conf.setLong("map.input.start",
-                            ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
-                    conf.setLong("map.input.length",
-                            ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-                // we do not throw the exception here as we are setting
-                // additional parameters that may not be
-                // required by the mapper. If they are indeed required, the
-                // configure method invoked on the mapper
-                // shall report an exception because of the missing parameters.
-            }
-        }
-
-        protected void initializeMapper() throws HyracksDataException {
-            super.initializeMapper();
-            updateConfWithSplit(conf);
-            try {
-                mapper = createMapper(conf);
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
-            if (!conf.getUseNewMapper()) {
-                ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
-            }
-        }
-
-        public void mapInput() throws HyracksDataException, InterruptedException, ClassNotFoundException {
-            try {
-                initializeMapper();
-                conf.setClassLoader(this.getClass().getClassLoader());
-                Object reader;
-                Object key = null;
-                Object value = null;
-                Object inputSplit = inputSplits[partition];
-                reader = getRecordReader(conf, inputSplit);
-                final Object[] data = new Object[2];
-                if (conf.getUseNewMapper()) {
-                    org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader) reader;
-                    org.apache.hadoop.mapreduce.RecordWriter recordWriter = new RecordWriter() {
-
-                        @Override
-                        public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
-                            // TODO Auto-generated method stub
-                        }
-
-                        @Override
-                        public void write(Object key, Object value) throws IOException, InterruptedException {
-                            data[0] = key;
-                            data[1] = value;
-                            writer.writeData(data);
-                        }
-                    };
-
-                    OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
-                            .getOutputCommitter(new ContextFactory().createContext(conf, new TaskAttemptID()));
-                    StatusReporter statusReporter = new StatusReporter() {
-                        @Override
-                        public void setStatus(String arg0) {
-                        }
-
-                        @Override
-                        public void progress() {
-                        }
-
-                        @Override
-                        public Counter getCounter(String arg0, String arg1) {
-                            return null;
-                        }
-
-                        @Override
-                        public Counter getCounter(Enum<?> arg0) {
-                            return null;
-                        }
-
-                        @Override
-                        public float getProgress() {
-                            // TODO Auto-generated method stub
-                            return 0;
-                        }
-                    };
-                    context = new MRContextUtil().createMapContext(conf, new TaskAttemptID(), newReader, recordWriter,
-                            outputCommitter, statusReporter, (org.apache.hadoop.mapreduce.InputSplit) inputSplit);
-                    newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplit, context);
-                    ((org.apache.hadoop.mapreduce.Mapper) mapper).run(context);
-                } else {
-                    Class inputKeyClass = null;
-                    Class inputValueClass = null;
-
-                    RecordReader oldReader = (RecordReader) reader;
-                    if (reader instanceof SequenceFileRecordReader) {
-                        inputKeyClass = ((SequenceFileRecordReader) oldReader).getKeyClass();
-                        inputValueClass = ((SequenceFileRecordReader) oldReader).getValueClass();
-                    } else {
-                        inputKeyClass = oldReader.createKey().getClass();
-                        inputValueClass = oldReader.createValue().getClass();
-                    }
-                    key = oldReader.createKey();
-                    value = oldReader.createValue();
-                    while (oldReader.next(key, value)) {
-                        data[0] = key;
-                        data[1] = value;
-                        super.map(data);
-                    }
-                    oldReader.close();
-                }
-
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-
-        }
-
-        public void close() throws HyracksDataException {
-            super.closeMapper();
-            writer.close();
-        }
-    }
-
-    private static final long serialVersionUID = 1L;
-    private Class mapperClass;
-    private InputSplitsProxy inputSplitsProxy;
-    private transient Object[] inputSplits;
-    private boolean selfRead = false;
-
-    private void initializeSplitInfo(Object[] splits) throws IOException {
-        jobConf = super.getJobConf();
-        InputFormat inputFormat = jobConf.getInputFormat();
-        inputSplitsProxy = new InputSplitsProxy(jobConf, splits);
-    }
-
-    public HadoopMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf jobConf,
-            IHadoopClassFactory hadoopClassFactory) throws IOException {
-        super(spec, 1, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
-    }
-
-    public HadoopMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf jobConf, Object[] splits,
-            IHadoopClassFactory hadoopClassFactory) throws IOException {
-        super(spec, 0, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
-        initializeSplitInfo(splits);
-        this.selfRead = true;
-    }
-
-    public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory hadoopClassFactory) {
-        RecordDescriptor recordDescriptor = null;
-        String mapOutputKeyClassName = conf.getMapOutputKeyClass().getName();
-        String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
-        try {
-            if (hadoopClassFactory == null) {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
-                        (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
-            } else {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputKeyClassName),
-                        (Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputValueClassName));
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        return recordDescriptor;
-    }
-
-    private Object createMapper(JobConf conf) throws Exception {
-        Object mapper;
-        if (mapperClass != null) {
-            return ReflectionUtils.newInstance(mapperClass, conf);
-        } else {
-            String mapperClassName = null;
-            if (jobConf.getUseNewMapper()) {
-                JobContext jobContext = new ContextFactory().createJobContext(conf);
-                mapperClass = jobContext.getMapperClass();
-                mapperClassName = mapperClass.getName();
-            } else {
-                mapperClass = conf.getMapperClass();
-                mapperClassName = mapperClass.getName();
-            }
-            mapper = getHadoopClassFactory().createMapper(mapperClassName, conf);
-        }
-        return mapper;
-    }
-
-    private Object getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException,
-            InterruptedException {
-        if (conf.getUseNewMapper()) {
-            JobContext context = new ContextFactory().createJobContext(conf);
-            org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils
-                    .newInstance(context.getInputFormatClass(), conf);
-            TaskAttemptContext taskAttemptContext = new ContextFactory().createContext(conf, new TaskAttemptID());
-            return inputFormat.createRecordReader((org.apache.hadoop.mapreduce.InputSplit) inputSplit,
-                    taskAttemptContext);
-        } else {
-            Class inputFormatClass = conf.getInputFormat().getClass();
-            InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf,
-                    super.createReporter());
-        }
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-
-        JobConf conf = getJobConf();
-        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-        try {
-            if (selfRead) {
-                RecordDescriptor recordDescriptor = null;
-                if (inputSplits == null) {
-                    inputSplits = inputSplitsProxy.toInputSplits(conf);
-                }
-                Object reader = getRecordReader(conf, inputSplits[partition]);
-                if (conf.getUseNewMapper()) {
-                    org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader) reader;
-                    newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplits[partition],
-                            new ContextFactory().createContext(conf, new TaskAttemptID()));
-                    newReader.nextKeyValue();
-                    Object key = newReader.getCurrentKey();
-                    Class keyClass = null;
-                    if (key == null) {
-                        keyClass = Class.forName("org.apache.hadoop.io.NullWritable");
-                    }
-                    recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                            (Class<? extends Writable>) keyClass, (Class<? extends Writable>) newReader
-                                    .getCurrentValue().getClass());
-                } else {
-                    RecordReader oldReader = (RecordReader) reader;
-                    recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                            (Class<? extends Writable>) oldReader.createKey().getClass(),
-                            (Class<? extends Writable>) oldReader.createValue().getClass());
-                }
-                return createSelfReadingMapper(ctx, recordDescriptor, partition);
-            } else {
-                return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
-                        recordDescProvider.getInputRecordDescriptor(this.activityNodeId, 0));
-            }
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private IOperatorNodePushable createSelfReadingMapper(final IHyracksTaskContext ctx,
-            final RecordDescriptor recordDescriptor, final int partition) {
-        return new AbstractUnaryOutputSourceOperatorNodePushable() {
-            @Override
-            public void initialize() throws HyracksDataException {
-                SerializingDataWriter writer = new SerializingDataWriter(ctx, recordDescriptor, this.writer);
-                ReaderMapperOperator readMapOp = new ReaderMapperOperator(partition, writer);
-                try {
-                    readMapOp.mapInput();
-                } catch (Exception e) {
-                    writer.fail();
-                    throw new HyracksDataException(e);
-                } finally {
-                    readMapOp.close();
-                }
-            }
-        };
-    }
-
-    public Class<? extends Mapper> getMapperClass() {
-        return mapperClass;
-    }
-}
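
Most of the deleted mapper descriptor is dispatch between Hadoop's two
mapper APIs: an old-API (mapred) Mapper is fed record by record through
map(...), while a new-API (mapreduce) Mapper drives itself through
run(Context), so pushing single records at it is a usage error. A
reduced sketch of that branch follows, keeping the raw types of the
original; the MapperDispatch and mapOne names are illustrative only:

    import java.io.IOException;

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // Sketch of the two-API dispatch the deleted descriptor performed.
    final class MapperDispatch {
        @SuppressWarnings({ "rawtypes", "unchecked" })
        static void mapOne(JobConf conf, Object mapper, Object key, Object value,
                OutputCollector output, Reporter reporter) throws IOException {
            if (!conf.getUseNewMapper()) {
                // Old API: the framework pushes each record into the mapper.
                ((org.apache.hadoop.mapred.Mapper) mapper).map(key, value, output, reporter);
            } else {
                // New API: the mapper pulls records itself via run(Context),
                // so per-record dispatch is a usage error.
                throw new IllegalStateException(
                        "new-API (mapreduce) mappers must be driven via Mapper.run(Context)");
            }
        }
    }
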
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
deleted file mode 100644
index 7764265..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.dataflow.hadoop.util.InputSplitsProxy;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.hdfs.ContextFactory;
-
-public class HadoopReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-
-    private String inputFormatClassName;
-    private Map<String, String> jobConfMap;
-    private InputSplitsProxy inputSplitsProxy;
-    private transient JobConf jobConf;
-
-    public JobConf getJobConf() {
-        if (jobConf == null) {
-            jobConf = DatatypeHelper.map2JobConf(jobConfMap);
-        }
-        return jobConf;
-    }
-
-    public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec, Object[] splits) throws IOException {
-        super(spec, 0, 1);
-        this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
-        InputFormat inputFormat = jobConf.getInputFormat();
-        RecordReader recordReader;
-        try {
-            recordReader = getRecordReader(DatatypeHelper.map2JobConf(jobConfMap), splits[0]);
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-        recordDescriptors[0] = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) recordReader
-                .createKey().getClass(), (Class<? extends Writable>) recordReader.createValue().getClass());
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, this, splits.length);
-        inputSplitsProxy = new InputSplitsProxy(jobConf, splits);
-        this.inputFormatClassName = inputFormat.getClass().getName();
-    }
-
-    private RecordReader getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException,
-            InterruptedException {
-        RecordReader hadoopRecordReader = null;
-        if (conf.getUseNewMapper()) {
-            JobContext context = new ContextFactory().createJobContext(conf);
-            org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils
-                    .newInstance(context.getInputFormatClass(), conf);
-            TaskAttemptContext taskAttemptContext = new ContextFactory().createContext(jobConf, null);
-            hadoopRecordReader = (RecordReader) inputFormat.createRecordReader(
-                    (org.apache.hadoop.mapreduce.InputSplit) inputSplit, taskAttemptContext);
-        } else {
-            Class inputFormatClass = conf.getInputFormat().getClass();
-            InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-            hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(
-                    (org.apache.hadoop.mapred.InputSplit) inputSplit, conf, createReporter());
-        }
-        return hadoopRecordReader;
-    }
-
-    public Object[] getInputSplits() throws InstantiationException, IllegalAccessException, IOException {
-        return inputSplitsProxy.toInputSplits(getJobConf());
-    }
-
-    protected Reporter createReporter() {
-        return new Reporter() {
-            @Override
-            public Counter getCounter(Enum<?> name) {
-                return null;
-            }
-
-            @Override
-            public Counter getCounter(String group, String name) {
-                return null;
-            }
-
-            @Override
-            public InputSplit getInputSplit() throws UnsupportedOperationException {
-                return null;
-            }
-
-            @Override
-            public void incrCounter(Enum<?> key, long amount) {
-
-            }
-
-            @Override
-            public void incrCounter(String group, String counter, long amount) {
-
-            }
-
-            @Override
-            public void progress() {
-
-            }
-
-            @Override
-            public void setStatus(String status) {
-
-            }
-
-            @Override
-            public float getProgress() {
-                // TODO Auto-generated method stub
-                return 0;
-            }
-        };
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-            throws HyracksDataException {
-        return new AbstractUnaryOutputSourceOperatorNodePushable() {
-            @Override
-            public void initialize() throws HyracksDataException {
-                try {
-                    JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
-                    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-                    conf.setClassLoader(this.getClass().getClassLoader());
-                    RecordReader hadoopRecordReader;
-                    Object key;
-                    Object value;
-                    Object[] splits = inputSplitsProxy.toInputSplits(conf);
-                    Object inputSplit = splits[partition];
-
-                    if (conf.getUseNewMapper()) {
-                        JobContext context = new ContextFactory().createJobContext(conf);
-                        org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils
-                                .newInstance(context.getInputFormatClass(), conf);
-                        TaskAttemptContext taskAttemptContext = new ContextFactory().createContext(jobConf, null);
-                        hadoopRecordReader = (RecordReader) inputFormat.createRecordReader(
-                                (org.apache.hadoop.mapreduce.InputSplit) inputSplit, taskAttemptContext);
-                    } else {
-                        Class inputFormatClass = conf.getInputFormat().getClass();
-                        InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-                        hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(
-                                (org.apache.hadoop.mapred.InputSplit) inputSplit, conf, createReporter());
-                    }
-
-                    Class inputKeyClass;
-                    Class inputValueClass;
-                    if (hadoopRecordReader instanceof SequenceFileRecordReader) {
-                        inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
-                        inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
-                    } else {
-                        inputKeyClass = hadoopRecordReader.createKey().getClass();
-                        inputValueClass = hadoopRecordReader.createValue().getClass();
-                    }
-
-                    key = hadoopRecordReader.createKey();
-                    value = hadoopRecordReader.createValue();
-                    FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
-                    RecordDescriptor outputRecordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                            (Class<? extends Writable>) hadoopRecordReader.createKey().getClass(),
-                            (Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
-                    int nFields = outputRecordDescriptor.getFieldCount();
-                    ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
-                    writer.open();
-                    try {
-                        while (hadoopRecordReader.next(key, value)) {
-                            tb.reset();
-                            switch (nFields) {
-                                case 2:
-                                    tb.addField(outputRecordDescriptor.getFields()[0], key);
-                                case 1:
-                                    tb.addField(outputRecordDescriptor.getFields()[1], value);
-                            }
-                            FrameUtils
-                                    .appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(),
-                                            0, tb.getSize());
-                        }
-                        appender.write(writer, true);
-                    } catch (Exception e) {
-                        writer.fail();
-                        throw new HyracksDataException(e);
-                    } finally {
-                        writer.close();
-                    }
-                    hadoopRecordReader.close();
-                } catch (InstantiationException e) {
-                    throw new HyracksDataException(e);
-                } catch (IllegalAccessException e) {
-                    throw new HyracksDataException(e);
-                } catch (ClassNotFoundException e) {
-                    throw new HyracksDataException(e);
-                } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-        };
-    }
-}
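
The read operator's inner loop above is the canonical old-API scan: ask
the InputFormat for a RecordReader, create reusable key and value
instances once, then advance with next() until it returns false. A
self-contained sketch; Reporter.NULL stands in for the hand-built no-op
reporter, and OldApiScan / countRecords are invented names:

    import java.io.IOException;

    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    // Sketch: scan one split with the old (mapred) API; the key and
    // value objects are reused across calls to next().
    final class OldApiScan {
        static <K, V> long countRecords(JobConf conf, InputSplit split) throws IOException {
            @SuppressWarnings("unchecked")
            InputFormat<K, V> format = (InputFormat<K, V>) conf.getInputFormat();
            RecordReader<K, V> reader = format.getRecordReader(split, conf, Reporter.NULL);
            try {
                K key = reader.createKey();
                V value = reader.createValue();
                long n = 0;
                while (reader.next(key, value)) {
                    n++;
                }
                return n;
            } finally {
                reader.close();
            }
        }
    }
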
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
deleted file mode 100644
index 6913876..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ /dev/null
@@ -1,422 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IDataReader;
-import org.apache.hyracks.api.dataflow.IDataWriter;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IComparator;
-import org.apache.hyracks.api.dataflow.value.IComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.hadoop.data.KeyComparatorFactory;
-import org.apache.hyracks.dataflow.hadoop.data.RawComparingComparatorFactory;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
-import org.apache.hyracks.dataflow.hadoop.util.MRContextUtil;
-import org.apache.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-import org.apache.hyracks.dataflow.std.group.DeserializedPreclusteredGroupOperator;
-import org.apache.hyracks.dataflow.std.group.IGroupAggregator;
-import org.apache.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
-import org.apache.hyracks.hdfs.ContextFactory;
-
-public class HadoopReducerOperatorDescriptor<K2, V2, K3, V3> extends AbstractHadoopOperatorDescriptor {
-    private class ReducerAggregator implements IGroupAggregator {
-        private Object reducer;
-        private DataWritingOutputCollector<K3, V3> output;
-        private Reporter reporter;
-        private ReducerContext reducerContext;
-        RawKeyValueIterator rawKeyValueIterator = new RawKeyValueIterator() {
-
-            @Override
-            public boolean next() throws IOException {
-                return false;
-            }
-
-            @Override
-            public DataInputBuffer getValue() throws IOException {
-                return null;
-            }
-
-            @Override
-            public Progress getProgress() {
-                return null;
-            }
-
-            @Override
-            public DataInputBuffer getKey() throws IOException {
-                return null;
-            }
-
-            @Override
-            public void close() throws IOException {
-
-            }
-        };
-
-        class ReducerContext extends org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer.Context {
-            private HadoopReducerOperatorDescriptor.ValueIterator iterator;
-
-            @SuppressWarnings("unchecked")
-            ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException,
-                    InterruptedException, ClassNotFoundException {
-                ((org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer) reducer).super(new MRContextUtil()
-                        .createReduceContext(conf, new TaskAttemptID(), rawKeyValueIterator, null, null, null, null,
-                                null, null, Class.forName("org.apache.hadoop.io.NullWritable"),
-                                Class.forName("org.apache.hadoop.io.NullWritable")));
-            }
-
-            public void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) {
-                iterator = iter;
-            }
-
-            @Override
-            public Iterable<V2> getValues() throws IOException, InterruptedException {
-                return new Iterable<V2>() {
-                    @Override
-                    public Iterator<V2> iterator() {
-                        return iterator;
-                    }
-                };
-            }
-
-            /** Start processing next unique key. */
-            @Override
-            public boolean nextKey() throws IOException, InterruptedException {
-                boolean hasMore = iterator.hasNext();
-                if (hasMore) {
-                    nextKeyValue();
-                }
-                return hasMore;
-            }
-
-            /**
-             * Advance to the next key/value pair.
-             */
-            @Override
-            public boolean nextKeyValue() throws IOException, InterruptedException {
-                iterator.next();
-                return true;
-            }
-
-            public Object getCurrentKey() {
-                return iterator.getKey();
-            }
-
-            @Override
-            public Object getCurrentValue() {
-                return iterator.getValue();
-            }
-
-            /**
-             * Generate an output key/value pair.
-             */
-            @Override
-            public void write(Object key, Object value) throws IOException, InterruptedException {
-                output.collect(key, value);
-            }
-
-        }
-
-        public ReducerAggregator(Object reducer) throws HyracksDataException {
-            this.reducer = reducer;
-            initializeReducer();
-            output = new DataWritingOutputCollector<K3, V3>();
-            reporter = new Reporter() {
-                @Override
-                public void progress() {
-
-                }
-
-                @Override
-                public void setStatus(String arg0) {
-
-                }
-
-                @Override
-                public void incrCounter(String arg0, String arg1, long arg2) {
-
-                }
-
-                @Override
-                public void incrCounter(Enum<?> arg0, long arg1) {
-
-                }
-
-                @Override
-                public InputSplit getInputSplit() throws UnsupportedOperationException {
-                    return null;
-                }
-
-                @Override
-                public Counter getCounter(String arg0, String arg1) {
-                    return null;
-                }
-
-                @Override
-                public Counter getCounter(Enum<?> arg0) {
-                    return null;
-                }
-
-                @Override
-                public float getProgress() {
-                    // TODO Auto-generated method stub
-                    return 0;
-                }
-            };
-        }
-
-        @Override
-        public void aggregate(IDataReader<Object[]> reader, IDataWriter<Object[]> writer) throws HyracksDataException {
-            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-            ValueIterator i = new ValueIterator();
-            i.reset(reader);
-            output.setWriter(writer);
-            try {
-                if (jobConf.getUseNewReducer()) {
-                    try {
-                        reducerContext.setIterator(i);
-                        ((org.apache.hadoop.mapreduce.Reducer) reducer).run(reducerContext);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                        throw new HyracksDataException(e);
-                    }
-                } else {
-                    ((org.apache.hadoop.mapred.Reducer) reducer).reduce(i.getKey(), i, output, reporter);
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        public void close() throws HyracksDataException {
-            // -- - close - --
-            try {
-                if (!jobConf.getUseNewMapper()) {
-                    ((org.apache.hadoop.mapred.Reducer) reducer).close();
-                }
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        private void initializeReducer() throws HyracksDataException {
-            jobConf.setClassLoader(this.getClass().getClassLoader());
-            if (!jobConf.getUseNewReducer()) {
-                ((org.apache.hadoop.mapred.Reducer) reducer).configure(getJobConf());
-            } else {
-                try {
-                    reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer) reducer, jobConf);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    throw new HyracksDataException(e);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                    throw new HyracksDataException(e);
-                } catch (RuntimeException e) {
-                    e.printStackTrace();
-                } catch (ClassNotFoundException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    private class ValueIterator implements Iterator<V2> {
-        private IDataReader<Object[]> reader;
-        private K2 key;
-        private V2 value;
-
-        public K2 getKey() {
-            return key;
-        }
-
-        public V2 getValue() {
-            return value;
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (value == null) {
-                Object[] tuple;
-                try {
-                    tuple = reader.readData();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-                if (tuple != null) {
-                    value = (V2) tuple[1];
-                }
-            }
-            return value != null;
-        }
-
-        @Override
-        public V2 next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            V2 v = value;
-            value = null;
-            return v;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        void reset(IDataReader<Object[]> reader) {
-            this.reader = reader;
-            try {
-                Object[] tuple = reader.readData();
-                key = (K2) tuple[0];
-                value = (V2) tuple[1];
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private static final long serialVersionUID = 1L;
-    private Class reducerClass;
-    private IComparatorFactory comparatorFactory;
-    private boolean useAsCombiner = false;
-
-    public HadoopReducerOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf conf,
-            IComparatorFactory comparatorFactory, IHadoopClassFactory classFactory, boolean useAsCombiner) {
-        super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
-        this.comparatorFactory = comparatorFactory;
-        this.useAsCombiner = useAsCombiner;
-    }
-
-    private Object createReducer() throws Exception {
-        if (reducerClass != null) {
-            return ReflectionUtils.newInstance(reducerClass, getJobConf());
-        } else {
-            Object reducer;
-            if (!useAsCombiner) {
-                if (getJobConf().getUseNewReducer()) {
-                    JobContext jobContext = new ContextFactory().createJobContext(getJobConf());
-                    reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
-                            .getReducerClass();
-                } else {
-                    reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
-                }
-            } else {
-                if (getJobConf().getUseNewReducer()) {
-                    JobContext jobContext = new ContextFactory().createJobContext(getJobConf());
-                    reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
-                            .getCombinerClass();
-                } else {
-                    reducerClass = (Class<? extends Reducer>) getJobConf().getCombinerClass();
-                }
-            }
-            reducer = getHadoopClassFactory().createReducer(reducerClass.getName(), getJobConf());
-            return reducer;
-        }
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        try {
-            if (this.comparatorFactory == null) {
-                String comparatorClassName = getJobConf().getOutputValueGroupingComparator().getClass().getName();
-                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-                RawComparator rawComparator = null;
-                if (comparatorClassName != null) {
-                    Class comparatorClazz = getHadoopClassFactory().loadClass(comparatorClassName);
-                    this.comparatorFactory = new KeyComparatorFactory(comparatorClazz);
-
-                } else {
-                    String mapOutputKeyClass = getJobConf().getMapOutputKeyClass().getName();
-                    if (getHadoopClassFactory() != null) {
-                        rawComparator = WritableComparator.get(getHadoopClassFactory().loadClass(mapOutputKeyClass));
-                    } else {
-                        rawComparator = WritableComparator.get((Class<? extends WritableComparable>) Class
-                                .forName(mapOutputKeyClass));
-                    }
-                    this.comparatorFactory = new RawComparingComparatorFactory(rawComparator.getClass());
-                }
-            }
-            IOpenableDataWriterOperator op = new DeserializedPreclusteredGroupOperator(new int[] { 0 },
-                    new IComparator[] { comparatorFactory.createComparator() }, new ReducerAggregator(createReducer()));
-            return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
-                    getActivityId(), 0));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory classFactory) {
-        String outputKeyClassName = null;
-        String outputValueClassName = null;
-
-        if (conf.getUseNewMapper()) {
-            JobContext context = new ContextFactory().createJobContext(conf);
-            outputKeyClassName = context.getOutputKeyClass().getName();
-            outputValueClassName = context.getOutputValueClass().getName();
-        } else {
-            outputKeyClassName = conf.getOutputKeyClass().getName();
-            outputValueClassName = conf.getOutputValueClass().getName();
-        }
-
-        RecordDescriptor recordDescriptor = null;
-        try {
-            if (classFactory == null) {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) Class.forName(outputKeyClassName),
-                        (Class<? extends Writable>) Class.forName(outputValueClassName));
-            } else {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) classFactory.loadClass(outputKeyClassName),
-                        (Class<? extends Writable>) classFactory.loadClass(outputValueClassName));
-            }
-        } catch (Exception e) {
-            // surface the failure rather than silently returning a null descriptor
-            throw new RuntimeException(e);
-        }
-        return recordDescriptor;
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
deleted file mode 100644
index 5709082..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IRecordWriter;
-import org.apache.hyracks.hdfs.ContextFactory;
-
-public class HadoopWriteOperatorDescriptor extends AbstractFileWriteOperatorDescriptor {
-
-    private class HadoopFileWriter implements IRecordWriter {
-
-        Object recordWriter;
-        JobConf conf;
-        Path finalOutputFile;
-        Path tempOutputFile;
-        Path tempDir;
-
-        HadoopFileWriter(Object recordWriter, int index, JobConf conf) throws Exception {
-            this.recordWriter = recordWriter;
-            this.conf = conf;
-            initialize(index, conf);
-        }
-
-        private void initialize(int index, JobConf conf) throws Exception {
-            if (!(conf.getOutputFormat() instanceof NullOutputFormat)) {
-                boolean isMap = conf.getNumReduceTasks() == 0;
-                TaskAttemptID taskAttempId = new TaskAttemptID("0", index, isMap, index, index);
-                conf.set("mapred.task.id", taskAttempId.toString());
-                String suffix = "part-00000";
-                // zero-pad the task index into the part-file name, e.g. part-00007
-                suffix = suffix.substring(0, suffix.length() - String.valueOf(index).length()) + index;
-                outputPath = new Path(conf.get("mapred.output.dir"));
-                tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-                FileSystem fileSys = tempDir.getFileSystem(conf);
-                if (!fileSys.mkdirs(tempDir)) {
-                    throw new IOException("Mkdirs failed to create " + tempDir.toString());
-                }
-                tempOutputFile = new Path(tempDir, new Path("_" + taskAttempId.toString()));
-                tempOutputFile = new Path(tempOutputFile, suffix);
-                finalOutputFile = new Path(outputPath, suffix);
-                if (conf.getUseNewMapper()) {
-                    org.apache.hadoop.mapreduce.JobContext jobContext = new ContextFactory().createJobContext(conf);
-                    org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
-                            .newInstance(jobContext.getOutputFormatClass(), conf);
-                    recordWriter = newOutputFormat.getRecordWriter(new ContextFactory().createContext(conf,
-                            taskAttempId));
-                } else {
-                    recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf, suffix,
-                            new Progressable() {
-                                @Override
-                                public void progress() {
-                                }
-                            });
-                }
-            }
-        }
-
-        @Override
-        public void write(Object[] record) throws Exception {
-            if (recordWriter != null) {
-                if (conf.getUseNewMapper()) {
-                    ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]);
-                } else {
-                    ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]);
-                }
-            }
-        }
-
-        @Override
-        public void close() {
-            try {
-                if (recordWriter != null) {
-                    if (conf.getUseNewMapper()) {
-                        ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new ContextFactory()
-                                .createContext(conf, new TaskAttemptID()));
-                    } else {
-                        ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null);
-                    }
-                    if (outputPath != null) {
-                        FileSystem fileSystem = FileSystem.get(conf);
-                        fileSystem.rename(tempOutputFile, finalOutputFile);
-                        fileSystem.delete(tempDir, true);
-                    }
-                }
-            } catch (Exception e) {
-                // swallowing here would silently skip the rename that commits the output
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private static final long serialVersionUID = 1L;
-    Map<String, String> jobConfMap;
-
-    @Override
-    protected IRecordWriter createRecordWriter(FileSplit fileSplit, int index) throws Exception {
-        JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
-        conf.setClassLoader(this.getClass().getClassLoader());
-        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-        // the actual Hadoop RecordWriter is created inside HadoopFileWriter.initialize()
-        return new HadoopFileWriter(null, index, conf);
-    }
-
-    Path outputPath;
-    Path outputTempPath;
-
-    protected Reporter createReporter() {
-        return new Reporter() {
-            @Override
-            public Counter getCounter(Enum<?> name) {
-                return null;
-            }
-
-            @Override
-            public Counter getCounter(String group, String name) {
-                return null;
-            }
-
-            @Override
-            public InputSplit getInputSplit() throws UnsupportedOperationException {
-                return null;
-            }
-
-            @Override
-            public void incrCounter(Enum<?> key, long amount) {
-
-            }
-
-            @Override
-            public void incrCounter(String group, String counter, long amount) {
-
-            }
-
-            @Override
-            public void progress() {
-
-            }
-
-            @Override
-            public void setStatus(String status) {
-
-            }
-
-            @Override
-            public float getProgress() {
-                return 0;
-            }
-        };
-    }
-
-    private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
-        JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
-        try {
-            FileSystem fileSystem = FileSystem.get(conf);
-            for (FileSplit fileSplit : fileSplits) {
-                Path path = new Path(fileSplit.getLocalFile().getFile().getPath());
-                if (fileSystem.exists(path)) {
-                    throw new Exception(" Output path :  already exists : " + path);
-                }
-            }
-        } catch (IOException ioe) {
-            throw ioe; // keep the original exception; printing it first only added noise
-        }
-        return true;
-    }
-
-    private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) throws ClassNotFoundException {
-        int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
-        Object outputFormat = null;
-        if (conf.getUseNewMapper()) {
-            outputFormat = ReflectionUtils.newInstance(new ContextFactory().createJobContext(conf)
-                    .getOutputFormatClass(), conf);
-        } else {
-            outputFormat = conf.getOutputFormat();
-        }
-        if (outputFormat instanceof NullOutputFormat) {
-            FileSplit[] outputFileSplits = new FileSplit[numOutputters];
-            for (int i = 0; i < numOutputters; i++) {
-                String outputPath = "/tmp/" + System.currentTimeMillis() + i;
-                outputFileSplits[i] = new FileSplit("localhost", new FileReference(new File(outputPath)));
-            }
-            return outputFileSplits;
-        } else {
-
-            FileSplit[] outputFileSplits = new FileSplit[numOutputters];
-            String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
-            for (int index = 0; index < numOutputters; index++) {
-                String suffix = "part-00000";
-                // zero-pad the task index into the part-file name, e.g. part-00007
-                suffix = suffix.substring(0, suffix.length() - String.valueOf(index).length()) + index;
-                String outputPath = absolutePath + "/" + suffix;
-                outputFileSplits[index] = new FileSplit("localhost", outputPath);
-            }
-            return outputFileSplits;
-        }
-    }
-
-    public HadoopWriteOperatorDescriptor(IOperatorDescriptorRegistry jobSpec, JobConf jobConf, int numMapTasks)
-            throws Exception {
-        super(jobSpec, getOutputSplits(jobConf, numMapTasks));
-        this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
-        checkIfCanWriteToHDFS(super.splits);
-    }
-}
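
For context on the removed writer: HadoopFileWriter stages each part file under FileOutputCommitter.TEMP_DIR_NAME and promotes it on close(). A minimal sketch of that commit step, assuming a reachable Hadoop FileSystem (the class name and method here are illustrative, not part of the removed code):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class SideFileCommitSketch {
    // Promote a task's temporary output to its final name, then drop the scratch dir.
    static void commit(Configuration conf, Path tempFile, Path finalFile, Path tempDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        fs.rename(tempFile, finalFile); // single-filesystem rename acts as the commit point
        fs.delete(tempDir, true);       // recursively remove the _temporary area
    }
}
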
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java
deleted file mode 100644
index 62c37ed..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import java.io.ObjectStreamException;
-import java.io.Serializable;
-
-public class AbstractClassBasedDelegate<T> implements Serializable {
-    private static final long serialVersionUID = 1L;
-    private Class<? extends T> klass;
-    protected transient T instance;
-
-    public AbstractClassBasedDelegate(Class<? extends T> klass) {
-        this.klass = klass;
-        init();
-    }
-
-    protected Object readResolve() throws ObjectStreamException {
-        init();
-        return this;
-    }
-
-    private void init() {
-        try {
-            instance = klass.newInstance();
-        } catch (InstantiationException | IllegalAccessException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
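
AbstractClassBasedDelegate serializes only the target Class and rebuilds the transient instance via readResolve() after deserialization. A hedged round-trip sketch (MyComparator is a hypothetical public class with a no-arg constructor, not from this module):

import java.io.*;

public final class DelegateRoundTripSketch {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new AbstractClassBasedDelegate<>(MyComparator.class));
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        // readResolve() has already re-created the transient instance on this side.
        AbstractClassBasedDelegate<?> copy = (AbstractClassBasedDelegate<?>) in.readObject();
        System.out.println(copy != null);
    }
}
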
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java
deleted file mode 100644
index fe8a612..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import java.io.DataInputStream;
-
-import org.apache.hadoop.io.Writable;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
-public class HadoopHashTuplePartitionComputerFactory<K extends Writable> implements ITuplePartitionComputerFactory {
-    private static final long serialVersionUID = 1L;
-    private final ISerializerDeserializer<K> keyIO;
-
-    public HadoopHashTuplePartitionComputerFactory(ISerializerDeserializer<K> keyIO) {
-        this.keyIO = keyIO;
-    }
-
-    @Override
-    public ITuplePartitionComputer createPartitioner() {
-        return new ITuplePartitionComputer() {
-            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
-            private final DataInputStream dis = new DataInputStream(bbis);
-
-            @Override
-            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
-                int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                        + accessor.getFieldStartOffset(tIndex, 0);
-                bbis.setByteBuffer(accessor.getBuffer(), keyStart);
-                K key = keyIO.deserialize(dis);
-                // mask the sign bit: negating Integer.MIN_VALUE stays negative, so -h is unsafe
-                int h = key.hashCode() & Integer.MAX_VALUE;
-                return h % nParts;
-            }
-        };
-    }
-}
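
The masked-hash computation above matches Hadoop's own HashPartitioner. A worked example, assuming nParts = 4: a key whose hashCode() is -7 masks to (-7 & Integer.MAX_VALUE) = 2147483641, and 2147483641 % 4 = 1, so the tuple lands on partition 1.
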
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java
deleted file mode 100644
index b20c6e0..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import java.io.DataInputStream;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
-public class HadoopNewPartitionerTuplePartitionComputerFactory<K extends Writable, V extends Writable> extends
-        AbstractClassBasedDelegate<Partitioner<K, V>> implements ITuplePartitionComputerFactory {
-    private static final long serialVersionUID = 1L;
-    private final ISerializerDeserializer<K> keyIO;
-    private final ISerializerDeserializer<V> valueIO;
-
-    public HadoopNewPartitionerTuplePartitionComputerFactory(Class<? extends Partitioner<K, V>> klass,
-            ISerializerDeserializer<K> keyIO, ISerializerDeserializer<V> valueIO) {
-        super(klass);
-        this.keyIO = keyIO;
-        this.valueIO = valueIO;
-    }
-
-    @Override
-    public ITuplePartitionComputer createPartitioner() {
-        return new ITuplePartitionComputer() {
-            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
-            private final DataInputStream dis = new DataInputStream(bbis);
-
-            @Override
-            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
-                int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                        + accessor.getFieldStartOffset(tIndex, 0);
-                bbis.setByteBuffer(accessor.getBuffer(), keyStart);
-                K key = keyIO.deserialize(dis);
-                int valueStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                        + accessor.getFieldStartOffset(tIndex, 1);
-                bbis.setByteBuffer(accessor.getBuffer(), valueStart);
-                V value = valueIO.deserialize(dis);
-                return instance.getPartition(key, value, nParts);
-            }
-        };
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java
deleted file mode 100644
index 127eed9..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import java.io.DataInputStream;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.Partitioner;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
-public class HadoopPartitionerTuplePartitionComputerFactory<K extends Writable, V extends Writable> extends
-        AbstractClassBasedDelegate<Partitioner<K, V>> implements ITuplePartitionComputerFactory {
-    private static final long serialVersionUID = 1L;
-    private final ISerializerDeserializer<K> keyIO;
-    private final ISerializerDeserializer<V> valueIO;
-
-    public HadoopPartitionerTuplePartitionComputerFactory(Class<? extends Partitioner<K, V>> klass,
-            ISerializerDeserializer<K> keyIO, ISerializerDeserializer<V> valueIO) {
-        super(klass);
-        this.keyIO = keyIO;
-        this.valueIO = valueIO;
-    }
-
-    @Override
-    public ITuplePartitionComputer createPartitioner() {
-        return new ITuplePartitionComputer() {
-            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
-            private final DataInputStream dis = new DataInputStream(bbis);
-
-            @Override
-            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
-                int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                        + accessor.getFieldStartOffset(tIndex, 0);
-                bbis.setByteBuffer(accessor.getBuffer(), keyStart);
-                K key = keyIO.deserialize(dis);
-                int valueStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                        + accessor.getFieldStartOffset(tIndex, 1);
-                bbis.setByteBuffer(accessor.getBuffer(), valueStart);
-                V value = valueIO.deserialize(dis);
-                return instance.getPartition(key, value, nParts);
-            }
-        };
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java
deleted file mode 100644
index b74fdde..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import org.apache.hadoop.io.RawComparator;
-
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.dataflow.common.util.ReflectionUtils;
-
-public class KeyBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
-    private static final long serialVersionUID = 1L;
-
-    private Class<? extends RawComparator<T>> cmpClass;
-
-    public KeyBinaryComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
-        this.cmpClass = cmpClass;
-    }
-
-    @Override
-    public IBinaryComparator createBinaryComparator() {
-        final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
-        return new IBinaryComparator() {
-            @Override
-            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-                return instance.compare(b1, s1, l1, b2, s2, l2);
-            }
-        };
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java
deleted file mode 100644
index 01024bd..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import org.apache.hadoop.io.RawComparator;
-
-import org.apache.hyracks.api.dataflow.value.IComparator;
-import org.apache.hyracks.api.dataflow.value.IComparatorFactory;
-import org.apache.hyracks.dataflow.common.util.ReflectionUtils;
-
-public class KeyComparatorFactory<T> implements IComparatorFactory<T> {
-    private static final long serialVersionUID = 1L;
-    private Class<? extends RawComparator<T>> cmpClass;
-
-    public KeyComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
-        this.cmpClass = cmpClass;
-    }
-
-    @Override
-    public IComparator<T> createComparator() {
-        final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
-        return new IComparator<T>() {
-            @Override
-            public int compare(T o1, T o2) {
-                return instance.compare(o1, o2);
-            }
-        };
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/RawComparingComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/RawComparingComparatorFactory.java
deleted file mode 100644
index d2f0ead..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/RawComparingComparatorFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparable;
-
-import org.apache.hyracks.api.dataflow.value.IComparator;
-import org.apache.hyracks.api.dataflow.value.IComparatorFactory;
-import org.apache.hyracks.dataflow.common.util.ReflectionUtils;
-
-public class RawComparingComparatorFactory<T> implements IComparatorFactory<WritableComparable<T>> {
-    private Class<? extends RawComparator> klass;
-
-    public RawComparingComparatorFactory(Class<? extends RawComparator> klass) {
-        this.klass = klass;
-    }
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IComparator<WritableComparable<T>> createComparator() {
-        final RawComparator instance = ReflectionUtils.createInstance(klass);
-        return new IComparator<WritableComparable<T>>() {
-            @Override
-            public int compare(WritableComparable<T> o1, WritableComparable<T> o2) {
-                return instance.compare(o1, o2);
-            }
-        };
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
deleted file mode 100644
index 01f8755..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import org.apache.hadoop.io.RawComparator;
-
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.dataflow.common.util.ReflectionUtils;
-
-public class WritableComparingBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
-    private static final long serialVersionUID = 1L;
-
-    private Class<? extends RawComparator<T>> cmpClass;
-
-    public WritableComparingBinaryComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
-        this.cmpClass = cmpClass;
-    }
-
-    @Override
-    public IBinaryComparator createBinaryComparator() {
-        final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
-        return new IBinaryComparator() {
-            @Override
-            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-                return instance.compare(b1, s1, l1, b2, s2, l2);
-            }
-        };
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
deleted file mode 100644
index b248bff..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import org.apache.hyracks.dataflow.hadoop.data.HadoopNewPartitionerTuplePartitionComputerFactory;
-import org.apache.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.hdfs.ContextFactory;
-
-public class HadoopHelper {
-    public static final int KEY_FIELD_INDEX = 0;
-    public static final int VALUE_FIELD_INDEX = 1;
-    public static final int BLOCKID_FIELD_INDEX = 2;
-    private static final int[] KEY_SORT_FIELDS = new int[] { 0 };
-
-    private MarshalledWritable<Configuration> mConfig;
-    private Configuration config;
-    private Job job;
-
-    public HadoopHelper(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
-        this.mConfig = mConfig;
-        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
-        try {
-            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-            config = mConfig.get();
-            config.setClassLoader(getClass().getClassLoader());
-            job = new Job(config);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        } finally {
-            Thread.currentThread().setContextClassLoader(ctxCL);
-        }
-    }
-
-    public RecordDescriptor getMapOutputRecordDescriptor() throws HyracksDataException {
-        try {
-            return new RecordDescriptor(
-                    new ISerializerDeserializer[] {
-                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
-                                    .getMapOutputKeyClass()),
-                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
-                                    .getMapOutputValueClass()), IntegerSerializerDeserializer.INSTANCE });
-
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public RecordDescriptor getMapOutputRecordDescriptorWithoutExtraFields() throws HyracksDataException {
-        try {
-            return new RecordDescriptor(
-                    new ISerializerDeserializer[] {
-                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
-                                    .getMapOutputKeyClass()),
-                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
-                                    .getMapOutputValueClass()) });
-
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public TaskAttemptContext createTaskAttemptContext(TaskAttemptID taId) throws HyracksDataException {
-        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
-        try {
-            Thread.currentThread().setContextClassLoader(config.getClassLoader());
-            return new ContextFactory().createContext(config, taId);
-        } catch (HyracksDataException e) {
-            // already the right exception type; re-wrapping and printing added nothing
-            throw e;
-        } finally {
-            Thread.currentThread().setContextClassLoader(ctxCL);
-        }
-    }
-
-    public JobContext createJobContext() {
-        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
-        try {
-            Thread.currentThread().setContextClassLoader(config.getClassLoader());
-            return new ContextFactory().createJobContext(config);
-        } finally {
-            Thread.currentThread().setContextClassLoader(ctxCL);
-        }
-    }
-
-    public <K1, V1, K2, V2> Mapper<K1, V1, K2, V2> getMapper() throws HyracksDataException {
-        try {
-            return (Mapper<K1, V1, K2, V2>) HadoopTools.newInstance(job.getMapperClass());
-        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public <K2, V2, K3, V3> Reducer<K2, V2, K3, V3> getReducer() throws HyracksDataException {
-        try {
-            return (Reducer<K2, V2, K3, V3>) HadoopTools.newInstance(job.getReducerClass());
-        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public <K2, V2> Reducer<K2, V2, K2, V2> getCombiner() throws HyracksDataException {
-        try {
-            return (Reducer<K2, V2, K2, V2>) HadoopTools.newInstance(job.getCombinerClass());
-        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public <K, V> InputFormat<K, V> getInputFormat() throws HyracksDataException {
-        try {
-            return (InputFormat<K, V>) ReflectionUtils.newInstance(job.getInputFormatClass(), config);
-        } catch (ClassNotFoundException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public <K, V> List<InputSplit> getInputSplits() throws HyracksDataException {
-        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
-        try {
-            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-            InputFormat<K, V> fmt = getInputFormat();
-            JobContext jCtx = new ContextFactory().createJobContext(config);
-            try {
-                return fmt.getSplits(jCtx);
-            } catch (IOException | InterruptedException e) {
-                throw new HyracksDataException(e);
-            }
-        } finally {
-            Thread.currentThread().setContextClassLoader(ctxCL);
-        }
-    }
-
-    public IBinaryComparatorFactory[] getSortComparatorFactories() {
-        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
-                .getSortComparator().getClass());
-
-        return new IBinaryComparatorFactory[] { comparatorFactory };
-    }
-
-    public IBinaryComparatorFactory[] getGroupingComparatorFactories() {
-        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
-                .getGroupingComparator().getClass());
-
-        return new IBinaryComparatorFactory[] { comparatorFactory };
-    }
-
-    public RawComparator<?> getRawGroupingComparator() {
-        return job.getGroupingComparator();
-    }
-
-    public int getSortFrameLimit(IHyracksCommonContext ctx) {
-        int sortMemory = job.getConfiguration().getInt("io.sort.mb", 100);
-        return (int) (((long) sortMemory * 1024 * 1024) / ctx.getInitialFrameSize());
-    }
-
-    public Job getJob() {
-        return job;
-    }
-
-    public MarshalledWritable<Configuration> getMarshalledConfiguration() {
-        return mConfig;
-    }
-
-    public Configuration getConfiguration() {
-        return config;
-    }
-
-    public ITuplePartitionComputerFactory getTuplePartitionComputer() throws HyracksDataException {
-        int nReducers = job.getNumReduceTasks();
-        try {
-            return new HadoopNewPartitionerTuplePartitionComputerFactory<Writable, Writable>(
-                    (Class<? extends Partitioner<Writable, Writable>>) job.getPartitionerClass(),
-                    (ISerializerDeserializer<Writable>) DatatypeHelper
-                            .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputKeyClass()),
-                    (ISerializerDeserializer<Writable>) DatatypeHelper
-                            .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputValueClass()));
-        } catch (ClassNotFoundException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public int[] getSortFields() {
-        return KEY_SORT_FIELDS;
-    }
-
-    public <K> ISerializerDeserializer<K> getMapOutputKeySerializerDeserializer() {
-        return (ISerializerDeserializer<K>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
-                .getMapOutputKeyClass());
-    }
-
-    public <V> ISerializerDeserializer<V> getMapOutputValueSerializerDeserializer() {
-        return (ISerializerDeserializer<V>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
-                .getMapOutputValueClass());
-    }
-
-    public FileSystem getFilesystem() throws HyracksDataException {
-        try {
-            return FileSystem.get(config);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public <K, V> OutputFormat<K, V> getOutputFormat() throws HyracksDataException {
-        try {
-            return (OutputFormat<K, V>) ReflectionUtils.newInstance(job.getOutputFormatClass(), config);
-        } catch (ClassNotFoundException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public boolean hasCombiner() throws HyracksDataException {
-        try {
-            return job.getCombinerClass() != null;
-        } catch (ClassNotFoundException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-}
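
Of the helpers above, getSortFrameLimit() is the only arithmetic one: it converts the job's io.sort.mb budget into a whole number of sort frames. A worked sketch using the Hadoop default of 100 MB and an assumed 32 KB initial frame size (the frame size actually comes from the Hyracks context):

public final class SortFrameLimitSketch {
    public static void main(String[] args) {
        long sortBytes = 100L * 1024 * 1024;   // io.sort.mb default of 100 MB
        int initialFrameSize = 32 * 1024;      // assumption for this example
        System.out.println(sortBytes / initialFrameSize); // prints 3200
    }
}
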
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
deleted file mode 100644
index fb86385..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-public class HadoopTools {
-    public static Object newInstance(String className) throws ClassNotFoundException, InstantiationException,
-            IllegalAccessException {
-        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
-        try {
-            Thread.currentThread().setContextClassLoader(HadoopTools.class.getClassLoader());
-            Class<?> clazz = Class.forName(className, true, HadoopTools.class.getClassLoader());
-            return newInstance(clazz);
-        } finally {
-            Thread.currentThread().setContextClassLoader(ctxCL);
-        }
-    }
-
-    public static Object newInstance(Class<?> clazz) throws InstantiationException, IllegalAccessException {
-        return clazz.newInstance();
-    }
-}
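
HadoopTools wraps every reflective load in a save/swap/restore of the thread's context classloader so Hadoop code resolves classes against this module rather than the caller. The idiom, isolated (loadWith and its arguments are illustrative names):

public final class ContextClassLoaderSketch {
    static Object loadWith(ClassLoader target, String className) throws Exception {
        ClassLoader prev = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(target);
            return Class.forName(className, true, target).newInstance();
        } finally {
            Thread.currentThread().setContextClassLoader(prev); // restore even on failure
        }
    }
}
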
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
deleted file mode 100644
index 4892935..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.util.BitSet;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hyracks.api.comm.IFrameReader;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.IPartitionCollector;
-import org.apache.hyracks.api.comm.IPartitionWriterFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
-import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
-import org.apache.hyracks.dataflow.std.connectors.PartitionDataWriter;
-
-public class HashPartitioningShuffleConnectorDescriptor extends AbstractMToNConnectorDescriptor {
-    private static final long serialVersionUID = 1L;
-
-    private final MarshalledWritable<Configuration> mConfig;
-
-    public HashPartitioningShuffleConnectorDescriptor(IConnectorDescriptorRegistry spec, MarshalledWritable<Configuration> mConfig) {
-        super(spec);
-        this.mConfig = mConfig;
-    }
-
-    @Override
-    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
-        HadoopHelper helper = new HadoopHelper(mConfig);
-        ITuplePartitionComputerFactory tpcf = helper.getTuplePartitionComputer();
-        return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
-    }
-
-    @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int receiverIndex, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
-        BitSet expectedPartitions = new BitSet();
-        expectedPartitions.set(0, nProducerPartitions);
-        NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
-                expectedPartitions);
-        IFrameReader frameReader = new ShuffleFrameReader(ctx, channelReader, mConfig);
-        return new PartitionCollector(ctx, getConnectorId(), receiverIndex, expectedPartitions, frameReader,
-                channelReader);
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
deleted file mode 100644
index 9cafaea..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IInputSplitProvider {
-    InputSplit next() throws HyracksDataException;
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
deleted file mode 100644
index 87f7cd7..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IInputSplitProviderFactory extends Serializable {
-    IInputSplitProvider createInputSplitProvider(int id) throws HyracksDataException;
-}
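Together, these two interfaces decouple split assignment from the Hadoop InputFormat: the factory travels with the job (hence Serializable), and each mapper task pulls splits from its provider until next() returns null, which is exactly how MapperOperatorDescriptor below consumes it. A minimal sketch of an implementation, assuming one precomputed input file per partition (the class name and fields are hypothetical):

package org.apache.hyracks.dataflow.hadoop.mapreduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class OneFilePerPartitionSplitProviderFactory implements IInputSplitProviderFactory {
    private static final long serialVersionUID = 1L;

    // Only serializable state is captured; InputSplit itself is not Serializable.
    private final String[] paths;
    private final long[] lengths;

    public OneFilePerPartitionSplitProviderFactory(String[] paths, long[] lengths) {
        this.paths = paths;
        this.lengths = lengths;
    }

    @Override
    public IInputSplitProvider createInputSplitProvider(final int id) {
        return new IInputSplitProvider() {
            private boolean consumed;

            @Override
            public InputSplit next() {
                if (consumed) {
                    return null; // null tells the mapper this partition has no more splits
                }
                consumed = true;
                return new FileSplit(new Path(paths[id]), 0, lengths[id], null);
            }
        };
    }
}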
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
deleted file mode 100644
index d0e6ce2..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-public class InputFileSplit extends InputSplit implements Writable {
-    private Path file;
-    private long start;
-    private long length;
-    private int blockId;
-    private String[] hosts;
-    private long scheduleTime;
-
-    public InputFileSplit() {
-    }
-
-    public InputFileSplit(int blockId, Path file, long start, long length, String[] hosts, long scheduleTime) {
-        this.blockId = blockId;
-        this.file = file;
-        this.start = start;
-        this.length = length;
-        this.hosts = hosts;
-        this.scheduleTime = scheduleTime;
-    }
-
-    public int blockId() {
-        return blockId;
-    }
-
-    public long scheduleTime() {
-        return this.scheduleTime;
-    }
-
-    public Path getPath() {
-        return file;
-    }
-
-    /** The position of the first byte in the file to process. */
-    public long getStart() {
-        return start;
-    }
-
-    /** The number of bytes in the file to process. */
-    @Override
-    public long getLength() {
-        return length;
-    }
-
-    @Override
-    public String toString() {
-        return file + ":" + start + "+" + length;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        Text.writeString(out, file.toString());
-        out.writeLong(start);
-        out.writeLong(length);
-        out.writeInt(blockId);
-        out.writeLong(this.scheduleTime);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        file = new Path(Text.readString(in));
-        start = in.readLong();
-        length = in.readLong();
-        hosts = null;
-        this.blockId = in.readInt();
-        this.scheduleTime = in.readLong();
-    }
-
-    @Override
-    public String[] getLocations() throws IOException {
-        if (this.hosts == null) {
-            return new String[] {};
-        } else {
-            return this.hosts;
-        }
-    }
-
-    public FileSplit toFileSplit() {
-        return new FileSplit(file, start, length, hosts);
-    }
-}
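Note that write() omits the hosts array and readFields() resets it to null, so locality hints do not survive serialization and getLocations() on the receiving side reports no hosts. A small round-trip sketch using Hadoop's in-memory buffers (paths and sizes are illustrative):

package org.apache.hyracks.dataflow.hadoop.mapreduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class InputFileSplitRoundTrip {
    public static void main(String[] args) throws Exception {
        InputFileSplit split = new InputFileSplit(7, new Path("/data/part-00000"), 0L, 4096L,
                new String[] { "node1" }, 0L);

        DataOutputBuffer out = new DataOutputBuffer();
        split.write(out);

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        InputFileSplit copy = new InputFileSplit();
        copy.readFields(in);

        System.out.println(copy);                        // /data/part-00000:0+4096
        System.out.println(copy.blockId());              // 7
        System.out.println(copy.getLocations().length);  // 0 -- hosts were not serialized
    }
}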
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/KVIterator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
deleted file mode 100644
index 3f02279..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.util.Progress;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class KVIterator implements RawKeyValueIterator {
-    private final HadoopHelper helper;
-    private FrameTupleAccessor accessor;
-    private DataInputBuffer kBuffer;
-    private DataInputBuffer vBuffer;
-    private List<IFrame> buffers;
-    private int bSize;
-    private int bPtr;
-    private int tIdx;
-    private boolean eog;
-
-    public KVIterator(HadoopHelper helper, RecordDescriptor recordDescriptor) {
-        this.helper = helper;
-        accessor = new FrameTupleAccessor(recordDescriptor);
-        kBuffer = new DataInputBuffer();
-        vBuffer = new DataInputBuffer();
-    }
-
-    void reset(List<IFrame> buffers, int bSize) {
-        this.buffers = buffers;
-        this.bSize = bSize;
-        bPtr = 0;
-        tIdx = 0;
-        eog = false;
-        if (bSize > 0) {
-            accessor.reset(buffers.get(0).getBuffer());
-            tIdx = -1;
-        } else {
-            eog = true;
-        }
-    }
-
-    @Override
-    public DataInputBuffer getKey() throws IOException {
-        return kBuffer;
-    }
-
-    @Override
-    public DataInputBuffer getValue() throws IOException {
-        return vBuffer;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-        while (true) {
-            if (eog) {
-                return false;
-            }
-            ++tIdx;
-            if (accessor.getTupleCount() <= tIdx) {
-                ++bPtr;
-                if (bPtr >= bSize) {
-                    eog = true;
-                    continue;
-                }
-                tIdx = -1;
-                accessor.reset(buffers.get(bPtr).getBuffer());
-                continue;
-            }
-            kBuffer.reset(accessor.getBuffer().array(),
-                    accessor.getAbsoluteFieldStartOffset(tIdx, HadoopHelper.KEY_FIELD_INDEX),
-                    accessor.getFieldLength(tIdx, HadoopHelper.KEY_FIELD_INDEX));
-            vBuffer.reset(accessor.getBuffer().array(),
-                    accessor.getAbsoluteFieldStartOffset(tIdx, HadoopHelper.VALUE_FIELD_INDEX),
-                    accessor.getFieldLength(tIdx, HadoopHelper.VALUE_FIELD_INDEX));
-            break;
-        }
-        return true;
-    }
-
-    @Override
-    public void close() throws IOException {
-
-    }
-
-    @Override
-    public Progress getProgress() {
-        return null;
-    }
-}
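KVIterator adapts sorted Hyracks frames to Hadoop's RawKeyValueIterator contract, exposing each tuple's key and value fields as raw byte windows; reset(...) is package-private and is driven by ReduceWriter below. A sketch of the consumption pattern, assuming Text keys and IntWritable values (hypothetical helper, same package):

package org.apache.hyracks.dataflow.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

final class KVIteratorConsumerSketch {
    // Drains the iterator the way the reduce context does: next(), then getKey()/getValue().
    static long sum(KVIterator kvi) throws IOException {
        Text key = new Text();
        IntWritable value = new IntWritable();
        long total = 0;
        while (kvi.next()) {
            key.readFields(kvi.getKey());     // field bytes are exactly what key.write() produced
            value.readFields(kvi.getValue());
            total += value.get();
        }
        return total;
    }
}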
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
deleted file mode 100644
index 85cd34d..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.hadoop.util.MRContextUtil;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.sort.Algorithm;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
-
-public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
-        extends AbstractSingleActivityOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-    private final int jobId;
-    private final MarshalledWritable<Configuration> config;
-    private final IInputSplitProviderFactory factory;
-
-    public MapperOperatorDescriptor(IOperatorDescriptorRegistry spec, int jobId,
-            MarshalledWritable<Configuration> config, IInputSplitProviderFactory factory) throws HyracksDataException {
-        super(spec, 0, 1);
-        this.jobId = jobId;
-        this.config = config;
-        this.factory = factory;
-        HadoopHelper helper = new HadoopHelper(config);
-        recordDescriptors[0] = helper.getMapOutputRecordDescriptor();
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                    throws HyracksDataException {
-        final HadoopHelper helper = new HadoopHelper(config);
-        final Configuration conf = helper.getConfiguration();
-        final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
-        final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
-        final IInputSplitProvider isp = factory.createInputSplitProvider(partition);
-        final TaskAttemptID taId = new TaskAttemptID("foo", jobId, true, partition, 0);
-        final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
-
-        final int framesLimit = helper.getSortFrameLimit(ctx);
-        final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
-
-        class SortingRecordWriter extends RecordWriter<K2, V2> {
-            private final ArrayTupleBuilder tb;
-            private final IFrame frame;
-            private final FrameTupleAppender fta;
-            private ExternalSortRunGenerator runGen;
-            private int blockId;
-
-            public SortingRecordWriter() throws HyracksDataException {
-                tb = new ArrayTupleBuilder(2);
-                frame = new VSizeFrame(ctx);
-                fta = new FrameTupleAppender(frame);
-            }
-
-            public void initBlock(int blockId) throws HyracksDataException {
-                runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
-                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), Algorithm.MERGE_SORT, framesLimit);
-                this.blockId = blockId;
-            }
-
-            @Override
-            public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
-            }
-
-            @Override
-            public void write(K2 key, V2 value) throws IOException, InterruptedException {
-                DataOutput dos = tb.getDataOutput();
-                tb.reset();
-                key.write(dos);
-                tb.addFieldEndOffset();
-                value.write(dos);
-                tb.addFieldEndOffset();
-                if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    runGen.nextFrame(frame.getBuffer());
-                    fta.reset(frame, true);
-                    if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                        throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
-                                + frame.getBuffer().capacity() + ")");
-                    }
-                }
-            }
-
-            public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
-                if (fta.getTupleCount() > 0) {
-                    runGen.nextFrame(frame.getBuffer());
-                    fta.reset(frame, true);
-                }
-                runGen.close();
-                IFrameWriter delegatingWriter = new IFrameWriter() {
-                    private final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
-                    private final FrameTupleAccessor fta = new FrameTupleAccessor(
-                            helper.getMapOutputRecordDescriptorWithoutExtraFields());
-                    private final ArrayTupleBuilder tb = new ArrayTupleBuilder(3);
-
-                    @Override
-                    public void open() throws HyracksDataException {
-                    }
-
-                    @Override
-                    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                        fta.reset(buffer);
-                        int n = fta.getTupleCount();
-                        for (int i = 0; i < n; ++i) {
-                            tb.reset();
-                            tb.addField(fta, i, 0);
-                            tb.addField(fta, i, 1);
-                            try {
-                                tb.getDataOutput().writeInt(blockId);
-                            } catch (IOException e) {
-                                throw new HyracksDataException(e);
-                            }
-                            tb.addFieldEndOffset();
-                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                appender.write(writer, true);
-                                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                    throw new IllegalStateException();
-                                }
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void close() throws HyracksDataException {
-                        appender.write(writer, true);
-                    }
-
-                    @Override
-                    public void fail() throws HyracksDataException {
-                        // no state to release on failure
-                    }
-                };
-                if (helper.hasCombiner()) {
-                    Reducer<K2, V2, K2, V2> combiner = helper.getCombiner();
-                    TaskAttemptID ctaId = new TaskAttemptID("foo", jobId, true, partition, 0);
-                    TaskAttemptContext ctaskAttemptContext = helper.createTaskAttemptContext(ctaId);
-                    final IFrameWriter outputWriter = delegatingWriter;
-                    RecordWriter<K2, V2> recordWriter = new RecordWriter<K2, V2>() {
-                        private final FrameTupleAppender fta = new FrameTupleAppender(new VSizeFrame(ctx));
-                        private final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-
-                        {
-                            outputWriter.open();
-                        }
-
-                        @Override
-                        public void write(K2 key, V2 value) throws IOException, InterruptedException {
-                            DataOutput dos = tb.getDataOutput();
-                            tb.reset();
-                            key.write(dos);
-                            tb.addFieldEndOffset();
-                            value.write(dos);
-                            tb.addFieldEndOffset();
-                            if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                fta.write(outputWriter, true);
-                                if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                    throw new IllegalStateException();
-                                }
-                            }
-                        }
-
-                        @Override
-                        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-                            fta.write(outputWriter, true);
-                        }
-                    };
-                    delegatingWriter = new ReduceWriter<K2, V2, K2, V2>(ctx, helper,
-                            new int[] { HadoopHelper.KEY_FIELD_INDEX }, helper.getGroupingComparatorFactories(),
-                            helper.getMapOutputRecordDescriptorWithoutExtraFields(), combiner, recordWriter, ctaId,
-                            ctaskAttemptContext);
-                }
-                IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-                for (int i = 0; i < comparatorFactories.length; ++i) {
-                    comparators[i] = comparatorFactories[i].createBinaryComparator();
-                }
-                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getSorter(), runGen.getRuns(),
-                        new int[] { 0 }, comparators, null, helper.getMapOutputRecordDescriptorWithoutExtraFields(),
-                        framesLimit, delegatingWriter);
-                merger.process();
-            }
-        }
-
-        return new AbstractUnaryOutputSourceOperatorNodePushable() {
-            @SuppressWarnings("unchecked")
-            @Override
-            public void initialize() throws HyracksDataException {
-                try {
-                    writer.open();
-                    SortingRecordWriter recordWriter = new SortingRecordWriter();
-                    InputSplit split = null;
-                    int blockId = 0;
-                    while ((split = isp.next()) != null) {
-                        try {
-                            RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split,
-                                    taskAttemptContext);
-                            ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
-                            try {
-                                Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-                                recordReader.initialize(split, taskAttemptContext);
-                            } finally {
-                                Thread.currentThread().setContextClassLoader(ctxCL);
-                            }
-                            recordWriter.initBlock(blockId);
-                            Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil().createMapContext(conf, taId,
-                                    recordReader, recordWriter, null, null, split);
-                            mapper.run(mCtx);
-                            recordReader.close();
-                            recordWriter.sortAndFlushBlock(writer);
-                            ++blockId;
-                        } catch (IOException | InterruptedException e) {
-                            throw new HyracksDataException(e);
-                        }
-                    }
-                } catch (Throwable th) {
-                    writer.fail();
-                    throw th;
-                } finally {
-                    writer.close();
-                }
-            }
-        };
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
deleted file mode 100644
index df53c39..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.Writable;
-
-public class MarshalledWritable<T extends Writable> implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private byte[] bytes;
-
-    public MarshalledWritable() {
-    }
-
-    public void set(T o) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        dos.writeUTF(o.getClass().getName());
-        o.write(dos);
-        dos.close();
-        bytes = baos.toByteArray();
-    }
-
-    @SuppressWarnings("unchecked")
-    public T get() throws Exception {
-        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-        DataInputStream dis = new DataInputStream(bais);
-        String className = dis.readUTF();
-        T o = (T) HadoopTools.newInstance(className);
-        o.readFields(dis);
-        return o;
-    }
-}
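This class exists because a Hadoop Configuration is Writable but not Serializable, so it cannot ride inside Java-serialized operator descriptors directly; MarshalledWritable snapshots it into a byte[] instead. Usage is a simple set/get round trip; a minimal sketch:

package org.apache.hyracks.dataflow.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;

public class MarshalledWritableExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.job.reduces", "4");

        MarshalledWritable<Configuration> mConfig = new MarshalledWritable<>();
        mConfig.set(conf);                   // class name + Writable bytes into a byte[]

        Configuration copy = mConfig.get();  // reflectively re-instantiated, then readFields()
        System.out.println(copy.get("mapreduce.job.reduces")); // 4
    }
}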
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
deleted file mode 100644
index e64978f..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.counters.GenericCounter;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.hadoop.util.MRContextUtil;
-
-public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
-    private final IHyracksTaskContext ctx;
-    private final HadoopHelper helper;
-    private final int[] groupFields;
-    private final FrameTupleAccessor accessor0;
-    private final FrameTupleAccessor accessor1;
-    private final IFrame copyFrame;
-    private final IBinaryComparator[] comparators;
-    private final KVIterator kvi;
-    private final Reducer<K2, V2, K3, V3> reducer;
-    private final RecordWriter<K3, V3> recordWriter;
-    private final TaskAttemptID taId;
-    private final TaskAttemptContext taskAttemptContext;
-
-    private boolean first;
-    private boolean groupStarted;
-    private List<IFrame> group;
-    private int bPtr;
-    private FrameTupleAppender fta;
-    private Counter keyCounter;
-    private Counter valueCounter;
-
-    public ReduceWriter(IHyracksTaskContext ctx, HadoopHelper helper, int[] groupFields,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
-            Reducer<K2, V2, K3, V3> reducer, RecordWriter<K3, V3> recordWriter, TaskAttemptID taId,
-            TaskAttemptContext taskAttemptContext) throws HyracksDataException {
-        this.ctx = ctx;
-        this.helper = helper;
-        this.groupFields = groupFields;
-        accessor0 = new FrameTupleAccessor(recordDescriptor);
-        accessor1 = new FrameTupleAccessor(recordDescriptor);
-        copyFrame = new VSizeFrame(ctx);
-        accessor1.reset(copyFrame.getBuffer());
-        comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        this.reducer = reducer;
-        this.recordWriter = recordWriter;
-        this.taId = taId;
-        this.taskAttemptContext = taskAttemptContext;
-
-        kvi = new KVIterator(helper, recordDescriptor);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        first = true;
-        groupStarted = false;
-        group = new ArrayList<>();
-        bPtr = 0;
-        group.add(new VSizeFrame(ctx));
-        fta = new FrameTupleAppender();
-        keyCounter = new GenericCounter();
-        valueCounter = new GenericCounter();
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        accessor0.reset(buffer);
-        int nTuples = accessor0.getTupleCount();
-        for (int i = 0; i < nTuples; ++i) {
-            if (first) {
-                groupInit();
-                first = false;
-            } else {
-                if (i == 0) {
-                    accessor1.reset(copyFrame.getBuffer());
-                    switchGroupIfRequired(accessor1, accessor1.getTupleCount() - 1, accessor0, i);
-                } else {
-                    switchGroupIfRequired(accessor0, i - 1, accessor0, i);
-                }
-            }
-            accumulate(accessor0, i);
-        }
-        copyFrame.ensureFrameSize(buffer.capacity());
-        FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
-    }
-
-    private void accumulate(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-        if (!fta.append(accessor, tIndex)) {
-            ++bPtr;
-            if (group.size() <= bPtr) {
-                group.add(new VSizeFrame(ctx));
-            }
-            fta.reset(group.get(bPtr), true);
-            if (!fta.append(accessor, tIndex)) {
-                throw new HyracksDataException("Record size ("
-                        + (accessor.getTupleEndOffset(tIndex) - accessor.getTupleStartOffset(tIndex))
-                        + ") larger than frame size (" + group.get(bPtr).getBuffer().capacity() + ")");
-            }
-        }
-    }
-
-    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
-            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
-        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
-            reduce();
-            groupInit();
-        }
-    }
-
-    private void groupInit() throws HyracksDataException {
-        groupStarted = true;
-        bPtr = 0;
-        fta.reset(group.get(0), true);
-    }
-
-    private void reduce() throws HyracksDataException {
-        kvi.reset(group, bPtr + 1);
-        try {
-            Reducer<K2, V2, K3, V3>.Context rCtx = new MRContextUtil().createReduceContext(helper.getConfiguration(),
-                    taId, kvi, keyCounter, valueCounter, recordWriter, null, null, (RawComparator<K2>) helper
-                            .getRawGroupingComparator(), (Class<K2>) helper.getJob().getMapOutputKeyClass(),
-                    (Class<V2>) helper.getJob().getMapOutputValueClass());
-            reducer.run(rCtx);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        groupStarted = false;
-    }
-
-    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx)
-            throws HyracksDataException {
-        for (int i = 0; i < comparators.length; ++i) {
-            int fIdx = groupFields[i];
-            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
-            int l1 = a1.getFieldLength(t1Idx, fIdx);
-            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
-            int l2 = a2.getFieldLength(t2Idx, fIdx);
-            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (groupStarted) {
-            reduce();
-        }
-        try {
-            recordWriter.close(taskAttemptContext);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
deleted file mode 100644
index d1bfbf7..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-
-public class ReducerOperatorDescriptor<K2 extends Writable, V2 extends Writable, K3 extends Writable, V3 extends Writable>
-        extends AbstractSingleActivityOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-
-    private final int jobId;
-
-    private MarshalledWritable<Configuration> mConfig;
-
-    public ReducerOperatorDescriptor(IOperatorDescriptorRegistry spec, int jobId,
-            MarshalledWritable<Configuration> mConfig) {
-        super(spec, 1, 0);
-        this.jobId = jobId;
-        this.mConfig = mConfig;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        final HadoopHelper helper = new HadoopHelper(mConfig);
-        final Reducer<K2, V2, K3, V3> reducer = helper.getReducer();
-        final RecordDescriptor recordDescriptor = helper.getMapOutputRecordDescriptor();
-        final int[] groupFields = helper.getSortFields();
-        IBinaryComparatorFactory[] groupingComparators = helper.getGroupingComparatorFactories();
-
-        final TaskAttemptID taId = new TaskAttemptID("foo", jobId, false, partition, 0);
-        final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
-        final RecordWriter recordWriter;
-        try {
-            recordWriter = helper.getOutputFormat().getRecordWriter(taskAttemptContext);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-
-        final ReduceWriter<K2, V2, K3, V3> rw = new ReduceWriter<K2, V2, K3, V3>(ctx, helper, groupFields,
-                groupingComparators, recordDescriptor, reducer, recordWriter, taId, taskAttemptContext);
-
-        return new AbstractUnaryInputSinkOperatorNodePushable() {
-            @Override
-            public void open() throws HyracksDataException {
-                rw.open();
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                rw.nextFrame(buffer);
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                rw.close();
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-            }
-        };
-    }
-}
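For context, the removed pieces composed into a Hyracks job roughly as follows: a mapper and a reducer operator joined by the hash-partitioning shuffle connector, with the marshalled configuration threaded through all three. A hedged sketch (key/value types, the class name, and the splitFactory argument are illustrative; JobSpecification implements both registry interfaces):

package org.apache.hyracks.dataflow.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.job.JobSpecification;

public class WordCountJobSketch {
    public static JobSpecification build(int jobId, MarshalledWritable<Configuration> mConfig,
            IInputSplitProviderFactory splitFactory) throws Exception {
        JobSpecification spec = new JobSpecification();

        MapperOperatorDescriptor<LongWritable, Text, Text, IntWritable> mapper =
                new MapperOperatorDescriptor<>(spec, jobId, mConfig, splitFactory);
        ReducerOperatorDescriptor<Text, IntWritable, Text, IntWritable> reducer =
                new ReducerOperatorDescriptor<>(spec, jobId, mConfig);

        // Hash-partitions map output, then sort-merges it per receiver (see ShuffleFrameReader).
        IConnectorDescriptor shuffle = new HashPartitioningShuffleConnectorDescriptor(spec, mConfig);
        spec.connect(shuffle, mapper, 0, reducer, 0);
        spec.addRoot(reducer);
        return spec;
    }
}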
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
deleted file mode 100644
index fb9c6fb..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hyracks.api.comm.FrameHelper;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameReader;
-import org.apache.hyracks.api.comm.NoShrinkVSizeFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
-import org.apache.hyracks.dataflow.std.structures.RunAndMaxFrameSizePair;
-
-public class ShuffleFrameReader implements IFrameReader {
-    private final IHyracksTaskContext ctx;
-    private final NonDeterministicChannelReader channelReader;
-    private final HadoopHelper helper;
-    private final RecordDescriptor recordDescriptor;
-    private final IFrame vframe;
-    private List<RunFileWriter> runFileWriters;
-    private List<Integer> runFileMaxFrameSize;
-    private RunFileReader reader;
-
-    public ShuffleFrameReader(IHyracksTaskContext ctx, NonDeterministicChannelReader channelReader,
-            MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
-        this.ctx = ctx;
-        this.channelReader = channelReader;
-        this.helper = new HadoopHelper(mConfig);
-        this.recordDescriptor = helper.getMapOutputRecordDescriptor();
-        this.vframe = new NoShrinkVSizeFrame(ctx);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        channelReader.open();
-        int nSenders = channelReader.getSenderPartitionCount();
-        runFileWriters = new ArrayList<>();
-        runFileMaxFrameSize = new ArrayList<>();
-        RunInfo[] infos = new RunInfo[nSenders];
-        FrameTupleAccessor accessor = new FrameTupleAccessor(recordDescriptor);
-        while (true) {
-            int entry = channelReader.findNextSender();
-            if (entry < 0) {
-                break;
-            }
-            RunInfo info = infos[entry];
-            ByteBuffer netBuffer = channelReader.getNextBuffer(entry);
-            netBuffer.clear();
-            int nBlocks = FrameHelper.deserializeNumOfMinFrame(netBuffer);
-
-            if (nBlocks > 1) {
-                netBuffer = getCompleteBuffer(nBlocks, netBuffer, entry);
-            }
-
-            accessor.reset(netBuffer, 0, netBuffer.limit());
-            int nTuples = accessor.getTupleCount();
-            for (int i = 0; i < nTuples; ++i) {
-                int tBlockId = IntegerPointable.getInteger(accessor.getBuffer().array(),
-                        accessor.getAbsoluteFieldStartOffset(i, HadoopHelper.BLOCKID_FIELD_INDEX));
-                if (info == null) {
-                    info = new RunInfo();
-                    info.reset(tBlockId);
-                    infos[entry] = info;
-                } else if (info.blockId != tBlockId) {
-                    info.close();
-                    info.reset(tBlockId);
-                }
-                info.write(accessor, i);
-            }
-
-            if (nBlocks == 1) {
-                channelReader.recycleBuffer(entry, netBuffer);
-            }
-        }
-        for (int i = 0; i < infos.length; ++i) {
-            RunInfo info = infos[i];
-            if (info != null) {
-                info.close();
-            }
-        }
-
-        FileReference outFile = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
-        int framesLimit = helper.getSortFrameLimit(ctx);
-        IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
-        IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        List<RunAndMaxFrameSizePair> runs = new LinkedList<>();
-        for (int i = 0; i < runFileWriters.size(); i++) {
-            runs.add(new RunAndMaxFrameSizePair(runFileWriters.get(i).createDeleteOnCloseReader(),
-                    runFileMaxFrameSize.get(i)));
-        }
-        RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
-        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators, null,
-                recordDescriptor, framesLimit, rfw);
-        merger.process();
-
-        reader = rfw.createDeleteOnCloseReader();
-        reader.open();
-    }
-
-    private ByteBuffer getCompleteBuffer(int nBlocks, ByteBuffer netBuffer, int entry) throws HyracksDataException {
-        vframe.reset();
-        vframe.ensureFrameSize(vframe.getMinSize() * nBlocks);
-        FrameUtils.copyWholeFrame(netBuffer, vframe.getBuffer());
-        channelReader.recycleBuffer(entry, netBuffer);
-        for (int i = 1; i < nBlocks; ++i) {
-            netBuffer = channelReader.getNextBuffer(entry);
-            netBuffer.clear();
-            vframe.getBuffer().put(netBuffer);
-            channelReader.recycleBuffer(entry, netBuffer);
-        }
-        if (vframe.getBuffer().hasRemaining()) { // bigger frame
-            FrameHelper.clearRemainingFrame(vframe.getBuffer(), vframe.getBuffer().position());
-        }
-        vframe.getBuffer().flip();
-        return vframe.getBuffer();
-    }
-
-    @Override
-    public boolean nextFrame(IFrame frame) throws HyracksDataException {
-        return reader.nextFrame(frame);
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        reader.close();
-    }
-
-    private class RunInfo {
-        private final IFrame buffer;
-        private final FrameTupleAppender fta;
-
-        private FileReference file;
-        private RunFileWriter rfw;
-        private int blockId;
-        private int maxFrameSize = ctx.getInitialFrameSize();
-
-        public RunInfo() throws HyracksDataException {
-            buffer = new VSizeFrame(ctx);
-            fta = new FrameTupleAppender();
-        }
-
-        public void reset(int blockId) throws HyracksDataException {
-            this.blockId = blockId;
-            this.maxFrameSize = ctx.getInitialFrameSize();
-            fta.reset(buffer, true);
-            try {
-                file = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
-                rfw = new RunFileWriter(file, ctx.getIOManager());
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        public void write(FrameTupleAccessor accessor, int tIdx) throws HyracksDataException {
-            if (!fta.append(accessor, tIdx)) {
-                flush();
-                if (!fta.append(accessor, tIdx)) {
-                    throw new HyracksDataException("Record size ("
-                            + (accessor.getTupleEndOffset(tIdx) - accessor.getTupleStartOffset(tIdx))
-                            + ") larger than frame size (" + fta.getBuffer().capacity() + ")");
-                }
-            }
-        }
-
-        public void close() throws HyracksDataException {
-            flush();
-            rfw.close();
-            runFileWriters.add(rfw);
-            runFileMaxFrameSize.add(maxFrameSize);
-        }
-
-        private void flush() throws HyracksDataException {
-            if (fta.getTupleCount() <= 0) {
-                return;
-            }
-            maxFrameSize = Math.max(buffer.getFrameSize(), maxFrameSize);
-            rfw.nextFrame((ByteBuffer) buffer.getBuffer().clear());
-            fta.reset(buffer, true);
-        }
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
deleted file mode 100644
index 438b817..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class ClasspathBasedHadoopClassFactory implements IHadoopClassFactory {
-
-    @Override
-    public Object createMapper(String mapClassName, JobConf conf) throws Exception {
-        Class clazz = loadClass(mapClassName);
-        return ReflectionUtils.newInstance(clazz, conf);
-    }
-
-    @Override
-    public Object createReducer(String reduceClassName, JobConf conf) throws Exception {
-        Class clazz = loadClass(reduceClassName);
-        return ReflectionUtils.newInstance(clazz, conf);
-    }
-
-    @Override
-    public Class loadClass(String className) throws Exception {
-        return Class.forName(className);
-    }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DatatypeHelper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DatatypeHelper.java
deleted file mode 100644
index ff4d282..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DatatypeHelper.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-@SuppressWarnings("deprecation")
-public class DatatypeHelper {
-    private static final class WritableSerializerDeserializer<T extends Writable> implements ISerializerDeserializer<T> {
-        private static final long serialVersionUID = 1L;
-
-        private Class<T> clazz;
-
-        private WritableSerializerDeserializer(Class<T> clazz) {
-            this.clazz = clazz;
-        }
-
-        private T createInstance() throws HyracksDataException {
-            // TODO remove "if", create a new WritableInstanceOperations class
-            // that deals with Writables that don't have public constructors
-            if (NullWritable.class.equals(clazz)) {
-                return (T) NullWritable.get();
-            }
-            try {
-                return clazz.newInstance();
-            } catch (InstantiationException | IllegalAccessException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        @Override
-        public T deserialize(DataInput in) throws HyracksDataException {
-            T o = createInstance();
-            try {
-                o.readFields(in);
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-            return o;
-        }
-
-        @Override
-        public void serialize(T instance, DataOutput out) throws HyracksDataException {
-            try {
-                instance.write(out);
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-    }
-
-    public static ISerializerDeserializer<? extends Writable> createSerializerDeserializer(
-            Class<? extends Writable> fClass) {
-        return new WritableSerializerDeserializer(fClass);
-    }
-
-    public static RecordDescriptor createKeyValueRecordDescriptor(Class<? extends Writable> keyClass,
-            Class<? extends Writable> valueClass) {
-        ISerializerDeserializer[] fields = new ISerializerDeserializer[2];
-        fields[0] = createSerializerDeserializer(keyClass);
-        fields[1] = createSerializerDeserializer(valueClass);
-        return new RecordDescriptor(fields);
-    }
-
-    public static RecordDescriptor createOneFieldRecordDescriptor(Class<? extends Writable> fieldClass) {
-        ISerializerDeserializer[] fields = new ISerializerDeserializer[1];
-        fields[0] = createSerializerDeserializer(fieldClass);
-        return new RecordDescriptor(fields);
-    }
-
-    public static JobConf map2JobConf(Map<String, String> jobConfMap) {
-        JobConf jobConf;
-        synchronized (Configuration.class) {
-            jobConf = new JobConf();
-            for (Entry<String, String> entry : jobConfMap.entrySet()) {
-                jobConf.set(entry.getKey(), entry.getValue());
-            }
-        }
-        return jobConf;
-    }
-
-    public static Map<String, String> jobConf2Map(JobConf jobConf) {
-        Map<String, String> jobConfMap = new HashMap<String, String>();
-        for (Entry<String, String> entry : jobConf) {
-            jobConfMap.put(entry.getKey(), entry.getValue());
-        }
-        return jobConfMap;
-    }
-}
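For reference, a typical call site for the helper above; the class name and the Text/IntWritable field pair are illustrative only, everything else is the API shown in the deleted file:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
    import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;

    public class DatatypeHelperUsageSketch {
        public static void main(String[] args) {
            // Two-field record descriptor whose key/value fields are
            // (de)serialized via the Hadoop Writable protocol.
            RecordDescriptor desc =
                    DatatypeHelper.createKeyValueRecordDescriptor(Text.class, IntWritable.class);
            System.out.println("fields: " + desc.getFields().length);
        }
    }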
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java
deleted file mode 100644
index ab32ade..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import org.apache.hyracks.api.dataflow.IDataWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.map.IDeserializedMapper;
-
-public class DuplicateKeyMapper implements IDeserializedMapper {
-
-    @Override
-    public void map(Object[] data, IDataWriter<Object[]> writer) throws HyracksDataException {
-        writer.writeData(new Object[] { data[0], data[1], data[0] });
-
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
deleted file mode 100644
index 07c4c89..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.mapred.JobConf;
-
-public interface IHadoopClassFactory extends Serializable {
-
-    public Object createMapper(String mapClassName, JobConf conf) throws Exception;
-
-    public Object createReducer(String reduceClassName, JobConf conf) throws Exception;
-
-    public Class loadClass(String className) throws Exception;
-}
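A minimal reflection-based implementation of the interface above, as a sketch (the class name is hypothetical and assumes the same package; ReflectionUtils.newInstance injects the JobConf into Configurable instances):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.ReflectionUtils;

    public class ReflectionHadoopClassFactory implements IHadoopClassFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public Object createMapper(String mapClassName, JobConf conf) throws Exception {
            // ReflectionUtils configures the instance with the JobConf if applicable.
            return ReflectionUtils.newInstance(loadClass(mapClassName), conf);
        }

        @Override
        public Object createReducer(String reduceClassName, JobConf conf) throws Exception {
            return ReflectionUtils.newInstance(loadClass(reduceClassName), conf);
        }

        @Override
        public Class loadClass(String className) throws Exception {
            return Class.forName(className);
        }
    }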
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
deleted file mode 100644
index eed2885..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class InputSplitsProxy implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private final Class[] isClasses;
-    private final byte[] bytes;
-
-    public InputSplitsProxy(JobConf conf, Object[] inputSplits) throws IOException {
-        isClasses = new Class[inputSplits.length];
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        if (conf.getUseNewMapper()) {
-            for (int i = 0; i < inputSplits.length; ++i) {
-                isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit) inputSplits[i]).getClass();
-                ((Writable) inputSplits[i]).write(dos);
-            }
-        } else {
-            for (int i = 0; i < inputSplits.length; ++i) {
-                isClasses[i] = ((org.apache.hadoop.mapred.InputSplit) inputSplits[i]).getClass();
-                ((Writable) inputSplits[i]).write(dos);
-            }
-        }
-        dos.close();
-        bytes = baos.toByteArray();
-
-    }
-
-    public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException, IOException {
-        Object[] splits = new Object[isClasses.length];
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
-        for (int i = 0; i < splits.length; ++i) {
-            splits[i] = ReflectionUtils.newInstance(isClasses[i], jobConf);
-            // The constructor already assumes splits of either API are Writable,
-            // so the old- and new-API read paths are identical.
-            ((Writable) splits[i]).readFields(dis);
-        }
-        return splits;
-    }
-}
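The proxy's intended round trip, sketched for an old-API (mapred) job; the wrapper method is hypothetical and assumes the job's input paths are already configured:

    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hyracks.dataflow.hadoop.util.InputSplitsProxy;

    public class InputSplitsProxyRoundTrip {
        public static Object[] shipAndRebuild(JobConf conf) throws Exception {
            // Compute splits on the client with the job's configured InputFormat.
            InputSplit[] splits = conf.getInputFormat().getSplits(conf, 4);
            // The proxy is java.io.Serializable, so it can travel inside a
            // Hyracks job specification to the node controllers.
            InputSplitsProxy proxy = new InputSplitsProxy(conf, splits);
            // On the remote side, materialize concrete split objects again.
            return proxy.toInputSplits(conf);
        }
    }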
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/MRContextUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/MRContextUtil.java
deleted file mode 100644
index 047a6fc..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/MRContextUtil.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
-import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
-import org.apache.hadoop.mapreduce.task.MapContextImpl;
-import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * A wrapper that creates TaskAttemptContexts for map and reduce tasks
- */
-public class MRContextUtil {
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public Mapper.Context createMapContext(Configuration conf, TaskAttemptID taskid, RecordReader reader,
-            RecordWriter writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
-        return new WrappedMapper().getMapContext(new MapContextImpl(conf, taskid, reader, writer, committer, reporter,
-                split));
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public Reducer.Context createReduceContext(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input,
-            Counter inputKeyCounter, Counter inputValueCounter, RecordWriter output, OutputCommitter committer,
-            StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass)
-            throws HyracksDataException {
-        try {
-            return new WrappedReducer().getReducerContext(new ReduceContextImpl(conf, taskid, input, inputKeyCounter,
-                    inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass));
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java
deleted file mode 100644
index ac5aec2..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-import org.apache.hyracks.api.dataflow.IDataWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.map.IDeserializedMapper;
-
-public class PreappendLongWritableMapper implements IDeserializedMapper {
-
-    @Override
-    public void map(Object[] data, IDataWriter<Object[]> writer) throws HyracksDataException {
-        writer.writeData(new Object[] { new LongWritable(0), new Text(String.valueOf(data[0])) });
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/pom.xml b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/pom.xml
deleted file mode 100644
index a247eff..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/pom.xml
+++ /dev/null
@@ -1,88 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>hyracks-yarn-am</artifactId>
-  <parent>
-    <groupId>org.apache.hyracks</groupId>
-    <artifactId>hyracks-yarn</artifactId>
-    <version>0.2.1-SNAPSHOT</version>
-  </parent>
-
-  <properties>
-    <root.dir>${basedir}/../../..</root.dir>
-  </properties>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>appassembler-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <configuration>
-              <programs>
-                <program>
-                  <mainClass>org.apache.hyracks.yarn.am.HyracksYarnApplicationMaster</mainClass>
-                  <name>hyracks-yarn-am</name>
-                </program>
-              </programs>
-              <repositoryLayout>flat</repositoryLayout>
-              <repositoryName>lib</repositoryName>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>assemble</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <version>2.2-beta-5</version>
-        <executions>
-          <execution>
-            <configuration>
-              <descriptors>
-                <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
-              </descriptors>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>attached</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-  <dependencies>
-    <dependency>
-      <groupId>args4j</groupId>
-      <artifactId>args4j</artifactId>
-      <version>2.0.16</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-yarn-common</artifactId>
-      <version>0.2.1-SNAPSHOT</version>
-    </dependency>
-  </dependencies>
-</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml
deleted file mode 100644
index d4a2ea1..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<assembly>
-  <id>binary-assembly</id>
-  <formats>
-    <format>zip</format>
-    <format>dir</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <directory>target/appassembler/bin</directory>
-      <outputDirectory>bin</outputDirectory>
-      <fileMode>0755</fileMode>
-    </fileSet>
-    <fileSet>
-      <directory>target/appassembler/lib</directory>
-      <outputDirectory>lib</outputDirectory>
-    </fileSet>
-  </fileSets>
-</assembly>
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/HyracksYarnApplicationMaster.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/HyracksYarnApplicationMaster.java
deleted file mode 100644
index 86f1539..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/HyracksYarnApplicationMaster.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.kohsuke.args4j.CmdLineParser;
-
-import org.apache.hyracks.yarn.am.manifest.AbstractProcess;
-import org.apache.hyracks.yarn.am.manifest.ContainerSpecification;
-import org.apache.hyracks.yarn.am.manifest.HyracksCluster;
-import org.apache.hyracks.yarn.am.manifest.ManifestParser;
-import org.apache.hyracks.yarn.am.manifest.NodeController;
-import org.apache.hyracks.yarn.common.protocols.amrm.AMRMConnection;
-
-public class HyracksYarnApplicationMaster {
-    private final Options options;
-
-    private final Timer timer;
-
-    private final List<ResourceRequest> asks;
-
-    private final Map<Resource, Set<AskRecord>> resource2AskMap;
-
-    private final Map<AbstractProcess, AskRecord> proc2AskMap;
-
-    private final AtomicInteger lastResponseId;
-
-    private final ApplicationAttemptId appAttemptId;
-
-    private YarnConfiguration config;
-
-    private AMRMConnection amrmc;
-
-    private RegisterApplicationMasterResponse registration;
-
-    private HyracksCluster hcManifest;
-
-    private HyracksYarnApplicationMaster(Options options) {
-        this.options = options;
-        timer = new Timer(true);
-        asks = new ArrayList<ResourceRequest>();
-        resource2AskMap = new HashMap<Resource, Set<AskRecord>>();
-        proc2AskMap = new HashMap<AbstractProcess, AskRecord>();
-        lastResponseId = new AtomicInteger();
-
-        String containerIdStr = System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
-        ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
-        appAttemptId = containerId.getApplicationAttemptId();
-    }
-
-    private void run() throws Exception {
-        Configuration conf = new Configuration();
-        config = new YarnConfiguration(conf);
-        amrmc = new AMRMConnection(config);
-
-        performRegistration();
-        setupHeartbeats();
-        parseManifest();
-        setupAsks();
-        while (true) {
-            Thread.sleep(1000);
-        }
-    }
-
-    private synchronized void setupAsks() {
-        setupAsk(hcManifest.getClusterController());
-        for (NodeController nc : hcManifest.getNodeControllers()) {
-            setupAsk(nc);
-        }
-    }
-
-    private void setupAsk(AbstractProcess proc) {
-        ContainerSpecification cSpec = proc.getContainerSpecification();
-        ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
-
-        rsrcRequest.setHostName(cSpec.getHostname());
-
-        Priority pri = Records.newRecord(Priority.class);
-        pri.setPriority(0);
-        rsrcRequest.setPriority(pri);
-
-        Resource capability = Records.newRecord(Resource.class);
-        capability.setMemory(cSpec.getMemory());
-        rsrcRequest.setCapability(capability);
-
-        rsrcRequest.setNumContainers(1);
-
-        AskRecord ar = new AskRecord();
-        ar.req = rsrcRequest;
-        ar.proc = proc;
-
-        Set<AskRecord> arSet = resource2AskMap.get(capability);
-        if (arSet == null) {
-            arSet = new HashSet<AskRecord>();
-            resource2AskMap.put(capability, arSet);
-        }
-        arSet.add(ar);
-        proc2AskMap.put(proc, ar);
-
-        System.err.println(proc + " -> [" + rsrcRequest.getHostName() + ", " + rsrcRequest.getNumContainers() + ", "
-                + rsrcRequest.getPriority() + ", " + rsrcRequest.getCapability().getMemory() + "]");
-
-        asks.add(rsrcRequest);
-    }
-
-    private void parseManifest() throws Exception {
-        String str = FileUtils.readFileToString(new File("manifest.xml"));
-        hcManifest = ManifestParser.parse(str);
-    }
-
-    private void setupHeartbeats() {
-        long heartbeatInterval = config.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
-                YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
-        System.err.println("Heartbeat interval: " + heartbeatInterval);
-        heartbeatInterval = Math.min(heartbeatInterval, 1000);
-        System.err.println("Heartbeat interval: " + heartbeatInterval);
-        timer.schedule(new TimerTask() {
-            @Override
-            public void run() {
-                AllocateRequest hb = Records.newRecord(AllocateRequest.class);
-                populateAllocateRequest(hb);
-                hb.setApplicationAttemptId(amrmc.getApplicationAttemptId());
-                hb.setProgress(0);
-                try {
-                    AllocateResponse allocateResponse = amrmc.getAMRMProtocol().allocate(hb);
-                    List<Container> allocatedContainers = allocateResponse.getAMResponse().getAllocatedContainers();
-                    List<ContainerStatus> completedContainers = allocateResponse.getAMResponse()
-                            .getCompletedContainersStatuses();
-                    processAllocation(allocatedContainers, completedContainers);
-                } catch (YarnRemoteException e) {
-                    e.printStackTrace();
-                }
-            }
-        }, 0, heartbeatInterval);
-    }
-
-    private synchronized void populateAllocateRequest(AllocateRequest hb) {
-        hb.addAllAsks(asks);
-        hb.addAllReleases(new ArrayList<ContainerId>());
-        hb.setResponseId(lastResponseId.incrementAndGet());
-        hb.setApplicationAttemptId(appAttemptId);
-    }
-
-    private synchronized void processAllocation(List<Container> allocatedContainers,
-            List<ContainerStatus> completedContainers) {
-        System.err.println(allocatedContainers);
-        for (Container c : allocatedContainers) {
-            System.err.println("Got container: " + c.getContainerStatus());
-            NodeId nodeId = c.getNodeId();
-            Resource resource = c.getResource();
-
-            Set<AskRecord> arSet = resource2AskMap.get(resource);
-            boolean found = false;
-            if (arSet != null) {
-                AskRecord wildcardMatch = null;
-                AskRecord nameMatch = null;
-                for (AskRecord ar : arSet) {
-                    ResourceRequest req = ar.req;
-                    if (ar.allocation == null) {
-                        if ("*".equals(req.getHostName()) && wildcardMatch == null) {
-                            wildcardMatch = ar;
-                        }
-                        if (req.getHostName().equals(nodeId.getHost()) && nameMatch == null) {
-                            nameMatch = ar;
-                            break;
-                        }
-                    }
-                }
-                if (nameMatch != null) {
-                    found = true;
-                    nameMatch.allocation = c;
-                } else if (wildcardMatch != null) {
-                    found = true;
-                    wildcardMatch.allocation = c;
-                }
-            }
-            if (!found) {
-                System.err.println("Unknown request satisfied: " + resource);
-            }
-        }
-    }
-
-    private void performRegistration() throws YarnRemoteException {
-        RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
-        appMasterRequest.setApplicationAttemptId(amrmc.getApplicationAttemptId());
-
-        registration = amrmc.getAMRMProtocol().registerApplicationMaster(appMasterRequest);
-    }
-
-    public static void main(String[] args) throws Exception {
-        Options options = new Options();
-        CmdLineParser parser = new CmdLineParser(options);
-        try {
-            parser.parseArgument(args);
-        } catch (Exception e) {
-            parser.printUsage(System.err);
-            return;
-        }
-        new HyracksYarnApplicationMaster(options).run();
-    }
-
-    private static class Options {
-    }
-
-    private static class AskRecord {
-        ResourceRequest req;
-        AbstractProcess proc;
-        Container allocation;
-    }
-}
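The register/heartbeat/allocate handshake above is written against the raw AMRMProtocol of Hadoop 2.0.0-alpha, which later 2.x releases removed. For comparison, a rough sketch of the same loop using the AMRMClient library API (container launch, NMClient wiring, and error handling omitted; the 128 MB ask and class name are illustrative):

    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.client.api.AMRMClient;
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class AmrmClientSketch {
        public static void main(String[] args) throws Exception {
            AMRMClient<ContainerRequest> amrm = AMRMClient.createAMRMClient();
            amrm.init(new YarnConfiguration());
            amrm.start();
            // registerApplicationMaster(host, rpcPort, trackingUrl)
            amrm.registerApplicationMaster("", 0, "");
            // One ask per manifest process, mirroring setupAsk(): any host
            // (null nodes/racks means "*"), priority 0.
            Resource capability = Resource.newInstance(128, 1);
            amrm.addContainerRequest(
                    new ContainerRequest(capability, null, null, Priority.newInstance(0)));
            // The heartbeat is just a periodic allocate() call; fulfilled asks
            // come back as containers on the response.
            while (!Thread.currentThread().isInterrupted()) {
                for (Container c : amrm.allocate(0.0f).getAllocatedContainers()) {
                    System.err.println("Got container: " + c.getId());
                }
                Thread.sleep(1000);
            }
        }
    }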
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/AbstractProcess.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/AbstractProcess.java
deleted file mode 100644
index 5c877ad..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/AbstractProcess.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-public abstract class AbstractProcess {
-    protected ContainerSpecification cSpec;
-
-    protected String cmdLineArgs;
-
-    public void setContainerSpecification(ContainerSpecification cSpec) {
-        this.cSpec = cSpec;
-    }
-
-    public ContainerSpecification getContainerSpecification() {
-        return cSpec;
-    }
-
-    public void setCommandLineArguments(String cmdLineArgs) {
-        this.cmdLineArgs = cmdLineArgs;
-    }
-
-    public String getCommandLineArguments() {
-        return cmdLineArgs;
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ClusterController.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ClusterController.java
deleted file mode 100644
index 666b05e..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ClusterController.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-public class ClusterController extends AbstractProcess {
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ContainerSpecification.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ContainerSpecification.java
deleted file mode 100644
index 919a8bf..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ContainerSpecification.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-public class ContainerSpecification {
-    private String hostname;
-
-    private int memory;
-
-    public ContainerSpecification() {
-        hostname = "*";
-    }
-
-    public void setHostname(String hostname) {
-        this.hostname = hostname;
-    }
-
-    public String getHostname() {
-        return hostname;
-    }
-
-    public void setMemory(int memory) {
-        this.memory = memory;
-    }
-
-    public int getMemory() {
-        return memory;
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/HyracksCluster.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/HyracksCluster.java
deleted file mode 100644
index 5536e0f..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/HyracksCluster.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class HyracksCluster {
-    private String name;
-
-    private ClusterController cc;
-
-    private List<NodeController> ncs;
-
-    public HyracksCluster() {
-        ncs = new ArrayList<NodeController>();
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setClusterController(ClusterController cc) {
-        this.cc = cc;
-    }
-
-    public ClusterController getClusterController() {
-        return cc;
-    }
-
-    public void addNodeController(NodeController nc) {
-        ncs.add(nc);
-    }
-
-    public List<NodeController> getNodeControllers() {
-        return ncs;
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ManifestParser.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ManifestParser.java
deleted file mode 100644
index fd85353..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ManifestParser.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-import java.io.StringReader;
-
-import org.apache.commons.digester.Digester;
-
-public class ManifestParser {
-    public static HyracksCluster parse(String mXML) throws Exception {
-        Digester d = createDigester();
-        return (HyracksCluster) d.parse(new StringReader(mXML));
-    }
-
-    private static Digester createDigester() {
-        Digester d = new Digester();
-        d.setValidating(false);
-
-        d.addObjectCreate("hyracks-cluster", HyracksCluster.class);
-        d.addSetProperties("hyracks-cluster");
-
-        d.addObjectCreate("hyracks-cluster/cluster-controller", ClusterController.class);
-        d.addSetProperties("hyracks-cluster/cluster-controller");
-        d.addSetNext("hyracks-cluster/cluster-controller", "setClusterController");
-
-        d.addObjectCreate("hyracks-cluster/node-controllers/node-controller", NodeController.class);
-        d.addSetProperties("hyracks-cluster/node-controllers/node-controller");
-        d.addSetNext("hyracks-cluster/node-controllers/node-controller", "addNodeController");
-
-        d.addObjectCreate("*/container-specification", ContainerSpecification.class);
-        d.addSetProperties("*/container-specification");
-        d.addSetNext("*/container-specification", "setContainerSpecification");
-        return d;
-    }
-}
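The Digester rules above pin down the manifest layout the AM reads from manifest.xml: attribute names follow the bean setters (setName, setId, setHostname, setMemory). A reconstructed example with placeholder values:

    <hyracks-cluster name="example-cluster">
      <cluster-controller>
        <container-specification hostname="*" memory="512"/>
      </cluster-controller>
      <node-controllers>
        <node-controller id="nc1">
          <container-specification hostname="*" memory="1024"/>
        </node-controller>
      </node-controllers>
    </hyracks-cluster>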
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/NodeController.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/NodeController.java
deleted file mode 100644
index 2356bb0..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/NodeController.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-public class NodeController extends AbstractProcess {
-    private String id;
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public String getId() {
-        return id;
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/pom.xml b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/pom.xml
deleted file mode 100644
index 6ebadc9..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/pom.xml
+++ /dev/null
@@ -1,99 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>hyracks-yarn-client</artifactId>
-  <parent>
-    <groupId>org.apache.hyracks</groupId>
-    <artifactId>hyracks-yarn</artifactId>
-    <version>0.2.1-SNAPSHOT</version>
-  </parent>
-
-  <properties>
-    <root.dir>${basedir}/../../..</root.dir>
-  </properties>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>appassembler-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <configuration>
-              <programs>
-                <program>
-                  <mainClass>org.apache.hyracks.yarn.client.LaunchHyracksApplication</mainClass>
-                  <name>launch-hyracks-application</name>
-                </program>
-                <program>
-                  <mainClass>org.apache.hyracks.yarn.client.KillHyracksApplication</mainClass>
-                  <name>kill-hyracks-application</name>
-                </program>
-              </programs>
-              <repositoryLayout>flat</repositoryLayout>
-              <repositoryName>lib</repositoryName>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>assemble</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <version>2.2-beta-5</version>
-        <executions>
-          <execution>
-            <configuration>
-              <descriptors>
-                <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
-              </descriptors>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>attached</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-  <dependencies>
-    <dependency>
-      <groupId>args4j</groupId>
-      <artifactId>args4j</artifactId>
-      <version>2.0.16</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-yarn-common</artifactId>
-      <version>0.2.1-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-yarn-am</artifactId>
-      <version>0.2.1-SNAPSHOT</version>
-      <type>zip</type>
-      <classifier>binary-assembly</classifier>
-    </dependency>
-  </dependencies>
-</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/assembly/binary-assembly.xml b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/assembly/binary-assembly.xml
deleted file mode 100644
index 04567a3..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/assembly/binary-assembly.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<assembly>
-  <id>binary-assembly</id>
-  <formats>
-    <format>zip</format>
-    <format>dir</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <directory>target/appassembler/bin</directory>
-      <outputDirectory>bin</outputDirectory>
-      <fileMode>0755</fileMode>
-    </fileSet>
-    <fileSet>
-      <directory>target/appassembler/lib</directory>
-      <outputDirectory>lib</outputDirectory>
-    </fileSet>
-  </fileSets>
-  <dependencySets>
-    <dependencySet>
-      <outputDirectory>hyracks-yarn-am</outputDirectory>
-      <includes>
-        <include>hyracks-yarn-am*</include>
-      </includes>
-      <unpack>false</unpack>
-      <outputFileNameMapping>${artifact.artifactId}.${artifact.extension}</outputFileNameMapping>
-      <useTransitiveDependencies>false</useTransitiveDependencies>
-    </dependencySet>
-  </dependencySets>
-</assembly>
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/KillHyracksApplication.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/KillHyracksApplication.java
deleted file mode 100644
index 8f3b68d..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/KillHyracksApplication.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.client;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-import org.apache.hyracks.yarn.common.protocols.clientrm.YarnClientRMConnection;
-
-public class KillHyracksApplication {
-    private final Options options;
-
-    private KillHyracksApplication(Options options) {
-        this.options = options;
-    }
-
-    private void run() throws Exception {
-        Configuration conf = new Configuration();
-        YarnConfiguration yconf = new YarnConfiguration(conf);
-        YarnClientRMConnection crmc = new YarnClientRMConnection(yconf);
-        crmc.killApplication(options.appId);
-    }
-
-    public static void main(String[] args) throws Exception {
-        Options options = new Options();
-        CmdLineParser parser = new CmdLineParser(options);
-        try {
-            parser.parseArgument(args);
-        } catch (Exception e) {
-            parser.printUsage(System.err);
-            return;
-        }
-        new KillHyracksApplication(options).run();
-    }
-
-    private static class Options {
-        @Option(name = "-application-id", required = true, usage = "Application Id")
-        String appId;
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/LaunchHyracksApplication.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/LaunchHyracksApplication.java
deleted file mode 100644
index e5d727f..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/LaunchHyracksApplication.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.client;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-import org.apache.hyracks.yarn.common.protocols.clientrm.YarnApplication;
-import org.apache.hyracks.yarn.common.protocols.clientrm.YarnClientRMConnection;
-import org.apache.hyracks.yarn.common.resources.LocalResourceHelper;
-import org.apache.hyracks.yarn.common.resources.ResourceHelper;
-
-public class LaunchHyracksApplication {
-    private final Options options;
-
-    private LaunchHyracksApplication(Options options) {
-        this.options = options;
-    }
-
-    private void run() throws Exception {
-        Configuration conf = new Configuration();
-        YarnConfiguration yconf = new YarnConfiguration(conf);
-        YarnClientRMConnection crmc = new YarnClientRMConnection(yconf);
-
-        YarnApplication app = crmc.createApplication(options.appName);
-
-        ContainerLaunchContext clCtx = app.getContainerLaunchContext();
-
-        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-
-        File amZipFile = new File(System.getProperty("basedir") + "/hyracks-yarn-am/hyracks-yarn-am.zip");
-        localResources.put("archive", LocalResourceHelper.createArchiveResource(conf, amZipFile));
-        clCtx.setLocalResources(localResources);
-
-        String command = "./archive/bin/hyracks-yarn-am 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
-                + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
-
-        List<String> commands = new ArrayList<String>();
-        commands.add(command);
-        clCtx.setCommands(commands);
-
-        clCtx.setResource(ResourceHelper.createMemoryCapability(options.amMemory));
-
-        app.submit();
-    }
-
-    public static void main(String[] args) throws Exception {
-        Options options = new Options();
-        CmdLineParser parser = new CmdLineParser(options);
-        try {
-            parser.parseArgument(args);
-        } catch (Exception e) {
-            parser.printUsage(System.err);
-            return;
-        }
-        new LaunchHyracksApplication(options).run();
-    }
-
-    private static class Options {
-        @Option(name = "-application-name", required = true, usage = "Application Name")
-        String appName;
-
-        @Option(name = "-am-host", required = false, usage = "Application master host name (default: *). Currently has NO effect")
-        String amHostName = "*";
-
-        @Option(name = "-am-memory", required = false, usage = "Application Master memory requirements")
-        int amMemory = 128;
-
-        @Option(name = "-workers", required = true, usage = "Number of worker containers")
-        int nWorkers;
-
-        @Option(name = "-worker-memory", required = true, usage = "Amount of memory to provide to each worker")
-        int workerMemory;
-
-        @Option(name = "-extra-jars", required = false, usage = "Other jars that need to be added to the classpath")
-        String extraJars = "";
-    }
-}
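Per the Options class above, only -application-name, -workers, and -worker-memory are required, so the appassembler-generated launcher script would have been invoked along these lines (all values illustrative):

    launch-hyracks-application -application-name my-hyracks-cluster \
        -workers 4 -worker-memory 1024 -am-memory 256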
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/pom.xml
deleted file mode 100644
index 43b4cc0..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/pom.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>hyracks-yarn-common</artifactId>
-  <parent>
-    <groupId>org.apache.hyracks</groupId>
-    <artifactId>hyracks-yarn</artifactId>
-    <version>0.2.1-SNAPSHOT</version>
-  </parent>
-
-  <properties>
-    <root.dir>${basedir}/../../..</root.dir>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-api</artifactId>
-      <version>2.0.0-alpha</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-common</artifactId>
-      <version>2.0.0-alpha</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>2.0.0-alpha</version>
-    </dependency>
-  </dependencies>
-</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/amrm/AMRMConnection.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/amrm/AMRMConnection.java
deleted file mode 100644
index 931f39d..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/amrm/AMRMConnection.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.common.protocols.amrm;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-
-public class AMRMConnection {
-    private final YarnConfiguration config;
-
-    private final ApplicationAttemptId appAttemptId;
-
-    private final AMRMProtocol amrmp;
-
-    public AMRMConnection(YarnConfiguration config) {
-        this.config = config;
-        Map<String, String> envs = System.getenv();
-        String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
-        if (containerIdString == null) {
-            throw new IllegalArgumentException("ContainerId not set in the environment");
-        }
-        ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
-        appAttemptId = containerId.getApplicationAttemptId();
-        InetSocketAddress rmAddress = NetUtils.createSocketAddr(config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-                YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
-        YarnRPC rpc = YarnRPC.create(config);
-
-        amrmp = (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, config);
-    }
-
-    public ApplicationAttemptId getApplicationAttemptId() {
-        return appAttemptId;
-    }
-
-    public AMRMProtocol getAMRMProtocol() {
-        return amrmp;
-    }
-}
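
For context on what this connection helper was for: an application master would use it to register with the ResourceManager. A minimal sketch against the Hadoop 2.0.0-alpha generation of the API that this module compiled against; the register-request setters used here are assumptions from that API generation:

import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

import org.apache.hyracks.yarn.common.protocols.amrm.AMRMConnection;

public class AmRegistrationSketch {
    public static void main(String[] args) throws Exception {
        // AMRMConnection (deleted above) resolves the attempt id from the AM
        // container environment and opens an AMRMProtocol proxy to the RM scheduler.
        AMRMConnection amrm = new AMRMConnection(new YarnConfiguration());

        RegisterApplicationMasterRequest req = Records.newRecord(RegisterApplicationMasterRequest.class);
        req.setApplicationAttemptId(amrm.getApplicationAttemptId());
        req.setHost("localhost"); // assumption: the AM's own RPC endpoint
        req.setRpcPort(0);
        req.setTrackingUrl("");
        RegisterApplicationMasterResponse resp = amrm.getAMRMProtocol().registerApplicationMaster(req);
        System.out.println("Max container capability: " + resp.getMaximumResourceCapability());
    }
}
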
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnApplication.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnApplication.java
deleted file mode 100644
index 8ea8f21..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnApplication.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.common.protocols.clientrm;
-
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.Records;
-
-public class YarnApplication {
-    private final YarnClientRMConnection crmc;
-
-    private ApplicationSubmissionContext appCtx;
-
-    private ContainerLaunchContext clCtx;
-
-    YarnApplication(YarnClientRMConnection crmc, String appName) throws YarnRemoteException {
-        this.crmc = crmc;
-        appCtx = Records.newRecord(ApplicationSubmissionContext.class);
-        appCtx.setApplicationId(getNewApplicationId(crmc));
-        appCtx.setApplicationName(appName);
-        clCtx = Records.newRecord(ContainerLaunchContext.class);
-    }
-
-    public ContainerLaunchContext getContainerLaunchContext() {
-        return clCtx;
-    }
-
-    public void submit() throws YarnRemoteException {
-        appCtx.setAMContainerSpec(clCtx);
-        SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
-        appRequest.setApplicationSubmissionContext(appCtx);
-        crmc.getClientRMProtocol().submitApplication(appRequest);
-    }
-
-    private static ApplicationId getNewApplicationId(YarnClientRMConnection crmc) throws YarnRemoteException {
-        GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
-        GetNewApplicationResponse response = crmc.getClientRMProtocol().getNewApplication(request);
-
-        return response.getApplicationId();
-    }
-}
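
Taken together with YarnClientRMConnection (deleted just below), the submission flow these classes implemented was a simple create/configure/submit. A minimal sketch using only the methods visible in this diff, plus ContainerLaunchContext.setCommands as it existed in the 2.0.0-alpha records API; the launch command is a placeholder:

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import org.apache.hyracks.yarn.common.protocols.clientrm.YarnApplication;
import org.apache.hyracks.yarn.common.protocols.clientrm.YarnClientRMConnection;

public class SubmitSketch {
    public static void main(String[] args) throws Exception {
        YarnClientRMConnection crmc = new YarnClientRMConnection(new YarnConfiguration());
        // createApplication asks the RM for a fresh ApplicationId (see getNewApplicationId above).
        YarnApplication app = crmc.createApplication("hello-hyracks");
        ContainerLaunchContext clc = app.getContainerLaunchContext();
        clc.setCommands(Collections.singletonList("java -version")); // placeholder AM command
        // submit() wraps the context in a SubmitApplicationRequest and hands it to the RM.
        app.submit();
    }
}
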
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnClientRMConnection.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnClientRMConnection.java
deleted file mode 100644
index 8ed9a98..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnClientRMConnection.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.common.protocols.clientrm;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
-import org.apache.hadoop.yarn.util.Records;
-
-public class YarnClientRMConnection {
-    private final YarnConfiguration config;
-
-    private final ClientRMProtocol crmp;
-
-    public YarnClientRMConnection(YarnConfiguration config) {
-        this.config = config;
-        InetSocketAddress remoteAddress = NetUtils.createSocketAddr(config.get(YarnConfiguration.RM_ADDRESS,
-                YarnConfiguration.DEFAULT_RM_ADDRESS));
-        Configuration appsManagerServerConf = new Configuration(config);
-        appsManagerServerConf.setClass(YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CLIENT_RESOURCEMANAGER,
-                ClientRMSecurityInfo.class, SecurityInfo.class);
-        YarnRPC rpc = YarnRPC.create(appsManagerServerConf);
-        crmp = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, remoteAddress, appsManagerServerConf));
-    }
-
-    public YarnApplication createApplication(String appName) throws YarnRemoteException {
-        return new YarnApplication(this, appName);
-    }
-
-    public ClientRMProtocol getClientRMProtocol() {
-        return crmp;
-    }
-
-    public void killApplication(String appId) throws Exception {
-        KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class);
-        ApplicationId aid = Records.newRecord(ApplicationId.class);
-        long ts = Long.parseLong(appId.substring(appId.indexOf('_') + 1, appId.lastIndexOf('_')));
-        aid.setClusterTimestamp(ts);
-        int id = Integer.parseInt(appId.substring(appId.lastIndexOf('_') + 1));
-        aid.setId(id);
-        killRequest.setApplicationId(aid);
-        crmp.forceKillApplication(killRequest);
-    }
-}
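
The string slicing in killApplication depends on YARN's application_<clusterTimestamp>_<sequenceNumber> id format: the cluster timestamp sits between the first and last underscore, and the sequence number follows. A tiny self-contained demo of the same parsing; the id value is hypothetical:

public class AppIdParseDemo {
    public static void main(String[] args) {
        String appId = "application_1317197518103_0036"; // hypothetical, standard YARN format
        // Same substring logic as the deleted killApplication above.
        long ts = Long.parseLong(appId.substring(appId.indexOf('_') + 1, appId.lastIndexOf('_')));
        int id = Integer.parseInt(appId.substring(appId.lastIndexOf('_') + 1));
        System.out.println(ts + " / " + id); // prints 1317197518103 / 36
    }
}
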
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/LocalResourceHelper.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/LocalResourceHelper.java
deleted file mode 100644
index c185ef2..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/LocalResourceHelper.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.common.resources;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-public class LocalResourceHelper {
-    private static LocalResource createLocalResourceFromPath(Configuration config, File path) throws IOException {
-        LocalResource lr = Records.newRecord(LocalResource.class);
-        URL url = ConverterUtils.getYarnUrlFromPath(FileContext.getFileContext().makeQualified(new Path(path.toURI())));
-        lr.setResource(url);
-        lr.setVisibility(LocalResourceVisibility.APPLICATION);
-        lr.setTimestamp(path.lastModified());
-        lr.setSize(path.length());
-        return lr;
-    }
-
-    public static LocalResource createFileResource(Configuration config, File path) throws IOException {
-        LocalResource lr = createLocalResourceFromPath(config, path);
-        lr.setType(LocalResourceType.FILE);
-        return lr;
-    }
-
-    public static LocalResource createArchiveResource(Configuration config, File path) throws IOException {
-        LocalResource lr = createLocalResourceFromPath(config, path);
-        lr.setType(LocalResourceType.ARCHIVE);
-        return lr;
-    }
-}
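
These factory methods fed YARN's localization step: resources attached to a launch context are downloaded (and, for ARCHIVE, unpacked) by the NodeManager before the container starts. A minimal sketch of attaching an archive, assuming the 2.0.0-alpha setLocalResources signature; the archive path is hypothetical:

import java.io.File;
import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

import org.apache.hyracks.yarn.common.resources.LocalResourceHelper;

public class LocalizeSketch {
    public static void main(String[] args) throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        // ARCHIVE resources are unpacked into the container's working directory.
        LocalResource appArchive =
                LocalResourceHelper.createArchiveResource(conf, new File("hyracks-app.zip")); // hypothetical
        ContainerLaunchContext clc = Records.newRecord(ContainerLaunchContext.class);
        clc.setLocalResources(Collections.singletonMap("app", appArchive));
    }
}
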
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/ResourceHelper.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/ResourceHelper.java
deleted file mode 100644
index a878dc2..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/ResourceHelper.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.common.resources;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
-
-public class ResourceHelper {
-    public static Resource createMemoryCapability(int memory) {
-        Resource capability = Records.newRecord(Resource.class);
-        capability.setMemory(memory);
-        return capability;
-    }
-}
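
createMemoryCapability was this module's only resource-sizing helper. A sketch of how it would size the AM container, assuming the pre-2.1 convention of placing the Resource directly on the launch context; the 128 MB figure mirrors the -am-memory default shown above:

import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;

import org.apache.hyracks.yarn.common.resources.ResourceHelper;

public class CapabilitySketch {
    public static void main(String[] args) {
        Resource amCapability = ResourceHelper.createMemoryCapability(128); // MB
        ContainerLaunchContext clc = Records.newRecord(ContainerLaunchContext.class);
        clc.setResource(amCapability); // assumption: pre-2.1 API placed the resource here
    }
}
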
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/pom.xml b/hyracks-fullstack/hyracks/hyracks-yarn/pom.xml
deleted file mode 100644
index a86befb..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/pom.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>hyracks-yarn</artifactId>
-  <packaging>pom</packaging>
-  <name>hyracks-yarn</name>
-
-  <parent>
-    <groupId>org.apache.hyracks</groupId>
-    <artifactId>hyracks</artifactId>
-    <version>0.2.1-SNAPSHOT</version>
-  </parent>
-
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-      <comments>A business-friendly OSS license</comments>
-    </license>
-  </licenses>
-
-  <properties>
-    <root.dir>${basedir}/../..</root.dir>
-  </properties>
-
-  <modules>
-    <module>hyracks-yarn-common</module>
-    <module>hyracks-yarn-client</module>
-    <module>hyracks-yarn-am</module>
-  </modules>
-</project>