Refactored code in comaptibility layer to support submission of jobs against existing applications + made minor changes in hadoop operators

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_hadoop_compat_changes@460 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index c3167cb..a1c14c5 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -71,16 +71,8 @@
             Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
             jobConf = getJobConf();
             populateCache(jobConf);
-            try {
-                mapper = createMapper();
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
             conf = new JobConf(jobConf);
             conf.setClassLoader(jobConf.getClassLoader());
-            if (!jobConf.getUseNewMapper()) {
-                ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
-            }
             reporter = createReporter();
         }
 
@@ -143,6 +135,20 @@
             }
             this.writer = writer;
         }
+     
+        protected void initializeMapper() throws HyracksDataException {
+            super.initializeMapper();
+            try {
+                mapper = createMapper(conf);
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+            if (!conf.getUseNewMapper()) {
+                ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
+            }
+        } 
+
+
     }
 
     private class ReaderMapperOperator extends MapperBaseOperator {
@@ -167,10 +173,10 @@
                 } else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
                     conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath()
                             .toString());
-                    conf.setLong("map.input.start",
-                            ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
-                    conf.setLong("map.input.length",
-                            ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
+                    conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
+                            .getStart());
+                    conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
+                            .getLength());
                 }
             } catch (Exception e) {
                 e.printStackTrace();
@@ -180,10 +186,22 @@
             }
         }
 
+        protected void initializeMapper() throws HyracksDataException {
+            super.initializeMapper();
+            updateConfWithSplit(conf);
+            try {
+                mapper = createMapper(conf);
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+            if (!conf.getUseNewMapper()) {
+                ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
+            }
+        }
+
         public void mapInput() throws HyracksDataException, InterruptedException, ClassNotFoundException {
             try {
                 initializeMapper();
-                updateConfWithSplit(conf);
                 conf.setClassLoader(this.getClass().getClassLoader());
                 Object reader;
                 Object key = null;
@@ -209,7 +227,7 @@
                     };;;
 
                     OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
-                            .getOutputCommitter(new TaskAttemptContext(jobConf, new TaskAttemptID()));
+                            .getOutputCommitter(new TaskAttemptContext(conf, new TaskAttemptID()));
                     StatusReporter statusReporter = new StatusReporter() {
                         @Override
                         public void setStatus(String arg0) {
@@ -229,7 +247,7 @@
                             return null;
                         }
                     };;;
-                    context = new org.apache.hadoop.mapreduce.Mapper().new Context(jobConf, new TaskAttemptID(),
+                    context = new org.apache.hadoop.mapreduce.Mapper().new Context(conf, new TaskAttemptID(),
                             newReader, recordWriter, outputCommitter, statusReporter,
                             (org.apache.hadoop.mapreduce.InputSplit) inputSplit);
                     newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplit, context);
@@ -298,9 +316,9 @@
         String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
         try {
             if (hadoopClassFactory == null) {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
-                        (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+                        .forName(mapOutputKeyClassName), (Class<? extends Writable>) Class
+                        .forName(mapOutputValueClassName));
             } else {
                 recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
                         (Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputKeyClassName),
@@ -312,21 +330,21 @@
         return recordDescriptor;
     }
 
-    private Object createMapper() throws Exception {
+    private Object createMapper(JobConf conf) throws Exception {
         Object mapper;
         if (mapperClass != null) {
-            return ReflectionUtils.newInstance(mapperClass, jobConf);
+            return ReflectionUtils.newInstance(mapperClass, conf);
         } else {
             String mapperClassName = null;
             if (jobConf.getUseNewMapper()) {
-                JobContext jobContext = new JobContext(jobConf, null);
+                JobContext jobContext = new JobContext(conf, null);
                 mapperClass = jobContext.getMapperClass();
                 mapperClassName = mapperClass.getName();
             } else {
-                mapperClass = super.getJobConf().getMapperClass();
+                mapperClass = conf.getMapperClass();
                 mapperClassName = mapperClass.getName();
             }
-            mapper = getHadoopClassFactory().createMapper(mapperClassName, jobConf);
+            mapper = getHadoopClassFactory().createMapper(mapperClassName,conf);
         }
         return mapper;
     }
@@ -344,8 +362,8 @@
         } else {
             Class inputFormatClass = conf.getInputFormat().getClass();
             InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf,
-                    super.createReporter());
+            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf, super
+                    .createReporter());
         }
     }
 
@@ -383,8 +401,8 @@
                 }
                 return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
             } else {
-                return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
-                        recordDescProvider.getInputRecordDescriptor(this.odId, 0));
+                return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
+                        .getInputRecordDescriptor(this.odId, 0));
             }
         } catch (Exception e) {
             throw new HyracksDataException(e);
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 2d44c79..195b7c4 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -23,13 +23,14 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -59,66 +60,64 @@
         private Object reducer;
         private DataWritingOutputCollector<K3, V3> output;
         private Reporter reporter;
-        private ReducerContext reducerContext;
+        private ReducerContext reducerContext; 
         RawKeyValueIterator rawKeyValueIterator = new RawKeyValueIterator() {
-
+            
             @Override
             public boolean next() throws IOException {
                 return false;
             }
-
+            
             @Override
             public DataInputBuffer getValue() throws IOException {
                 return null;
             }
-
+            
             @Override
             public Progress getProgress() {
                 return null;
             }
-
+            
             @Override
             public DataInputBuffer getKey() throws IOException {
                 return null;
             }
-
+            
             @Override
             public void close() throws IOException {
-
+                
             }
         };
-
+        
+    
         class ReducerContext extends org.apache.hadoop.mapreduce.Reducer.Context {
             private HadoopReducerOperatorDescriptor.ValueIterator iterator;
-
+            
             @SuppressWarnings("unchecked")
-            ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException,
-                    InterruptedException, ClassNotFoundException {
-
-                reducer.super(conf, new TaskAttemptID(), rawKeyValueIterator, null, null, null, null, null, null, Class
-                        .forName("org.apache.hadoop.io.NullWritable"), Class
-                        .forName("org.apache.hadoop.io.NullWritable"));
+            ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException, InterruptedException, ClassNotFoundException{
+            
+                reducer.super(conf,new TaskAttemptID(),rawKeyValueIterator,null,null,null,null,null,null,Class.forName("org.apache.hadoop.io.NullWritable"),Class.forName("org.apache.hadoop.io.NullWritable"));
             }
-
-            public void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) {
+            
+            public  void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) {
                 iterator = iter;
             }
-
+            
             @Override
             public Iterable<V2> getValues() throws IOException, InterruptedException {
-                return new Iterable<V2>() {
-                    @Override
-                    public Iterator<V2> iterator() {
-                        return iterator;
-                    }
-                };
+              return new Iterable<V2>() {
+                @Override
+                public Iterator<V2> iterator() {
+                    return iterator;
+                }
+              };
             }
-
+            
             /** Start processing next unique key. */
             @Override
-            public boolean nextKey() throws IOException, InterruptedException {
+            public boolean nextKey() throws IOException,InterruptedException {
                 boolean hasMore = iterator.hasNext();
-                if (hasMore) {
+                if(hasMore){
                     nextKeyValue();
                 }
                 return hasMore;
@@ -134,25 +133,26 @@
             }
 
             public Object getCurrentKey() {
-                return iterator.getKey();
+              return iterator.getKey();
             }
 
             @Override
             public Object getCurrentValue() {
-                return iterator.getValue();
+              return iterator.getValue();
             }
-
+            
             /**
              * Generate an output key/value pair.
              */
             @Override
-            public void write(Object key, Object value) throws IOException, InterruptedException {
-                output.collect(key, value);
+            public void write(Object key, Object value
+                              ) throws IOException, InterruptedException {
+              output.collect(key, value);
             }
 
         }
-
-        public ReducerAggregator(Object reducer) throws HyracksDataException {
+            
+        public ReducerAggregator(Object reducer) throws HyracksDataException{
             this.reducer = reducer;
             initializeReducer();
             output = new DataWritingOutputCollector<K3, V3>();
@@ -201,17 +201,17 @@
             i.reset(reader);
             output.setWriter(writer);
             try {
-                if (jobConf.getUseNewReducer()) {
+                if(jobConf.getUseNewReducer()){
                     try {
                         reducerContext.setIterator(i);
-                        ((org.apache.hadoop.mapreduce.Reducer) reducer).run(reducerContext);
+                        ((org.apache.hadoop.mapreduce.Reducer)reducer).run(reducerContext);
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                         throw new HyracksDataException(e);
                     }
-                } else {
-                    ((org.apache.hadoop.mapred.Reducer) reducer).reduce(i.getKey(), i, output, reporter);
-                }
+                } else {  
+                    ((org.apache.hadoop.mapred.Reducer)reducer).reduce(i.getKey(), i, output, reporter);
+                }    
             } catch (IOException e) {
                 e.printStackTrace();
             }
@@ -221,28 +221,28 @@
         public void close() throws HyracksDataException {
             // -- - close - --
             try {
-                if (!jobConf.getUseNewMapper()) {
-                    ((org.apache.hadoop.mapred.Reducer) reducer).close();
-                }
+                if(!jobConf.getUseNewMapper()) {
+                    ((org.apache.hadoop.mapred.Reducer)reducer).close();
+                }    
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
         }
-
+        
         private void initializeReducer() throws HyracksDataException {
             jobConf.setClassLoader(this.getClass().getClassLoader());
-            if (!jobConf.getUseNewReducer()) {
-                ((org.apache.hadoop.mapred.Reducer) reducer).configure(getJobConf());
+            if(!jobConf.getUseNewReducer()) {
+                ((org.apache.hadoop.mapred.Reducer)reducer).configure(getJobConf());    
             } else {
                 try {
-                    reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer) reducer, jobConf);
+                    reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer)reducer,jobConf);
                 } catch (IOException e) {
                     e.printStackTrace();
                     throw new HyracksDataException(e);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                     throw new HyracksDataException(e);
-                } catch (RuntimeException e) {
+                } catch (RuntimeException e){
                     e.printStackTrace();
                 } catch (ClassNotFoundException e) {
                     e.printStackTrace();
@@ -259,7 +259,7 @@
         public K2 getKey() {
             return key;
         }
-
+        
         public V2 getValue() {
             return value;
         }
@@ -310,11 +310,13 @@
     private static final long serialVersionUID = 1L;
     private Class reducerClass;
     private IComparatorFactory comparatorFactory;
+    private boolean useAsCombiner = false;
 
     public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IComparatorFactory comparatorFactory,
-            IHadoopClassFactory classFactory) {
+            IHadoopClassFactory classFactory,boolean useAsCombiner) {
         super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
         this.comparatorFactory = comparatorFactory;
+        this.useAsCombiner = useAsCombiner;
     }
 
     private Object createReducer() throws Exception {
@@ -322,14 +324,22 @@
             return ReflectionUtils.newInstance(reducerClass, getJobConf());
         } else {
             Object reducer;
-            if (getJobConf().getUseNewReducer()) {
-                JobContext jobContext = new JobContext(getJobConf(), null);
-                reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
-                        .getReducerClass();
+            if(!useAsCombiner) {
+                if(getJobConf().getUseNewReducer()){
+                    JobContext jobContext = new JobContext(getJobConf(), null);
+                    reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?,?,?,?>> )jobContext.getReducerClass();
+                } else {
+                    reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
+                }
             } else {
-                reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
+                if(getJobConf().getUseNewReducer()){
+                    JobContext jobContext = new JobContext(getJobConf(), null);
+                    reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?,?,?,?>> )jobContext.getCombinerClass();
+                } else {
+                    reducerClass = (Class<? extends Reducer>) getJobConf().getCombinerClass();
+                }
             }
-            reducer = getHadoopClassFactory().createReducer(reducerClass.getName(), getJobConf());
+            reducer = getHadoopClassFactory().createReducer(reducerClass.getName(),getJobConf());
             return reducer;
         }
     }
@@ -367,24 +377,23 @@
     }
 
     public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory classFactory) {
-        String outputKeyClassName = null;
+        String outputKeyClassName =null; 
         String outputValueClassName = null;
-
-        if (conf.getUseNewMapper()) {
-            JobContext context = new JobContext(conf, null);
+        
+        if(conf.getUseNewMapper()) {
+            JobContext context = new JobContext(conf,null);
             outputKeyClassName = context.getOutputKeyClass().getName();
             outputValueClassName = context.getOutputValueClass().getName();
         } else {
             outputKeyClassName = conf.getOutputKeyClass().getName();
             outputValueClassName = conf.getOutputValueClass().getName();
         }
-
+        
         RecordDescriptor recordDescriptor = null;
         try {
             if (classFactory == null) {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) Class.forName(outputKeyClassName),
-                        (Class<? extends Writable>) Class.forName(outputValueClassName));
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+                        .forName(outputKeyClassName), (Class<? extends Writable>) Class.forName(outputValueClassName));
             } else {
                 recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
                         (Class<? extends Writable>) classFactory.loadClass(outputKeyClassName),
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index 4dcfa61..9f3d532 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -22,11 +22,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -43,14 +43,15 @@
 
 public class HadoopWriteOperatorDescriptor extends AbstractFileWriteOperatorDescriptor {
 
-    private class HadoopFileWriter implements IRecordWriter {
+    private  class HadoopFileWriter implements IRecordWriter {
 
         Object recordWriter;
         JobConf conf;
         Path finalOutputFile;
         Path tempOutputFile;
-
-        HadoopFileWriter(Object recordWriter, Path tempOutputFile, Path outputFile, JobConf conf) {
+      
+        
+        HadoopFileWriter(Object recordWriter,Path tempOutputFile,Path outputFile,JobConf conf) {
             this.recordWriter = recordWriter;
             this.conf = conf;
             this.finalOutputFile = outputFile;
@@ -59,23 +60,26 @@
 
         @Override
         public void write(Object[] record) throws Exception {
-            if (conf.getUseNewMapper()) {
-                ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]);
-            } else {
-                ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]);
+            if(recordWriter != null){
+                if (conf.getUseNewMapper()){
+                    ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).write(record[0], record[1]);
+                } else {
+                    ((org.apache.hadoop.mapred.RecordWriter)recordWriter).write(record[0], record[1]);
+                }
             }
         }
 
         @Override
         public void close() {
             try {
-                if (conf.getUseNewMapper()) {
-                    ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new TaskAttemptContext(conf,
-                            new TaskAttemptID()));
-                } else {
-                    ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null);
-                }
-                FileSystem.get(conf).rename(tempOutputFile, finalOutputFile);
+                if(recordWriter != null) {
+                    if (conf.getUseNewMapper()){
+                       ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).close(new TaskAttemptContext(conf, new TaskAttemptID()));
+                    } else {
+                        ((org.apache.hadoop.mapred.RecordWriter)recordWriter).close(null);
+                    }
+                    FileSystem.get(conf).rename( tempOutputFile, finalOutputFile);
+                } 
             } catch (Exception e) {
                 e.printStackTrace();
             }
@@ -120,83 +124,81 @@
         } catch (IOException ioe) {
             ioe.printStackTrace();
         }
-        Path path = new Path(fileSplit.getLocalFile().getFile().getPath());
         Path tempOutputFile = null;
         Path finalOutputFile = null;
         checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
-        Object recordWriter = null;
-        Object outputFormat = null;
-        String taskAttempId = new TaskAttemptID().toString();
-        conf.set("mapred.task.id", taskAttempId);
-        outputPath = new Path(conf.get("mapred.output.dir"));
-        outputTempPath = new Path(outputPath, "_temporary");
-        if (outputPath != null && !fileSystem.exists(outputPath)) {
-            fileSystem.mkdirs(outputTempPath);
+        Object recordWriter  = null;
+        if(! (conf.getOutputFormat() instanceof NullOutputFormat)) {
+            boolean isMap = conf.getNumReduceTasks() == 0;
+            TaskAttemptID taskAttempId = new TaskAttemptID("0",index,isMap,index,index);
+            conf.set("mapred.task.id",taskAttempId.toString());
+            outputPath = new Path(conf.get("mapred.output.dir"));
+            outputTempPath = new Path(outputPath,"_temporary");
+            String suffix =  new String("part-r-00000");
+            suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
+            suffix = suffix + index;
+            tempOutputFile = new Path(outputTempPath,new Path("_" + taskAttempId.toString()));
+            if (conf.getNumReduceTasks() == 0 ) {
+                suffix=suffix.replace("-r-", "-m-");
+            }
+            tempOutputFile = new Path(tempOutputFile,suffix);
+            finalOutputFile = new Path(outputPath,suffix);
+            if(conf.getUseNewMapper()){
+                org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat)ReflectionUtils.newInstance((new JobContext(conf,null)).getOutputFormatClass(),conf);
+                    recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttempId));
+            }else {
+               recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf,suffix, new Progressable() {
+               @Override
+               public void progress() {}
+               });
+            }
         }
-        String suffix = new String("part-r-00000");
-        suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
-        suffix = suffix + index;
-        tempOutputFile = new Path(outputTempPath, "_" + taskAttempId + "/" + suffix);
-        if (conf.getNumReduceTasks() == 0) {
-            suffix.replace("-r-", "-m-");
-        }
-        finalOutputFile = new Path(outputPath, suffix);
-        if (conf.getUseNewMapper()) {
-            org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
-                    .newInstance((new JobContext(conf, null)).getOutputFormatClass(), conf);
-            recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, new TaskAttemptID()));
-        } else {
-            recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf, suffix, new Progressable() {
-                @Override
-                public void progress() {
-                }
-            });
-        }
-
+        
         return new HadoopFileWriter(recordWriter, tempOutputFile, finalOutputFile, conf);
     }
+    
 
     Path outputPath;
     Path outputTempPath;
-
+   
     protected Reporter createReporter() {
-        return new Reporter() {
-            @Override
-            public Counter getCounter(Enum<?> name) {
-                return null;
-            }
+    return new Reporter() {
+        @Override
+        public Counter getCounter(Enum<?> name) {
+            return null;
+        }
 
-            @Override
-            public Counter getCounter(String group, String name) {
-                return null;
-            }
+        @Override
+        public Counter getCounter(String group, String name) {
+            return null;
+        }
 
-            @Override
-            public InputSplit getInputSplit() throws UnsupportedOperationException {
-                return null;
-            }
+        @Override
+        public InputSplit getInputSplit() throws UnsupportedOperationException {
+            return null;
+        }
 
-            @Override
-            public void incrCounter(Enum<?> key, long amount) {
+        @Override
+        public void incrCounter(Enum<?> key, long amount) {
 
-            }
+        }
 
-            @Override
-            public void incrCounter(String group, String counter, long amount) {
+        @Override
+        public void incrCounter(String group, String counter, long amount) {
 
-            }
+        }
 
-            @Override
-            public void progress() {
+        @Override
+        public void progress() {
 
-            }
+        }
 
-            @Override
-            public void setStatus(String status) {
+        @Override
+        public void setStatus(String status) {
 
-            }
-        };
-    }
+        }
+    };
+}
 
     private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
         JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
@@ -218,8 +220,8 @@
     private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) throws ClassNotFoundException {
         int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
         Object outputFormat = null;
-        if (conf.getUseNewMapper()) {
-            outputFormat = ReflectionUtils.newInstance(new JobContext(conf, null).getOutputFormatClass(), conf);
+        if(conf.getUseNewMapper()) {
+            outputFormat = ReflectionUtils.newInstance(new JobContext(conf,null).getOutputFormatClass(), conf);
         } else {
             outputFormat = conf.getOutputFormat();
         }
@@ -234,13 +236,11 @@
 
             FileSplit[] outputFileSplits = new FileSplit[numOutputters];
             String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
-            System.out.println("absolute path:" + absolutePath);
             for (int index = 0; index < numOutputters; index++) {
                 String suffix = new String("part-00000");
                 suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
                 suffix = suffix + index;
                 String outputPath = absolutePath + "/" + suffix;
-                System.out.println("output path :" + outputPath);
                 outputFileSplits[index] = new FileSplit("localhost", outputPath);
             }
             return outputFileSplits;
@@ -252,5 +252,14 @@
         super(jobSpec, getOutputSplits(jobConf, numMapTasks));
         this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
         checkIfCanWriteToHDFS(super.splits);
+        FileSystem fs = FileSystem.get(jobConf);
+        if (jobConf.get("mapred.output.dir") != null) {
+            Path output = new Path(jobConf.get("mapred.output.dir"));
+            Path outputTemp = new Path(output,"_temporary");
+            if(output != null && !fs.exists(outputTemp)) {
+                    fs.mkdirs(outputTemp);
+                }
+            }
+        }
     }
-}
+
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index e4daf0b..a363221 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -1,93 +1,91 @@
 package edu.uci.ics.hyracks.hadoop.compat.client;
 
-import java.io.File;
-import java.util.List;
+import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
-import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
-import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
-import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
 import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
 
 public class HyracksClient {
 
-    private HadoopAdapter hadoopAdapter;
-    private static HyracksRMIConnection connection;
-    private static final String applicationName = "CompatibilityLayer";
+	private static HyracksRMIConnection connection;
+	private static final String jobProfilingKey = "jobProfilingKey";
+	Set<String> systemLibs;
 
-    public HyracksClient(String clusterConf) throws Exception {
-        Properties properties = Utilities.getProperties(clusterConf, '=');
-        String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
-        String fileSystem = (String) properties.get(ConfigurationConstants.namenodeURL);
-        initialize(clusterController, fileSystem);
-    }
+	public HyracksClient(Properties clusterProperties) throws Exception {
+		initialize(clusterProperties);
+	}
 
-    public HyracksClient(String clusterControllerAddr, String fileSystem) throws Exception {
-        initialize(clusterControllerAddr, fileSystem);
-    }
+	private void initialize(Properties properties) throws Exception {
+		String clusterController = (String) properties
+				.get(ConfigurationConstants.clusterControllerHost);
+		connection = new HyracksRMIConnection(clusterController, 1099);
+		systemLibs = new HashSet<String>();
+		for (String systemLib : ConfigurationConstants.systemLibs) {
+			String systemLibPath = properties.getProperty(systemLib);
+			if (systemLibPath != null) {
+				systemLibs.add(systemLibPath);
+			}
+		}
+	}
 
-    private void initialize(String clusterControllerAddr, String namenodeUrl) throws Exception {
-        connection = new HyracksRMIConnection(clusterControllerAddr, 1099);
-        connection.destroyApplication(applicationName);
-        hadoopAdapter = new HadoopAdapter(namenodeUrl);
-    }
+	public HyracksClient(String clusterConf, char delimiter) throws Exception {
+		Properties properties = Utilities.getProperties(clusterConf, delimiter);
+		initialize(properties);
+	}
 
-    public HyracksRunningJob submitJobs(List<JobConf> confs, Set<String> requiredLibs) throws Exception {
-        JobSpecification spec = hadoopAdapter.getJobSpecification(confs);
-        String appName  = getApplicationNameHadoopJob(confs.get(0));
-        return submitJob(appName,spec, requiredLibs);
-    }
+	private Set<String> getRequiredLibs(Set<String> userLibs) {
+		Set<String> requiredLibs = new HashSet<String>();
+		for (String systemLib : systemLibs) {
+			requiredLibs.add(systemLib);
+		}
+		for (String userLib : userLibs) {
+			requiredLibs.add(userLib);
+		}
+		return requiredLibs;
+	}
 
-    private String getApplicationNameHadoopJob(JobConf jobConf) {
-        String jar = jobConf.getJar();
-        if( jar != null){
-            return jar.substring(jar.lastIndexOf("/") >=0 ? jar.lastIndexOf("/") +1 : 0);
-        }else {
-            return "" + System.currentTimeMillis();
-        }
-    }
-    
-    public HyracksRunningJob submitJob(JobConf conf, Set<String> requiredLibs) throws Exception {
-        JobSpecification spec = hadoopAdapter.getJobSpecification(conf);
-        String appName  = getApplicationNameHadoopJob(conf);
-        return submitJob(appName, spec, requiredLibs);
-    }
+	public JobStatus getJobStatus(UUID jobId) throws Exception {
+		return connection.getJobStatus(jobId);
+	}
 
-    public JobStatus getJobStatus(UUID jobId) throws Exception {
-        return connection.getJobStatus(jobId);
-    }
+	private void createApplication(String applicationName, Set<String> userLibs)
+			throws Exception {
+		connection.createApplication(applicationName, Utilities
+				.getHyracksArchive(applicationName, getRequiredLibs(userLibs)));
+	}
 
-    public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> requiredLibs) throws Exception {
-        UUID jobId = null;
-        try {
-            jobId = connection.createJob(applicationName, spec);
-        } catch (Exception e){
-            System.out.println(" application not found, creating application" + applicationName);
-            connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
-            jobId = connection.createJob(applicationName, spec);
-        }
-        connection.start(jobId);
-        HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
-        return runningJob;
-    }
+	public HyracksRunningJob submitJob(String applicationName,
+			JobSpecification spec) throws Exception {
+		String jobProfilingVal = System.getenv(jobProfilingKey);
+		boolean doProfiling = ("true".equalsIgnoreCase(jobProfilingVal));
+		UUID jobId;
+		if (doProfiling) {
+			System.out.println("PROFILING");
+			jobId = connection.createJob(applicationName, spec, EnumSet
+					.of(JobFlag.PROFILE_RUNTIME));
+		} else {
+			jobId = connection.createJob(applicationName, spec);
+		}
+		connection.start(jobId);
+		HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
+		return runningJob;
+	}
 
-    public HadoopAdapter getHadoopAdapter() {
-        return hadoopAdapter;
-    }
+	public HyracksRunningJob submitJob(String applicationName,
+			JobSpecification spec, Set<String> userLibs) throws Exception {
+		createApplication(applicationName, userLibs);
+		return submitJob(applicationName, spec);
+	}
 
-    public void setHadoopAdapter(HadoopAdapter hadoopAdapter) {
-        this.hadoopAdapter = hadoopAdapter;
-    }
-
-    public void waitForCompleton(UUID jobId) throws Exception {
-        connection.waitForCompletion(jobId);
-    }
-
+	public void waitForCompleton(UUID jobId) throws Exception {
+		connection.waitForCompletion(jobId);
+	}
 }
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
index 0b96041..37f4d34 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
@@ -1,5 +1,6 @@
 package edu.uci.ics.hyracks.hadoop.compat.driver;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -25,175 +26,183 @@
 
 public class CompatibilityLayer {
 
-    HyracksClient hyracksClient;
-    DCacheHandler dCacheHander = null;
-    Properties clusterConf;
-    Set<String> systemLibs;
+	HyracksClient hyracksClient;
+	DCacheHandler dCacheHander = null;
+	Properties clusterConf;
+	HadoopAdapter hadoopAdapter;
 
-    private static char configurationFileDelimiter = '=';
-    private static final String dacheKeyPrefix = "dcache.key";
+	private static char configurationFileDelimiter = '=';
+	private static final String dacheKeyPrefix = "dcache.key";
 
-    public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
-        initialize(clConfig);
-    }
+	public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
+		initialize(clConfig);
+	}
 
-    public HyracksRunningJob submitJobs(String[] jobFiles, Set<String> userLibs) throws Exception {
-        Set<String> requiredLibs = getRequiredLibs(userLibs);
-        List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
-        Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFiles[0]);
-        String tempDir = "/tmp";
-        if (dcacheTasks.size() > 0) {
-            HadoopAdapter hadoopAdapter = hyracksClient.getHadoopAdapter();
-            for (String key : dcacheTasks.keySet()) {
-                String destPath = tempDir + "/" + key + System.currentTimeMillis();
-                hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(dcacheTasks.get(key)), new Path(destPath));
-                System.out.println(" source :" + dcacheTasks.get(key));
-                System.out.println(" dest :" + destPath);
-                System.out.println(" key :" + key);
-                System.out.println(" value :" + destPath);
-                dCacheHander.put(key, destPath);
-            }
-        }
-        HyracksRunningJob hyraxRunningJob = hyracksClient.submitJobs(jobConfs, requiredLibs);
-        return hyraxRunningJob;
-    }
+	private void initialize(CompatibilityConfig clConfig) throws Exception {
+		clusterConf = Utilities.getProperties(clConfig.clusterConf,
+				configurationFileDelimiter);
+		hadoopAdapter = new HadoopAdapter(clusterConf
+				.getProperty(ConfigurationConstants.namenodeURL));
+		hyracksClient = new HyracksClient(clusterConf);
+		dCacheHander = new DCacheHandler(clusterConf
+				.getProperty(ConfigurationConstants.dcacheServerConfiguration));
+	}
 
-    private Set<String> getRequiredLibs(Set<String> userLibs) {
-        Set<String> requiredLibs = new HashSet<String>();
-        for (String systemLib : systemLibs) {
-            requiredLibs.add(systemLib);
-        }
-        for (String userLib : userLibs) {
-            requiredLibs.add(userLib);
-        }
-        return requiredLibs;
-    }
+	public HyracksRunningJob submitJob(JobConf conf,Set<String> userLibs) throws Exception {
+		List<JobConf> jobConfs = new ArrayList<JobConf>();
+		jobConfs.add(conf);
+		String applicationName = conf.getJobName() + System.currentTimeMillis();
+		JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+		HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
+				applicationName, spec, userLibs);
+		return hyracksRunningJob; 
+	}
+	
+	public HyracksRunningJob submitJobs(String applicationName,
+			String[] jobFiles, Set<String> userLibs) throws Exception {
+		List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+		populateDCache(jobFiles[0]);
+		JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+		HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
+				applicationName, spec, userLibs);
+		return hyracksRunningJob;
+	}
 
-    private void initialize(CompatibilityConfig clConfig) throws Exception {
-        clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
-        systemLibs = new HashSet<String>();
-        for (String systemLib : ConfigurationConstants.systemLibs) {
-            String systemLibPath = clusterConf.getProperty(systemLib);
-            if (systemLibPath != null) {
-                systemLibs.add(systemLibPath);
-            }
-        }
-        String clusterControllerHost = clusterConf.getProperty(ConfigurationConstants.clusterControllerHost);
-        String dacheServerConfiguration = clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration);
-        String fileSystem = clusterConf.getProperty(ConfigurationConstants.namenodeURL);
-        hyracksClient = new HyracksClient(clusterControllerHost, fileSystem);
-        try {
-            dCacheHander = new DCacheHandler(dacheServerConfiguration);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
+	public HyracksRunningJob submitJobs(String applicationName,
+			String[] jobFiles) throws Exception {
+		List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+		populateDCache(jobFiles[0]);
+		JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+		HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
+				applicationName, spec);
+		return hyracksRunningJob;
+	}
 
-    private Map<String, String> initializeCustomProperties(Properties properties, String prefix) {
-        Map<String, String> foundProperties = new HashMap<String, String>();
-        Set<Entry<Object, Object>> entrySet = properties.entrySet();
-        for (Entry entry : entrySet) {
-            String key = (String) entry.getKey();
-            String value = (String) entry.getValue();
-            if ((key.startsWith(prefix))) {
-                String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
-                foundProperties.put(actualKey, value);
-            }
-        }
-        return foundProperties;
-    }
+	private void populateDCache(String jobFile) throws IOException {
+		Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
+		String tempDir = "/tmp";
+		if (dcacheTasks.size() > 0) {
+			for (String key : dcacheTasks.keySet()) {
+				String destPath = tempDir + "/" + key
+						+ System.currentTimeMillis();
+				hadoopAdapter.getHDFSClient().copyToLocalFile(
+						new Path(dcacheTasks.get(key)), new Path(destPath));
+				System.out.println(" source :" + dcacheTasks.get(key));
+				System.out.println(" dest :" + destPath);
+				System.out.println(" key :" + key);
+				System.out.println(" value :" + destPath);
+				dCacheHander.put(key, destPath);
+			}
+		}
+	}
 
-    public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
-        Properties jobProperties = Utilities.getProperties(jobFile, ',');
-        Map<String, String> dcacheTasks = new HashMap<String, String>();
-        Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, dacheKeyPrefix);
-        for (String key : dcacheKeys.keySet()) {
-            String sourcePath = dcacheKeys.get(key);
-            if (sourcePath != null) {
-                dcacheTasks.put(key, sourcePath);
-            }
-        }
-        return dcacheTasks;
-    }
+	private String getApplicationNameForHadoopJob(JobConf jobConf) {
+		String jar = jobConf.getJar();
+		if (jar != null) {
+			return jar.substring(jar.lastIndexOf("/") >= 0 ? jar
+					.lastIndexOf("/") + 1 : 0);
+		} else {
+			return "" + System.currentTimeMillis();
+		}
+	}
 
-    public void waitForCompletion(UUID jobId) throws Exception {
-        hyracksClient.waitForCompleton(jobId);
-    }
+	private Map<String, String> initializeCustomProperties(
+			Properties properties, String prefix) {
+		Map<String, String> foundProperties = new HashMap<String, String>();
+		Set<Entry<Object, Object>> entrySet = properties.entrySet();
+		for (Entry entry : entrySet) {
+			String key = (String) entry.getKey();
+			String value = (String) entry.getValue();
+			if ((key.startsWith(prefix))) {
+				String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
+				foundProperties.put(actualKey, value);
+			}
+		}
+		return foundProperties;
+	}
 
-    public HyracksRunningJob submitHadoopJobToHyrax(JobConf jobConf, Set<String> userLibs) {
-        HyracksRunningJob hyraxRunningJob = null;
-        List<JobConf> jobConfs = new ArrayList<JobConf>();
-        jobConfs.add(jobConf);
-        try {
-            hyraxRunningJob = hyracksClient.submitJobs(jobConfs, getRequiredLibs(userLibs));
-            System.out.println(" Result in " + jobConf.get("mapred.output.dir"));
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        return hyraxRunningJob;
-    }
+	public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
+		Properties jobProperties = Utilities.getProperties(jobFile, ',');
+		Map<String, String> dcacheTasks = new HashMap<String, String>();
+		Map<String, String> dcacheKeys = initializeCustomProperties(
+				jobProperties, dacheKeyPrefix);
+		for (String key : dcacheKeys.keySet()) {
+			String sourcePath = dcacheKeys.get(key);
+			if (sourcePath != null) {
+				dcacheTasks.put(key, sourcePath);
+			}
+		}
+		return dcacheTasks;
+	}
 
-    public HyracksRunningJob submitJob(String appName, JobSpecification jobSpec, Set<String> userLibs) {
-        HyracksRunningJob hyraxRunningJob = null;
-        try {
-            hyraxRunningJob = hyracksClient.submitJob(appName, jobSpec, getRequiredLibs(userLibs));
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        return hyraxRunningJob;
-    }
+	public void waitForCompletion(UUID jobId) throws Exception {
+		hyracksClient.waitForCompleton(jobId);
+	}
 
-    private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception {
-        List<JobConf> jobConfs = new ArrayList<JobConf>();
-        for (String jobFile : jobFiles) {
-            jobConfs.add(constructHadoopJobConf(jobFile));
-        }
-        return jobConfs;
-    }
+	private List<JobConf> constructHadoopJobConfs(String[] jobFiles)
+			throws Exception {
+		List<JobConf> jobConfs = new ArrayList<JobConf>();
+		for (String jobFile : jobFiles) {
+			jobConfs.add(constructHadoopJobConf(jobFile));
+		}
+		return jobConfs;
+	}
 
-    private JobConf constructHadoopJobConf(String jobFile) {
-        Properties jobProperties = Utilities.getProperties(jobFile, '=');
-        JobConf conf = hyracksClient.getHadoopAdapter().getConf();
-        for (Entry entry : jobProperties.entrySet()) {
-            conf.set((String) entry.getKey(), (String) entry.getValue());
-            System.out.println((String) entry.getKey() + " : " + (String) entry.getValue());
-        }
-        return conf;
-    }
+	private JobConf constructHadoopJobConf(String jobFile) {
+		Properties jobProperties = Utilities.getProperties(jobFile, '=');
+		JobConf conf = new JobConf(hadoopAdapter.getConf());
+		for (Entry entry : jobProperties.entrySet()) {
+			conf.set((String) entry.getKey(), (String) entry.getValue());
+			System.out.println((String) entry.getKey() + " : "
+					+ (String) entry.getValue());
+		}
+		return conf;
+	}
 
-    private String[] getJobs(CompatibilityConfig clConfig) {
-        return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(",");
-    }
+	private String[] getJobs(CompatibilityConfig clConfig) {
+		return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles
+				.split(",");
+	}
 
-    public static void main(String args[]) throws Exception {
-        long startTime = System.nanoTime();
-        CompatibilityConfig clConfig = new CompatibilityConfig();
-        CmdLineParser cp = new CmdLineParser(clConfig);
-        try {
-            cp.parseArgument(args);
-        } catch (Exception e) {
-            System.err.println(e.getMessage());
-            cp.printUsage(System.err);
-            return;
-        }
-        CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
-        String[] jobFiles = compatLayer.getJobs(clConfig);
-        String[] tempUserLibs = clConfig.userLibs == null ? new String[0] : clConfig.userLibs.split(",");
-        Set<String> userLibs = new HashSet<String>();
-        for(String userLib : tempUserLibs) {
-            userLibs.add(userLib);
-        }
-        HyracksRunningJob hyraxRunningJob = null;
-        try {
-            hyraxRunningJob = compatLayer.submitJobs(jobFiles, userLibs);
-            compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        hyraxRunningJob.waitForCompletion();
-        long end_time = System.nanoTime();
-        System.out.println("TOTAL TIME (from Launch to Completion):" + ((end_time - startTime) / (float) 1000000000.0)
-                + " seconds.");
-    }
-
+	public static void main(String args[]) throws Exception {
+		long startTime = System.nanoTime();
+		CompatibilityConfig clConfig = new CompatibilityConfig();
+		CmdLineParser cp = new CmdLineParser(clConfig);
+		try {
+			cp.parseArgument(args);
+		} catch (Exception e) {
+			System.err.println(e.getMessage());
+			cp.printUsage(System.err);
+			return;
+		}
+		CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
+		String applicationName = clConfig.applicationName;
+		String[] jobFiles = compatLayer.getJobs(clConfig);
+		String[] userLibraries = null;
+		if (clConfig.userLibs != null) {
+			userLibraries = clConfig.userLibs.split(",");
+		}
+		try {
+			HyracksRunningJob hyraxRunningJob = null;
+			if (userLibraries != null) {
+				Set<String> userLibs = new HashSet<String>();
+				for (String userLib : userLibraries) {
+					userLibs.add(userLib);
+				}
+				hyraxRunningJob = compatLayer.submitJobs(applicationName,
+						jobFiles, userLibs);
+			} else {
+				hyraxRunningJob = compatLayer.submitJobs(applicationName,
+						jobFiles);
+			}
+			compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
+			long end_time = System.nanoTime();
+			System.out.println("TOTAL TIME (from Launch to Completion):"
+					+ ((end_time - startTime) / (float) 1000000000.0)
+					+ " seconds.");
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw e;
+		}
+	}
 }
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
index 1dd266f..6d94bc7 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
@@ -4,17 +4,20 @@
 
 public class CompatibilityConfig {
 
-    @Option(name = "-cluster", required = true, usage = "Defines the path to the configuration file that provides the following info: +"
-            + " (1) Address of HyracksClusterController service" + " (2) Address of Hadoop namenode service")
-    public String clusterConf;
+	@Option(name = "-cluster", required = true, usage = "Defines the path to the configuration file that provides the following info: +"
+			+ " (1) Address of HyracksClusterController service"
+			+ " (2) Address of Hadoop namenode service")
+	public String clusterConf;
 
-    @Option(name = "-jobFiles", usage = "Comma separated list of jobFiles. "
-            + "Each job file defines the hadoop job + " + "The order in the list defines the sequence in which"
-            + "the jobs are to be executed")
-    public String jobFiles;
+	@Option(name = "-jobFiles", usage = "Comma separated list of jobFiles. "
+			+ "Each job file defines the hadoop job + "
+			+ "The order in the list defines the sequence in which"
+			+ "the jobs are to be executed")
+	public String jobFiles;
 
-    @Option(name = "-userLibs", usage = " A comma separated list of jar files that are required to be addedd to classpath when running "
-            + " mappers/reducers etc ")
-    public String userLibs;
+	@Option(name = "-applicationName", usage = " The application as part of which the job executes")
+	public String applicationName;
 
+	@Option(name = "-userLibs", usage = " A comma separated list of jar files that are required to be addedd to classpath when running ")
+	public String userLibs;
 }
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
index d0df7f1..f2f7d03 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -47,310 +47,360 @@
 
 public class HadoopAdapter {
 
-    public static final String FS_DEFAULT_NAME = "fs.default.name";
-    private JobConf jobConf;
-    private Map<OperatorDescriptorId,Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId,Integer>();
-    public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT"; 
-    public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
-    public static final int DEFAULT_MAX_MAPPERS = 40;
-    public static final int DEFAULT_MAX_REDUCERS= 40;
-    public static final String MAX_MAPPERS_KEY = "maxMappers";
-    public static final String MAX_REDUCERS_KEY = "maxReducers";
-    public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
-    
-    private  int maxMappers = DEFAULT_MAX_MAPPERS;
-    private  int maxReducers = DEFAULT_MAX_REDUCERS;
-    private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
-    
-    class NewHadoopConstants {
-    	public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
-    	public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
-    	public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
-    	public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
-    	public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
-    	public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
-    }
-    
-    public HadoopAdapter(String namenodeUrl) {
-        jobConf = new JobConf(true);
-        jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
-        if(System.getenv(MAX_MAPPERS_KEY) != null) {
-            maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY));
-        }
-        if(System.getenv(MAX_REDUCERS_KEY) != null) {
-            maxReducers= Integer.parseInt(System.getenv(MAX_REDUCERS_KEY));
-        }
-        if(System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) {
-            exSortFrame= Integer.parseInt(System.getenv(EX_SORT_FRAME_LIMIT_KEY));
-        }
-    }
+	public static final String FS_DEFAULT_NAME = "fs.default.name";
+	private JobConf jobConf;
+	private Map<OperatorDescriptorId, Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId, Integer>();
+	public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT";
+	public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
+	public static final int DEFAULT_MAX_MAPPERS = 40;
+	public static final int DEFAULT_MAX_REDUCERS = 40;
+	public static final String MAX_MAPPERS_KEY = "maxMappers";
+	public static final String MAX_REDUCERS_KEY = "maxReducers";
+	public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
 
-    private String getEnvironmentVariable(String key, String def) {
-        String ret =  System.getenv(key);
-        return ret != null ? ret : def;
-    }
-    
-    public JobConf getConf() {
-        return jobConf;
-    }
+	private int maxMappers = DEFAULT_MAX_MAPPERS;
+	private int maxReducers = DEFAULT_MAX_REDUCERS;
+	private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
 
-    public static VersionedProtocol getProtocol(Class protocolClass, InetSocketAddress inetAddress, JobConf jobConf)
-            throws IOException {
-        VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass, ClientProtocol.versionID, inetAddress,
-                jobConf);
-        return versionedProtocol;
-    }
+	class NewHadoopConstants {
+		public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
+		public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
+		public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
+		public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
+		public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
+		public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
+	}
 
-    private static RecordDescriptor getHadoopRecordDescriptor(String className1, String className2) {
-        RecordDescriptor recordDescriptor = null;
-        try {
-            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
-                    .forName(className1), (Class<? extends Writable>) Class.forName(className2));
-        } catch (ClassNotFoundException cnfe) {
-            cnfe.printStackTrace();
-        }
-        return recordDescriptor;
-    }
+	public HadoopAdapter(String namenodeUrl) {
+		jobConf = new JobConf(true);
+		jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
+		if (System.getenv(MAX_MAPPERS_KEY) != null) {
+			maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY));
+		}
+		if (System.getenv(MAX_REDUCERS_KEY) != null) {
+			maxReducers = Integer.parseInt(System.getenv(MAX_REDUCERS_KEY));
+		}
+		if (System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) {
+			exSortFrame = Integer.parseInt(System
+					.getenv(EX_SORT_FRAME_LIMIT_KEY));
+		}
+	}
 
-    private Object[] getInputSplits(JobConf conf) throws IOException, ClassNotFoundException, InterruptedException {
-        if (conf.getUseNewMapper()) {
-        	return getNewInputSplits(conf);
-        } else {
-        	return getOldInputSplits(conf);
-        }
-    }
-    
-    private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(JobConf conf) throws ClassNotFoundException, IOException, InterruptedException {
-    	org.apache.hadoop.mapreduce.InputSplit[] splits = null;
-    	JobContext context = new JobContext(conf,null);
-    	org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils.newInstance(context.getInputFormatClass(),conf);
-    	List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat.getSplits(context);
-    	return inputSplits.toArray(new org.apache.hadoop.mapreduce.InputSplit[]{});
-    }
-    
-    private InputSplit[] getOldInputSplits(JobConf conf) throws IOException  {
-      	InputFormat inputFormat = conf.getInputFormat();
-    	return inputFormat.getSplits(conf, conf.getNumMapTasks());
-    }
-   
-    private void configurePartitionCountConstraint(JobSpecification spec, IOperatorDescriptor operator,int instanceCount){
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, instanceCount);
-        operatorInstanceCount.put(operator.getOperatorId(),instanceCount);
-    }
+	private String getEnvironmentVariable(String key, String def) {
+		String ret = System.getenv(key);
+		return ret != null ? ret : def;
+	}
 
-    public HadoopMapperOperatorDescriptor getMapper(JobConf conf,JobSpecification spec, IOperatorDescriptor previousOp)
-            throws Exception {
-        boolean selfRead = previousOp == null;
-        IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
-        HadoopMapperOperatorDescriptor mapOp = null;
-        if(selfRead) {
-            Object [] splits = getInputSplits(conf,maxMappers);
-            mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,classFactory);
-	    configurePartitionCountConstraint(spec,mapOp,splits.length);
-            System.out.println("No of  mappers :" + splits.length);
-        } else {
-	    configurePartitionCountConstraint(spec,mapOp,getInstanceCount(previousOp));
-            mapOp = new HadoopMapperOperatorDescriptor(spec,conf,classFactory);
-            spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0, mapOp, 0);
-        }
-        return mapOp;
-    }
+	public JobConf getConf() {
+		return jobConf;
+	}
 
-    public HadoopReducerOperatorDescriptor getReducer(JobConf conf, JobSpecification spec) {
-        HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(spec, conf, null,
-                new ClasspathBasedHadoopClassFactory());
-        return reduceOp;
-    }
+	public static VersionedProtocol getProtocol(Class protocolClass,
+			InetSocketAddress inetAddress, JobConf jobConf) throws IOException {
+		VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass,
+				ClientProtocol.versionID, inetAddress, jobConf);
+		return versionedProtocol;
+	}
 
-    public FileSystem getHDFSClient() {
-        FileSystem fileSystem = null;
-        try {
-            fileSystem = FileSystem.get(jobConf);
-        } catch (IOException ioe) {
-            ioe.printStackTrace();
-        }
-        return fileSystem;
-    }
+	private static RecordDescriptor getHadoopRecordDescriptor(
+			String className1, String className2) {
+		RecordDescriptor recordDescriptor = null;
+		try {
+			recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+					(Class<? extends Writable>) Class.forName(className1),
+					(Class<? extends Writable>) Class.forName(className2));
+		} catch (ClassNotFoundException cnfe) {
+			cnfe.printStackTrace();
+		}
+		return recordDescriptor;
+	}
 
-    public JobSpecification getJobSpecification(List<JobConf> jobConfs) throws Exception {
-        JobSpecification spec = null;
-        if (jobConfs.size() == 1) {
-            spec = getJobSpecification(jobConfs.get(0));
-        } else {
-            spec = getPipelinedSpec(jobConfs);
-        }
-        return spec;
-    }
+	private Object[] getInputSplits(JobConf conf) throws IOException,
+			ClassNotFoundException, InterruptedException {
+		if (conf.getUseNewMapper()) {
+			return getNewInputSplits(conf);
+		} else {
+			return getOldInputSplits(conf);
+		}
+	}
 
-    private IOperatorDescriptor configureOutput( IOperatorDescriptor previousOperator, JobConf conf,
-            JobSpecification spec) throws Exception {
-	int instanceCountPreviousOperator = operatorInstanceCount.get(previousOperator.getOperatorId());
-        int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : instanceCountPreviousOperator;
-        HadoopWriteOperatorDescriptor writer = null;
-        writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
-	configurePartitionCountConstraint(spec,writer,numOutputters);
-        spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, writer, 0);
-        return writer;
-    }
+	private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(
+			JobConf conf) throws ClassNotFoundException, IOException,
+			InterruptedException {
+		org.apache.hadoop.mapreduce.InputSplit[] splits = null;
+		JobContext context = new JobContext(conf, null);
+		org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils
+				.newInstance(context.getInputFormatClass(), conf);
+		List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat
+				.getSplits(context);
+		return inputSplits
+				.toArray(new org.apache.hadoop.mapreduce.InputSplit[] {});
+	}
 
+	private InputSplit[] getOldInputSplits(JobConf conf) throws IOException {
+		InputFormat inputFormat = conf.getInputFormat();
+		return inputFormat.getSplits(conf, conf.getNumMapTasks());
+	}
 
-    private int getInstanceCount(IOperatorDescriptor operator) {
-        return operatorInstanceCount.get(operator.getOperatorId());
-    } 
+	private void configurePartitionCountConstraint(JobSpecification spec,
+			IOperatorDescriptor operator, int instanceCount) {
+		PartitionConstraintHelper.addPartitionCountConstraint(spec, operator,
+				instanceCount);
+		operatorInstanceCount.put(operator.getOperatorId(), instanceCount);
+	}
 
-    private IOperatorDescriptor addCombiner(IOperatorDescriptor previousOperator, JobConf jobConf,
-            JobSpecification spec) throws Exception {
-        boolean useCombiner = (jobConf.getCombinerClass() != null);
-        IOperatorDescriptor mapSideOutputOp = previousOperator;
-        if (useCombiner) {
-            System.out.println("Using Combiner:" + jobConf.getCombinerClass().getName());
-            IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(jobConf, spec);
-	    configurePartitionCountConstraint(spec,mapSideCombineSortOp,getInstanceCount(previousOperator));
-    
-            HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(jobConf, spec);
-	    configurePartitionCountConstraint(spec,mapSideCombineReduceOp,getInstanceCount(previousOperator));
-            spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapSideCombineSortOp, 0);
-            spec.connect(new OneToOneConnectorDescriptor(spec), mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
-            mapSideOutputOp = mapSideCombineSortOp;
-        }
-        return mapSideOutputOp;
-    }
-    
-    private int getNumReduceTasks(JobConf jobConf) {
-        int numReduceTasks = Math.min(maxReducers,jobConf.getNumReduceTasks());
-        return numReduceTasks;
-    }
-    
-    private IOperatorDescriptor addReducer(IOperatorDescriptor previousOperator, JobConf jobConf,
-            JobSpecification spec) throws Exception {
-        IOperatorDescriptor mrOutputOperator = previousOperator;
-        if (jobConf.getNumReduceTasks() != 0) {
-            IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
-            HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec);
-            int numReduceTasks = getNumReduceTasks(jobConf);
-            System.out.println("No of Reducers :" + numReduceTasks);
-	    configurePartitionCountConstraint(spec,sorter,numReduceTasks);
-	    configurePartitionCountConstraint(spec,reducer,numReduceTasks);
-    
-            IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(jobConf, spec);
-            spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter, 0);
-            spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, reducer, 0);
-            mrOutputOperator = reducer;
-        }   
-        return mrOutputOperator;
-    }
-    
-    private long getInputSize(Object[] splits,JobConf conf) throws IOException, InterruptedException {
-        long totalInputSize =0;
-    	if(conf.getUseNewMapper()) {
-        	for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[])splits) {
-        	    totalInputSize += split.getLength();
-            }                                       
-        } else {
-	    	for (InputSplit split : (InputSplit[])splits) {
-	            totalInputSize += split.getLength();
-	        }
-        }
-    	return totalInputSize;
-    }
-    
-    private Object[] getInputSplits(JobConf conf, int desiredMaxMappers) throws Exception {
-        Object[] splits = getInputSplits(conf);
-        System.out.println(" initial split count :" + splits.length);
-        System.out.println(" desired mappers :" + desiredMaxMappers);
-        if (splits.length > desiredMaxMappers) {
-            long totalInputSize = getInputSize(splits,conf);
-            long goalSize = (totalInputSize/desiredMaxMappers);
-            System.out.println(" total input length :" + totalInputSize);
-            System.out.println(" goal size :" + goalSize);
-            conf.setLong("mapred.min.split.size", goalSize);
-            conf.setNumMapTasks(desiredMaxMappers);
-            splits = getInputSplits(conf);
-            System.out.println(" revised split count :" + splits.length);
-        }
-        return splits; 
-    }
-    
-    public JobSpecification getPipelinedSpec(List<JobConf> jobConfs) throws Exception {
-        JobSpecification spec = new JobSpecification();
-        Iterator<JobConf> iterator = jobConfs.iterator();
-        JobConf firstMR = iterator.next();
-        IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec,firstMR);
-        while (iterator.hasNext())
-            for (JobConf currentJobConf : jobConfs) {
-                mrOutputOp = configureMapReduce(mrOutputOp, spec , currentJobConf);
-            }
-        configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
-        return spec;
-    }
+	public HadoopMapperOperatorDescriptor getMapper(JobConf conf,
+			JobSpecification spec, IOperatorDescriptor previousOp)
+			throws Exception {
+		boolean selfRead = previousOp == null;
+		IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
+		HadoopMapperOperatorDescriptor mapOp = null;
+		if (selfRead) {
+			Object[] splits = getInputSplits(conf, maxMappers);
+			mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,
+					classFactory);
+			configurePartitionCountConstraint(spec, mapOp, splits.length);
+		} else {
+			configurePartitionCountConstraint(spec, mapOp,
+					getInstanceCount(previousOp));
+			mapOp = new HadoopMapperOperatorDescriptor(spec, conf, classFactory);
+			spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0,
+					mapOp, 0);
+		}
+		return mapOp;
+	}
 
-    public JobSpecification getJobSpecification(JobConf conf) throws Exception {
-        JobSpecification spec = new JobSpecification();
-        IOperatorDescriptor mrOutput = configureMapReduce(null,spec, conf);
-        IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec);
-        spec.addRoot(printer);
-        System.out.println(spec);
-        return spec;
-    }
-    
-    private IOperatorDescriptor configureMapReduce(IOperatorDescriptor previousOuputOp, JobSpecification spec, JobConf conf) throws Exception {
-        IOperatorDescriptor mapper = getMapper(conf,spec,previousOuputOp);
-        IOperatorDescriptor mapSideOutputOp = addCombiner(mapper,conf,spec);
-        IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec);
-        return reducer; 
-    }
+	public HadoopReducerOperatorDescriptor getReducer(JobConf conf,
+			JobSpecification spec, boolean useAsCombiner) {
+		HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(
+				spec, conf, null, new ClasspathBasedHadoopClassFactory(),
+				useAsCombiner);
+		return reduceOp;
+	}
 
-    public static InMemorySortOperatorDescriptor getInMemorySorter(JobConf conf, JobSpecification spec) {
-        InMemorySortOperatorDescriptor inMemorySortOp = null;
-        RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
-                .getMapOutputValueClass().getName());
-        Class<? extends RawComparator> rawComparatorClass = null;
-        WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
-                WritableComparable.class));
-        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
-                writableComparator.getClass());
-        inMemorySortOp = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
-                new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
-        return inMemorySortOp;
-    }
+	public FileSystem getHDFSClient() {
+		FileSystem fileSystem = null;
+		try {
+			fileSystem = FileSystem.get(jobConf);
+		} catch (IOException ioe) {
+			ioe.printStackTrace();
+		}
+		return fileSystem;
+	}
 
-    public static ExternalSortOperatorDescriptor getExternalSorter(JobConf conf, JobSpecification spec) {
-        ExternalSortOperatorDescriptor externalSortOp = null;
-        RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
-                .getMapOutputValueClass().getName());
-        Class<? extends RawComparator> rawComparatorClass = null;
-        WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
-                WritableComparable.class));
-        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
-                writableComparator.getClass());
-        externalSortOp = new ExternalSortOperatorDescriptor(spec,conf.getInt(HYRACKS_EX_SORT_FRAME_LIMIT,DEFAULT_EX_SORT_FRAME_LIMIT),new int[] { 0 },
-                new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
-        return externalSortOp;
-    }
-    
-    public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(JobConf conf,
-            JobSpecification spec) {
+	public JobSpecification getJobSpecification(List<JobConf> jobConfs)
+			throws Exception {
+		JobSpecification spec = null;
+		if (jobConfs.size() == 1) {
+			spec = getJobSpecification(jobConfs.get(0));
+		} else {
+			spec = getPipelinedSpec(jobConfs);
+		}
+		return spec;
+	}
 
-        Class mapOutputKeyClass = conf.getMapOutputKeyClass();
-        Class mapOutputValueClass = conf.getMapOutputValueClass();
+	private IOperatorDescriptor configureOutput(
+			IOperatorDescriptor previousOperator, JobConf conf,
+			JobSpecification spec) throws Exception {
+		int instanceCountPreviousOperator = operatorInstanceCount
+				.get(previousOperator.getOperatorId());
+		int numOutputters = conf.getNumReduceTasks() != 0 ? conf
+				.getNumReduceTasks() : instanceCountPreviousOperator;
+		HadoopWriteOperatorDescriptor writer = null;
+		writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
+		configurePartitionCountConstraint(spec, writer, numOutputters);
+		spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator,
+				0, writer, 0);
+		return writer;
+	}
 
-        MToNHashPartitioningConnectorDescriptor connectorDescriptor = null;
-        ITuplePartitionComputerFactory factory = null;
-        conf.getMapOutputKeyClass();
-        if (conf.getPartitionerClass() != null && !conf.getPartitionerClass().getName().startsWith("org.apache.hadoop")) {
-            Class<? extends Partitioner> partitioner = conf.getPartitionerClass();
-            factory = new HadoopPartitionerTuplePartitionComputerFactory(partitioner, DatatypeHelper
-                    .createSerializerDeserializer(mapOutputKeyClass), DatatypeHelper
-                    .createSerializerDeserializer(mapOutputValueClass));
-        } else {
-            RecordDescriptor recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(mapOutputKeyClass,
-                    mapOutputValueClass);
-            ISerializerDeserializer mapOutputKeySerializerDerserializer = DatatypeHelper
-                    .createSerializerDeserializer(mapOutputKeyClass);
-            factory = new HadoopHashTuplePartitionComputerFactory(mapOutputKeySerializerDerserializer);
-        }
-        connectorDescriptor = new MToNHashPartitioningConnectorDescriptor(spec, factory);
-        return connectorDescriptor;
-    }
+	private int getInstanceCount(IOperatorDescriptor operator) {
+		return operatorInstanceCount.get(operator.getOperatorId());
+	}
+
+	private IOperatorDescriptor addCombiner(
+			IOperatorDescriptor previousOperator, JobConf jobConf,
+			JobSpecification spec) throws Exception {
+		boolean useCombiner = (jobConf.getCombinerClass() != null);
+		IOperatorDescriptor mapSideOutputOp = previousOperator;
+		if (useCombiner) {
+			System.out.println("Using Combiner:"
+					+ jobConf.getCombinerClass().getName());
+			IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(
+					jobConf, spec);
+			configurePartitionCountConstraint(spec, mapSideCombineSortOp,
+					getInstanceCount(previousOperator));
+
+			HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(
+					jobConf, spec, true);
+			configurePartitionCountConstraint(spec, mapSideCombineReduceOp,
+					getInstanceCount(previousOperator));
+			spec.connect(new OneToOneConnectorDescriptor(spec),
+					previousOperator, 0, mapSideCombineSortOp, 0);
+			spec.connect(new OneToOneConnectorDescriptor(spec),
+					mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
+			mapSideOutputOp = mapSideCombineReduceOp;
+		}
+		return mapSideOutputOp;
+	}
+
+	private int getNumReduceTasks(JobConf jobConf) {
+		int numReduceTasks = Math.min(maxReducers, jobConf.getNumReduceTasks());
+		return numReduceTasks;
+	}
+
+	private IOperatorDescriptor addReducer(
+			IOperatorDescriptor previousOperator, JobConf jobConf,
+			JobSpecification spec) throws Exception {
+		IOperatorDescriptor mrOutputOperator = previousOperator;
+		if (jobConf.getNumReduceTasks() != 0) {
+			IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
+			HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec,
+					false);
+			int numReduceTasks = getNumReduceTasks(jobConf);
+			configurePartitionCountConstraint(spec, sorter, numReduceTasks);
+			configurePartitionCountConstraint(spec, reducer, numReduceTasks);
+
+			IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(
+					jobConf, spec);
+			spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter,
+					0);
+			spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
+					reducer, 0);
+			mrOutputOperator = reducer;
+		}
+		return mrOutputOperator;
+	}
+
+	private long getInputSize(Object[] splits, JobConf conf)
+			throws IOException, InterruptedException {
+		long totalInputSize = 0;
+		if (conf.getUseNewMapper()) {
+			for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[]) splits) {
+				totalInputSize += split.getLength();
+			}
+		} else {
+			for (InputSplit split : (InputSplit[]) splits) {
+				totalInputSize += split.getLength();
+			}
+		}
+		return totalInputSize;
+	}
+
+	private Object[] getInputSplits(JobConf conf, int desiredMaxMappers)
+			throws Exception {
+		Object[] splits = getInputSplits(conf);
+		if (splits.length > desiredMaxMappers) {
+			long totalInputSize = getInputSize(splits, conf);
+			long goalSize = (totalInputSize / desiredMaxMappers);
+			conf.setLong("mapred.min.split.size", goalSize);
+			conf.setNumMapTasks(desiredMaxMappers);
+			splits = getInputSplits(conf);
+		}
+		return splits;
+	}
+
+	public JobSpecification getPipelinedSpec(List<JobConf> jobConfs)
+			throws Exception {
+		JobSpecification spec = new JobSpecification();
+		Iterator<JobConf> iterator = jobConfs.iterator();
+		JobConf firstMR = iterator.next();
+		IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec, firstMR);
+		while (iterator.hasNext())
+			for (JobConf currentJobConf : jobConfs) {
+				mrOutputOp = configureMapReduce(mrOutputOp, spec,
+						currentJobConf);
+			}
+		configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
+		return spec;
+	}
+
+	public JobSpecification getJobSpecification(JobConf conf) throws Exception {
+		JobSpecification spec = new JobSpecification();
+		IOperatorDescriptor mrOutput = configureMapReduce(null, spec, conf);
+		IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec);
+		spec.addRoot(printer);
+		System.out.println(spec);
+		return spec;
+	}
+
+	private IOperatorDescriptor configureMapReduce(
+			IOperatorDescriptor previousOuputOp, JobSpecification spec,
+			JobConf conf) throws Exception {
+		IOperatorDescriptor mapper = getMapper(conf, spec, previousOuputOp);
+		IOperatorDescriptor mapSideOutputOp = addCombiner(mapper, conf, spec);
+		IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec);
+		return reducer;
+	}
+
+	public static InMemorySortOperatorDescriptor getInMemorySorter(
+			JobConf conf, JobSpecification spec) {
+		InMemorySortOperatorDescriptor inMemorySortOp = null;
+		RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
+				.getMapOutputKeyClass().getName(), conf
+				.getMapOutputValueClass().getName());
+		Class<? extends RawComparator> rawComparatorClass = null;
+		WritableComparator writableComparator = WritableComparator.get(conf
+				.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+		WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+				writableComparator.getClass());
+		inMemorySortOp = new InMemorySortOperatorDescriptor(spec,
+				new int[] { 0 },
+				new IBinaryComparatorFactory[] { comparatorFactory },
+				recordDescriptor);
+		return inMemorySortOp;
+	}
+
+	public static ExternalSortOperatorDescriptor getExternalSorter(
+			JobConf conf, JobSpecification spec) {
+		ExternalSortOperatorDescriptor externalSortOp = null;
+		RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
+				.getMapOutputKeyClass().getName(), conf
+				.getMapOutputValueClass().getName());
+		Class<? extends RawComparator> rawComparatorClass = null;
+		WritableComparator writableComparator = WritableComparator.get(conf
+				.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+		WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+				writableComparator.getClass());
+		externalSortOp = new ExternalSortOperatorDescriptor(spec, conf.getInt(
+				HYRACKS_EX_SORT_FRAME_LIMIT, DEFAULT_EX_SORT_FRAME_LIMIT),
+				new int[] { 0 },
+				new IBinaryComparatorFactory[] { comparatorFactory },
+				recordDescriptor);
+		return externalSortOp;
+	}
+
+	public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(
+			JobConf conf, JobSpecification spec) {
+
+		Class mapOutputKeyClass = conf.getMapOutputKeyClass();
+		Class mapOutputValueClass = conf.getMapOutputValueClass();
+
+		MToNHashPartitioningConnectorDescriptor connectorDescriptor = null;
+		ITuplePartitionComputerFactory factory = null;
+		conf.getMapOutputKeyClass();
+		if (conf.getPartitionerClass() != null
+				&& !conf.getPartitionerClass().getName().startsWith(
+						"org.apache.hadoop")) {
+			Class<? extends Partitioner> partitioner = conf
+					.getPartitionerClass();
+			factory = new HadoopPartitionerTuplePartitionComputerFactory(
+					partitioner, DatatypeHelper
+							.createSerializerDeserializer(mapOutputKeyClass),
+					DatatypeHelper
+							.createSerializerDeserializer(mapOutputValueClass));
+		} else {
+			RecordDescriptor recordDescriptor = DatatypeHelper
+					.createKeyValueRecordDescriptor(mapOutputKeyClass,
+							mapOutputValueClass);
+			ISerializerDeserializer mapOutputKeySerializerDerserializer = DatatypeHelper
+					.createSerializerDeserializer(mapOutputKeyClass);
+			factory = new HadoopHashTuplePartitionComputerFactory(
+					mapOutputKeySerializerDerserializer);
+		}
+		connectorDescriptor = new MToNHashPartitioningConnectorDescriptor(spec,
+				factory);
+		return connectorDescriptor;
+	}
 
 }