1) Changed pom.xml for hadoopcompatapp so that the working directory is set appropriately for the CC and NCs. 2) Refactored code.
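
The hadoopcompatapp pom.xml hunk itself is not included in this diff. As a rough sketch only, a plugin execution along the following lines could pin the directory the CC (and, analogously, each NC) runs in; the plugin choice (exec-maven-plugin), the launch script path, and the directory names below are illustrative assumptions, not the actual change:

    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <executions>
        <execution>
          <id>start-cc</id>
          <phase>pre-integration-test</phase>
          <goals>
            <goal>exec</goal>
          </goals>
          <configuration>
            <!-- assumption: the CC launch script path is a placeholder for illustration -->
            <executable>${basedir}/target/appassembler/bin/hyrackscc</executable>
            <!-- the point of the change: give the CC a fixed working directory -->
            <workingDirectory>${project.build.directory}/CC</workingDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>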

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_hadoop_compat_changes@479 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/.settings/org.eclipse.jdt.core.prefs b/hyracks-api/.settings/org.eclipse.jdt.core.prefs
index a0e106b..6b7a0fc 100644
--- a/hyracks-api/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-api/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Thu Jul 29 01:10:06 PDT 2010
+#Thu Jun 02 12:55:19 PDT 2011
 eclipse.preferences.version=1
 org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
 org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks-cli/.classpath b/hyracks-cli/.classpath
index ba0bb5a..1f3c1ff 100644
--- a/hyracks-cli/.classpath
+++ b/hyracks-cli/.classpath
@@ -1,7 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
-	<classpathentry kind="src" path="target/generated-sources/javacc"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
 	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
 	<classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
index 7404d54..0272d6e 100644
--- a/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
@@ -1,5 +1,15 @@
-#Tue Nov 02 17:09:03 PDT 2010
+#Thu Jun 02 13:11:16 PDT 2011
 eclipse.preferences.version=1
+org.eclipse.jdt.core.codeComplete.argumentPrefixes=
+org.eclipse.jdt.core.codeComplete.argumentSuffixes=
+org.eclipse.jdt.core.codeComplete.fieldPrefixes=
+org.eclipse.jdt.core.codeComplete.fieldSuffixes=
+org.eclipse.jdt.core.codeComplete.localPrefixes=
+org.eclipse.jdt.core.codeComplete.localSuffixes=
+org.eclipse.jdt.core.codeComplete.staticFieldPrefixes=
+org.eclipse.jdt.core.codeComplete.staticFieldSuffixes=
+org.eclipse.jdt.core.codeComplete.staticFinalFieldPrefixes=
+org.eclipse.jdt.core.codeComplete.staticFinalFieldSuffixes=
 org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
 org.eclipse.jdt.core.compiler.compliance=1.6
 org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
@@ -14,7 +24,7 @@
 org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
 org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
 org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
-org.eclipse.jdt.core.formatter.alignment_for_enum_constants=49
+org.eclipse.jdt.core.formatter.alignment_for_enum_constants=48
 org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16
 org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
 org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16
@@ -48,18 +58,18 @@
 org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line
 org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line
 org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=true
-org.eclipse.jdt.core.formatter.comment.format_block_comments=false
+org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false
+org.eclipse.jdt.core.formatter.comment.format_block_comments=true
 org.eclipse.jdt.core.formatter.comment.format_header=false
 org.eclipse.jdt.core.formatter.comment.format_html=true
 org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=true
-org.eclipse.jdt.core.formatter.comment.format_line_comments=false
+org.eclipse.jdt.core.formatter.comment.format_line_comments=true
 org.eclipse.jdt.core.formatter.comment.format_source_code=true
 org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true
 org.eclipse.jdt.core.formatter.comment.indent_root_tags=true
 org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
 org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=insert
-org.eclipse.jdt.core.formatter.comment.line_length=9999
+org.eclipse.jdt.core.formatter.comment.line_length=80
 org.eclipse.jdt.core.formatter.compact_else_if=true
 org.eclipse.jdt.core.formatter.continuation_indentation=2
 org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2
@@ -246,7 +256,7 @@
 org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert
 org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert
 org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.join_lines_in_comments=false
+org.eclipse.jdt.core.formatter.join_lines_in_comments=true
 org.eclipse.jdt.core.formatter.join_wrapped_lines=true
 org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
 org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
@@ -257,7 +267,7 @@
 org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
 org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
 org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1
-org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=false
+org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=true
 org.eclipse.jdt.core.formatter.tabulation.char=space
 org.eclipse.jdt.core.formatter.tabulation.size=4
 org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index a1c14c5..fd549b3 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -135,7 +135,7 @@
             }
             this.writer = writer;
         }
-     
+
         protected void initializeMapper() throws HyracksDataException {
             super.initializeMapper();
             try {
@@ -146,8 +146,7 @@
             if (!conf.getUseNewMapper()) {
                 ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
             }
-        } 
-
+        }
 
     }
 
@@ -344,7 +343,7 @@
                 mapperClass = conf.getMapperClass();
                 mapperClassName = mapperClass.getName();
             }
-            mapper = getHadoopClassFactory().createMapper(mapperClassName,conf);
+            mapper = getHadoopClassFactory().createMapper(mapperClassName, conf);
         }
         return mapper;
     }
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 195b7c4..5857f36 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -60,64 +60,66 @@
         private Object reducer;
         private DataWritingOutputCollector<K3, V3> output;
         private Reporter reporter;
-        private ReducerContext reducerContext; 
+        private ReducerContext reducerContext;
         RawKeyValueIterator rawKeyValueIterator = new RawKeyValueIterator() {
-            
+
             @Override
             public boolean next() throws IOException {
                 return false;
             }
-            
+
             @Override
             public DataInputBuffer getValue() throws IOException {
                 return null;
             }
-            
+
             @Override
             public Progress getProgress() {
                 return null;
             }
-            
+
             @Override
             public DataInputBuffer getKey() throws IOException {
                 return null;
             }
-            
+
             @Override
             public void close() throws IOException {
-                
+
             }
         };
-        
-    
+
         class ReducerContext extends org.apache.hadoop.mapreduce.Reducer.Context {
             private HadoopReducerOperatorDescriptor.ValueIterator iterator;
-            
+
             @SuppressWarnings("unchecked")
-            ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException, InterruptedException, ClassNotFoundException{
-            
-                reducer.super(conf,new TaskAttemptID(),rawKeyValueIterator,null,null,null,null,null,null,Class.forName("org.apache.hadoop.io.NullWritable"),Class.forName("org.apache.hadoop.io.NullWritable"));
+            ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException,
+                    InterruptedException, ClassNotFoundException {
+
+                reducer.super(conf, new TaskAttemptID(), rawKeyValueIterator, null, null, null, null, null, null, Class
+                        .forName("org.apache.hadoop.io.NullWritable"), Class
+                        .forName("org.apache.hadoop.io.NullWritable"));
             }
-            
-            public  void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) {
+
+            public void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) {
                 iterator = iter;
             }
-            
+
             @Override
             public Iterable<V2> getValues() throws IOException, InterruptedException {
-              return new Iterable<V2>() {
-                @Override
-                public Iterator<V2> iterator() {
-                    return iterator;
-                }
-              };
+                return new Iterable<V2>() {
+                    @Override
+                    public Iterator<V2> iterator() {
+                        return iterator;
+                    }
+                };
             }
-            
+
             /** Start processing next unique key. */
             @Override
-            public boolean nextKey() throws IOException,InterruptedException {
+            public boolean nextKey() throws IOException, InterruptedException {
                 boolean hasMore = iterator.hasNext();
-                if(hasMore){
+                if (hasMore) {
                     nextKeyValue();
                 }
                 return hasMore;
@@ -133,26 +135,25 @@
             }
 
             public Object getCurrentKey() {
-              return iterator.getKey();
+                return iterator.getKey();
             }
 
             @Override
             public Object getCurrentValue() {
-              return iterator.getValue();
+                return iterator.getValue();
             }
-            
+
             /**
              * Generate an output key/value pair.
              */
             @Override
-            public void write(Object key, Object value
-                              ) throws IOException, InterruptedException {
-              output.collect(key, value);
+            public void write(Object key, Object value) throws IOException, InterruptedException {
+                output.collect(key, value);
             }
 
         }
-            
-        public ReducerAggregator(Object reducer) throws HyracksDataException{
+
+        public ReducerAggregator(Object reducer) throws HyracksDataException {
             this.reducer = reducer;
             initializeReducer();
             output = new DataWritingOutputCollector<K3, V3>();
@@ -201,17 +202,17 @@
             i.reset(reader);
             output.setWriter(writer);
             try {
-                if(jobConf.getUseNewReducer()){
+                if (jobConf.getUseNewReducer()) {
                     try {
                         reducerContext.setIterator(i);
-                        ((org.apache.hadoop.mapreduce.Reducer)reducer).run(reducerContext);
+                        ((org.apache.hadoop.mapreduce.Reducer) reducer).run(reducerContext);
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                         throw new HyracksDataException(e);
                     }
-                } else {  
-                    ((org.apache.hadoop.mapred.Reducer)reducer).reduce(i.getKey(), i, output, reporter);
-                }    
+                } else {
+                    ((org.apache.hadoop.mapred.Reducer) reducer).reduce(i.getKey(), i, output, reporter);
+                }
             } catch (IOException e) {
                 e.printStackTrace();
             }
@@ -221,28 +222,28 @@
         public void close() throws HyracksDataException {
             // -- - close - --
             try {
-                if(!jobConf.getUseNewMapper()) {
-                    ((org.apache.hadoop.mapred.Reducer)reducer).close();
-                }    
+                if (!jobConf.getUseNewMapper()) {
+                    ((org.apache.hadoop.mapred.Reducer) reducer).close();
+                }
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
         }
-        
+
         private void initializeReducer() throws HyracksDataException {
             jobConf.setClassLoader(this.getClass().getClassLoader());
-            if(!jobConf.getUseNewReducer()) {
-                ((org.apache.hadoop.mapred.Reducer)reducer).configure(getJobConf());    
+            if (!jobConf.getUseNewReducer()) {
+                ((org.apache.hadoop.mapred.Reducer) reducer).configure(getJobConf());
             } else {
                 try {
-                    reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer)reducer,jobConf);
+                    reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer) reducer, jobConf);
                 } catch (IOException e) {
                     e.printStackTrace();
                     throw new HyracksDataException(e);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                     throw new HyracksDataException(e);
-                } catch (RuntimeException e){
+                } catch (RuntimeException e) {
                     e.printStackTrace();
                 } catch (ClassNotFoundException e) {
                     e.printStackTrace();
@@ -259,7 +260,7 @@
         public K2 getKey() {
             return key;
         }
-        
+
         public V2 getValue() {
             return value;
         }
@@ -313,7 +314,7 @@
     private boolean useAsCombiner = false;
 
     public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IComparatorFactory comparatorFactory,
-            IHadoopClassFactory classFactory,boolean useAsCombiner) {
+            IHadoopClassFactory classFactory, boolean useAsCombiner) {
         super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
         this.comparatorFactory = comparatorFactory;
         this.useAsCombiner = useAsCombiner;
@@ -324,22 +325,24 @@
             return ReflectionUtils.newInstance(reducerClass, getJobConf());
         } else {
             Object reducer;
-            if(!useAsCombiner) {
-                if(getJobConf().getUseNewReducer()){
+            if (!useAsCombiner) {
+                if (getJobConf().getUseNewReducer()) {
                     JobContext jobContext = new JobContext(getJobConf(), null);
-                    reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?,?,?,?>> )jobContext.getReducerClass();
+                    reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
+                            .getReducerClass();
                 } else {
                     reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
                 }
             } else {
-                if(getJobConf().getUseNewReducer()){
+                if (getJobConf().getUseNewReducer()) {
                     JobContext jobContext = new JobContext(getJobConf(), null);
-                    reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?,?,?,?>> )jobContext.getCombinerClass();
+                    reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
+                            .getCombinerClass();
                 } else {
                     reducerClass = (Class<? extends Reducer>) getJobConf().getCombinerClass();
                 }
             }
-            reducer = getHadoopClassFactory().createReducer(reducerClass.getName(),getJobConf());
+            reducer = getHadoopClassFactory().createReducer(reducerClass.getName(), getJobConf());
             return reducer;
         }
     }
@@ -377,18 +380,18 @@
     }
 
     public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory classFactory) {
-        String outputKeyClassName =null; 
+        String outputKeyClassName = null;
         String outputValueClassName = null;
-        
-        if(conf.getUseNewMapper()) {
-            JobContext context = new JobContext(conf,null);
+
+        if (conf.getUseNewMapper()) {
+            JobContext context = new JobContext(conf, null);
             outputKeyClassName = context.getOutputKeyClass().getName();
             outputValueClassName = context.getOutputValueClass().getName();
         } else {
             outputKeyClassName = conf.getOutputKeyClass().getName();
             outputValueClassName = conf.getOutputValueClass().getName();
         }
-        
+
         RecordDescriptor recordDescriptor = null;
         try {
             if (classFactory == null) {
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index 30584b7..2154f5a 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -21,7 +21,7 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -32,7 +32,6 @@
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.FileOutputCommitter;
 
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -43,58 +42,61 @@
 
 public class HadoopWriteOperatorDescriptor extends AbstractFileWriteOperatorDescriptor {
 
-    private  class HadoopFileWriter implements IRecordWriter {
+    private class HadoopFileWriter implements IRecordWriter {
 
         Object recordWriter;
         JobConf conf;
         Path finalOutputFile;
         Path tempOutputFile;
-        Path tempDir; 
-        
+        Path tempDir;
+
         HadoopFileWriter(Object recordWriter, int index, JobConf conf) throws Exception {
             this.recordWriter = recordWriter;
             this.conf = conf;
-	    initialize(index, conf);
+            initialize(index, conf);
         }
 
-
         private void initialize(int index, JobConf conf) throws Exception {
-            if(! (conf.getOutputFormat() instanceof NullOutputFormat)) {
+            if (!(conf.getOutputFormat() instanceof NullOutputFormat)) {
                 boolean isMap = conf.getNumReduceTasks() == 0;
-                TaskAttemptID taskAttempId = new TaskAttemptID("0",index,isMap,index,index);
-                conf.set("mapred.task.id",taskAttempId.toString());
-                String suffix =  new String("part-00000");
+                TaskAttemptID taskAttempId = new TaskAttemptID("0", index, isMap, index, index);
+                conf.set("mapred.task.id", taskAttempId.toString());
+                String suffix = new String("part-00000");
                 suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
                 suffix = suffix + index;
                 outputPath = new Path(conf.get("mapred.output.dir"));
-	        tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-	        FileSystem fileSys = tempDir.getFileSystem(conf);
-	        if (!fileSys.mkdirs(tempDir)) {
-	           throw new IOException("Mkdirs failed to create " + tempDir.toString());
-	        }
-                tempOutputFile = new Path(tempDir,new Path("_" + taskAttempId.toString()));
-                tempOutputFile = new Path(tempOutputFile,suffix);
-                finalOutputFile = new Path(outputPath,suffix);
-                if(conf.getUseNewMapper()){
-                    org.apache.hadoop.mapreduce.JobContext jobContext = new org.apache.hadoop.mapreduce.JobContext(conf,null);
-                    org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat)ReflectionUtils.newInstance(jobContext.getOutputFormatClass(),conf);
-                    recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttempId));
-                }else {
-                    recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf,suffix, new Progressable() {
-                    @Override
-                    public void progress() {}
-                    });
+                tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+                FileSystem fileSys = tempDir.getFileSystem(conf);
+                if (!fileSys.mkdirs(tempDir)) {
+                    throw new IOException("Mkdirs failed to create " + tempDir.toString());
                 }
-	    }
-	}
+                tempOutputFile = new Path(tempDir, new Path("_" + taskAttempId.toString()));
+                tempOutputFile = new Path(tempOutputFile, suffix);
+                finalOutputFile = new Path(outputPath, suffix);
+                if (conf.getUseNewMapper()) {
+                    org.apache.hadoop.mapreduce.JobContext jobContext = new org.apache.hadoop.mapreduce.JobContext(
+                            conf, null);
+                    org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
+                            .newInstance(jobContext.getOutputFormatClass(), conf);
+                    recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttempId));
+                } else {
+                    recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf, suffix,
+                            new Progressable() {
+                                @Override
+                                public void progress() {
+                                }
+                            });
+                }
+            }
+        }
 
         @Override
         public void write(Object[] record) throws Exception {
-            if(recordWriter != null){
-                if (conf.getUseNewMapper()){
-                    ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).write(record[0], record[1]);
+            if (recordWriter != null) {
+                if (conf.getUseNewMapper()) {
+                    ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]);
                 } else {
-                    ((org.apache.hadoop.mapred.RecordWriter)recordWriter).write(record[0], record[1]);
+                    ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]);
                 }
             }
         }
@@ -102,18 +104,19 @@
         @Override
         public void close() {
             try {
-                if(recordWriter != null) {
-                    if (conf.getUseNewMapper()){
-                       ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).close(new TaskAttemptContext(conf, new TaskAttemptID()));
+                if (recordWriter != null) {
+                    if (conf.getUseNewMapper()) {
+                        ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new TaskAttemptContext(conf,
+                                new TaskAttemptID()));
                     } else {
-                        ((org.apache.hadoop.mapred.RecordWriter)recordWriter).close(null);
+                        ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null);
                     }
-	            if (outputPath != null) {
-		        FileSystem fileSystem = FileSystem.get(conf);
+                    if (outputPath != null) {
+                        FileSystem fileSystem = FileSystem.get(conf);
                         fileSystem.rename(tempOutputFile, finalOutputFile);
-                        fileSystem.delete(tempDir,true);
-		    }	
-                } 
+                        fileSystem.delete(tempDir, true);
+                    }
+                }
             } catch (Exception e) {
                 e.printStackTrace();
             }
@@ -129,53 +132,51 @@
         conf.setClassLoader(this.getClass().getClassLoader());
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         FileSystem fileSystem = FileSystem.get(conf);
-        checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
-        Object recordWriter  = null;
+        Object recordWriter = null;
         return new HadoopFileWriter(recordWriter, index, conf);
     }
-    
 
     Path outputPath;
     Path outputTempPath;
-   
+
     protected Reporter createReporter() {
-    return new Reporter() {
-        @Override
-        public Counter getCounter(Enum<?> name) {
-            return null;
-        }
+        return new Reporter() {
+            @Override
+            public Counter getCounter(Enum<?> name) {
+                return null;
+            }
 
-        @Override
-        public Counter getCounter(String group, String name) {
-            return null;
-        }
+            @Override
+            public Counter getCounter(String group, String name) {
+                return null;
+            }
 
-        @Override
-        public InputSplit getInputSplit() throws UnsupportedOperationException {
-            return null;
-        }
+            @Override
+            public InputSplit getInputSplit() throws UnsupportedOperationException {
+                return null;
+            }
 
-        @Override
-        public void incrCounter(Enum<?> key, long amount) {
+            @Override
+            public void incrCounter(Enum<?> key, long amount) {
 
-        }
+            }
 
-        @Override
-        public void incrCounter(String group, String counter, long amount) {
+            @Override
+            public void incrCounter(String group, String counter, long amount) {
 
-        }
+            }
 
-        @Override
-        public void progress() {
+            @Override
+            public void progress() {
 
-        }
+            }
 
-        @Override
-        public void setStatus(String status) {
+            @Override
+            public void setStatus(String status) {
 
-        }
-    };
-}
+            }
+        };
+    }
 
     private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
         JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
@@ -197,8 +198,9 @@
     private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) throws ClassNotFoundException {
         int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
         Object outputFormat = null;
-        if(conf.getUseNewMapper()) {
-            outputFormat = ReflectionUtils.newInstance(new org.apache.hadoop.mapreduce.JobContext(conf,null).getOutputFormatClass(), conf);
+        if (conf.getUseNewMapper()) {
+            outputFormat = ReflectionUtils.newInstance(new org.apache.hadoop.mapreduce.JobContext(conf, null)
+                    .getOutputFormatClass(), conf);
         } else {
             outputFormat = conf.getOutputFormat();
         }
@@ -222,23 +224,11 @@
             }
             return outputFileSplits;
         }
-
     }
 
     public HadoopWriteOperatorDescriptor(JobSpecification jobSpec, JobConf jobConf, int numMapTasks) throws Exception {
         super(jobSpec, getOutputSplits(jobConf, numMapTasks));
         this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
         checkIfCanWriteToHDFS(super.splits);
-	/*
-        FileSystem fs = FileSystem.get(jobConf);
-        if (jobConf.get("mapred.output.dir") != null) {
-            Path output = new Path(jobConf.get("mapred.output.dir"));
-            Path outputTemp = new Path(output,"_temporary");
-            if(output != null && !fs.exists(outputTemp)) {
-                    fs.mkdirs(outputTemp);
-                }
-            }
-        }
-       */
     }
 }
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
index f4dbe4a..a9f3a71 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
@@ -17,19 +17,18 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 
-
 public class ClasspathBasedHadoopClassFactory implements IHadoopClassFactory {
 
     @Override
-    public Object createMapper(String mapClassName,JobConf conf) throws Exception {
+    public Object createMapper(String mapClassName, JobConf conf) throws Exception {
         Class clazz = loadClass(mapClassName);
         return ReflectionUtils.newInstance(clazz, conf);
     }
 
     @Override
-    public Object createReducer(String reduceClassName,JobConf conf) throws Exception {
+    public Object createReducer(String reduceClassName, JobConf conf) throws Exception {
         Class clazz = loadClass(reduceClassName);
-        return  ReflectionUtils.newInstance(clazz, conf);
+        return ReflectionUtils.newInstance(clazz, conf);
     }
 
     @Override
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
index 22db448..c2ffeeb 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
@@ -20,9 +20,9 @@
 
 public interface IHadoopClassFactory extends Serializable {
 
-    public Object createMapper(String mapClassName,JobConf conf) throws Exception;
+    public Object createMapper(String mapClassName, JobConf conf) throws Exception;
 
-    public Object createReducer(String reduceClassName,JobConf conf) throws Exception;
+    public Object createReducer(String reduceClassName, JobConf conf) throws Exception;
 
     public Class loadClass(String className) throws Exception;
 }
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
index 29239e3..9226c29 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
@@ -36,15 +36,15 @@
         isClasses = new Class[inputSplits.length];
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(baos);
-        if(conf.getUseNewMapper()){
+        if (conf.getUseNewMapper()) {
             for (int i = 0; i < inputSplits.length; ++i) {
-                isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit)inputSplits[i]).getClass();
-                ((Writable)inputSplits[i]).write(dos);
+                isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit) inputSplits[i]).getClass();
+                ((Writable) inputSplits[i]).write(dos);
             }
         } else {
             for (int i = 0; i < inputSplits.length; ++i) {
-                isClasses[i] = ((org.apache.hadoop.mapred.InputSplit)inputSplits[i]).getClass();
-                ((Writable)inputSplits[i]).write(dos);
+                isClasses[i] = ((org.apache.hadoop.mapred.InputSplit) inputSplits[i]).getClass();
+                ((Writable) inputSplits[i]).write(dos);
             }
         }
         dos.close();
@@ -52,17 +52,16 @@
 
     }
 
-    public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException,
-            IOException {
+    public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException, IOException {
         Object[] splits = new Object[isClasses.length];
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
         for (int i = 0; i < splits.length; ++i) {
             splits[i] = ReflectionUtils.newInstance(isClasses[i], jobConf);
-            if(jobConf.getUseNewMapper()){
-                ((Writable)splits[i]).readFields(dis);
-            }else {
-                ((Writable)splits[i]).readFields(dis);
-            }    
+            if (jobConf.getUseNewMapper()) {
+                ((Writable) splits[i]).readFields(dis);
+            } else {
+                ((Writable) splits[i]).readFields(dis);
+            }
         }
         return splits;
     }
diff --git a/hyracks-examples/pom.xml b/hyracks-examples/pom.xml
index 4d42049..5281140 100644
--- a/hyracks-examples/pom.xml
+++ b/hyracks-examples/pom.xml
@@ -16,5 +16,6 @@
     <module>text-example</module>
     <module>btree-example</module>
     <module>hyracks-integration-tests</module>
+    <module>hadoop-compat-example</module>
   </modules>
 </project>
diff --git a/hyracks-examples/text-example/textapp/.classpath b/hyracks-examples/text-example/textapp/.classpath
index 0dff09c..f2cc5f7 100644
--- a/hyracks-examples/text-example/textapp/.classpath
+++ b/hyracks-examples/text-example/textapp/.classpath
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
-	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
 	<classpathentry kind="output" path="target/classes"/>
 </classpath>