The compatibility layer now supports use of 'org.apache.hadoop.mapreduce' packages

git-svn-id: https://hyracks.googlecode.com/svn/trunk@185 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index cc9f7d4..e4daf0b 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -69,9 +69,8 @@
         try {
             jobId = connection.createJob(applicationName, spec);
         } catch (Exception e){
-            System.out.println(" application not found, creating application");
+            System.out.println(" application not found, creating application" + applicationName);
             connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
-            System.out.println(" created application :" + applicationName);
             jobId = connection.createJob(applicationName, spec);
         }
         connection.start(jobId);
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
index 3b10272..281e58b 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
@@ -6,6 +6,6 @@
     public static final String namenodeURL = "fs.default.name";
     public static final String dcacheServerConfiguration = "dcacheServerConfiguration";
     public static final String[] systemLibs = new String[] { "hyracksDataflowStdLib", "hyracksDataflowCommonLib",
-            "hyracksDataflowHadoopLib", "hadoopCoreLib" };
+            "hyracksDataflowHadoopLib", "hadoopCoreLib"};
 
 }
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
index 2653dc8..7be5938 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -17,6 +17,8 @@
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
@@ -58,6 +60,15 @@
     private  int maxReducers = DEFAULT_MAX_REDUCERS;
     private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
     
+    class NewHadoopConstants {
+    	public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
+    	public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
+    	public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
+    	public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
+    	public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
+    	public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
+    }
+    
     public HadoopAdapter(String namenodeUrl) {
         jobConf = new JobConf(true);
         jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
@@ -99,11 +110,27 @@
         return recordDescriptor;
     }
 
-    private InputSplit[] getInputSplits(JobConf jobConf) throws IOException {
-        InputFormat inputFormat = jobConf.getInputFormat();
-        return inputFormat.getSplits(jobConf, jobConf.getNumMapTasks());
+    private Object[] getInputSplits(JobConf conf) throws IOException, ClassNotFoundException, InterruptedException {
+        if (conf.getUseNewMapper()) {
+        	return getNewInputSplits(conf);
+        } else {
+        	return getOldInputSplits(conf);
+        }
     }
-
+    
+    private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(JobConf conf) throws ClassNotFoundException, IOException, InterruptedException {
+    	org.apache.hadoop.mapreduce.InputSplit[] splits = null;
+    	JobContext context = new JobContext(conf,null);
+    	org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils.newInstance(context.getInputFormatClass(),conf);
+    	List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat.getSplits(context);
+    	return inputSplits.toArray(new org.apache.hadoop.mapreduce.InputSplit[]{});
+    }
+    
+    private InputSplit[] getOldInputSplits(JobConf conf) throws IOException  {
+      	InputFormat inputFormat = conf.getInputFormat();
+    	return inputFormat.getSplits(conf, conf.getNumMapTasks());
+    }
+    
     public HadoopMapperOperatorDescriptor getMapper(JobConf conf,JobSpecification spec, IOperatorDescriptor previousOp)
             throws Exception {
         boolean selfRead = previousOp == null;
@@ -111,7 +138,7 @@
         HadoopMapperOperatorDescriptor mapOp = null;
         PartitionConstraint constraint;
         if(selfRead) {
-            InputSplit [] splits = getInputSplits(conf,maxMappers);
+            Object [] splits = getInputSplits(conf,maxMappers);
             mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,classFactory);
             mapOp.setPartitionConstraint(new PartitionCountConstraint(splits.length));
             System.out.println("No of  mappers :" + splits.length);
@@ -202,15 +229,26 @@
         return mrOutputOperator;
     }
     
-    private InputSplit[] getInputSplits(JobConf conf, int desiredMaxMappers) throws Exception {
-        InputSplit[] splits = getInputSplits(conf);
+    private long getInputSize(Object[] splits,JobConf conf) throws IOException, InterruptedException {
+        long totalInputSize =0;
+    	if(conf.getUseNewMapper()) {
+        	for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[])splits) {
+        	    totalInputSize += split.getLength();
+            }                                       
+        } else {
+	    	for (InputSplit split : (InputSplit[])splits) {
+	            totalInputSize += split.getLength();
+	        }
+        }
+    	return totalInputSize;
+    }
+    
+    private Object[] getInputSplits(JobConf conf, int desiredMaxMappers) throws Exception {
+        Object[] splits = getInputSplits(conf);
         System.out.println(" initial split count :" + splits.length);
         System.out.println(" desired mappers :" + desiredMaxMappers);
         if (splits.length > desiredMaxMappers) {
-            long totalInputSize = 0;
-            for (InputSplit split : splits) {
-                totalInputSize += split.getLength();
-            }
+            long totalInputSize = getInputSize(splits,conf);
             long goalSize = (totalInputSize/desiredMaxMappers);
             System.out.println(" total input length :" + totalInputSize);
             System.out.println(" goal size :" + goalSize);
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
index cc9c012..4110f04 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
@@ -40,7 +40,7 @@
     }
 
     public static File getHyracksArchive(String applicationName, Set<String> libJars) {
-        String target = applicationName + ".zip";
+       String target = applicationName + ".zip";
         // Create a buffer for reading the files
         byte[] buf = new byte[1024];
         Set<String> fileNames = new HashSet<String>();