The compatibility layer now supports jobs written against the new-API 'org.apache.hadoop.mapreduce' packages
git-svn-id: https://hyracks.googlecode.com/svn/trunk@185 123451ca-8445-de46-9d55-352943316053
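
For context, the adapter keys the old-API/new-API decision off the job configuration. A minimal sketch of how a job opts in (assuming a Hadoop 0.20-era JobConf; "mapred.mapper.new-api" is the property that JobConf.getUseNewMapper() consults):

    JobConf conf = new JobConf();
    conf.setBoolean("mapred.mapper.new-api", true); // opt in to org.apache.hadoop.mapreduce
    if (conf.getUseNewMapper()) {
        // the new-API code paths added in this patch are taken
    }
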
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index cc9f7d4..e4daf0b 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -69,9 +69,8 @@
try {
jobId = connection.createJob(applicationName, spec);
} catch (Exception e){
- System.out.println(" application not found, creating application");
+ System.out.println(" application not found, creating application" + applicationName);
connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
- System.out.println(" created application :" + applicationName);
jobId = connection.createJob(applicationName, spec);
}
connection.start(jobId);
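
The hunk above keeps a lazy-deployment idiom: try to create the job, deploy the application archive only if that fails, then retry once. A condensed sketch of the pattern (a fragment; names as in the surrounding code, and the broad Exception catch follows the original):

    try {
        jobId = connection.createJob(applicationName, spec); // fast path: app already deployed
    } catch (Exception e) {
        // application missing on the cluster: deploy its archive, then retry once
        connection.createApplication(applicationName,
                Utilities.getHyracksArchive(applicationName, requiredLibs));
        jobId = connection.createJob(applicationName, spec);
    }
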
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
index 3b10272..281e58b 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
@@ -6,6 +6,6 @@
public static final String namenodeURL = "fs.default.name";
public static final String dcacheServerConfiguration = "dcacheServerConfiguration";
public static final String[] systemLibs = new String[] { "hyracksDataflowStdLib", "hyracksDataflowCommonLib",
- "hyracksDataflowHadoopLib", "hadoopCoreLib" };
+ "hyracksDataflowHadoopLib", "hadoopCoreLib"};
}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
index 2653dc8..7be5938 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -17,6 +17,8 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
@@ -58,6 +60,15 @@
private int maxReducers = DEFAULT_MAX_REDUCERS;
private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
+ static class NewHadoopConstants {
+ public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
+ public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
+ public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
+ public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
+ public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
+ public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
+ }
+
public HadoopAdapter(String namenodeUrl) {
jobConf = new JobConf(true);
jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
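
The keys in NewHadoopConstants mirror what the new-API Job front end writes into the configuration, so the adapter can read back classes that a client set through org.apache.hadoop.mapreduce.Job. A hedged sketch of the correspondence (assuming Hadoop 0.20.x; MyMapper is a hypothetical class):

    Job job = new Job(new Configuration());
    job.setMapperClass(MyMapper.class);             // stored under "mapreduce.map.class"
    job.setInputFormatClass(TextInputFormat.class); // stored under "mapreduce.inputformat.class"
    // which the adapter can later recover via, e.g.:
    String mapClass = job.getConfiguration().get(NewHadoopConstants.MAP_CLASS_ATTR);
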
@@ -99,11 +110,27 @@
return recordDescriptor;
}
- private InputSplit[] getInputSplits(JobConf jobConf) throws IOException {
- InputFormat inputFormat = jobConf.getInputFormat();
- return inputFormat.getSplits(jobConf, jobConf.getNumMapTasks());
+ private Object[] getInputSplits(JobConf conf) throws IOException, ClassNotFoundException, InterruptedException {
+ if (conf.getUseNewMapper()) {
+ return getNewInputSplits(conf);
+ } else {
+ return getOldInputSplits(conf);
+ }
}
-
+
+ private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(JobConf conf) throws ClassNotFoundException, IOException, InterruptedException {
+ // The new API exposes splits through the InputFormat bound to the job context.
+ JobContext context = new JobContext(conf, null);
+ org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils.newInstance(context.getInputFormatClass(), conf);
+ List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat.getSplits(context);
+ return inputSplits.toArray(new org.apache.hadoop.mapreduce.InputSplit[inputSplits.size()]);
+ }
+
+ private InputSplit[] getOldInputSplits(JobConf conf) throws IOException {
+ InputFormat inputFormat = conf.getInputFormat();
+ return inputFormat.getSplits(conf, conf.getNumMapTasks());
+ }
+
public HadoopMapperOperatorDescriptor getMapper(JobConf conf,JobSpecification spec, IOperatorDescriptor previousOp)
throws Exception {
boolean selfRead = previousOp == null;
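
Returning Object[] keeps a single split-handling path for both APIs; callers recover the concrete type with the same conf.getUseNewMapper() flag that selected the producer. A sketch of the consuming side (illustrative only, not code from this patch):

    Object[] splits = getInputSplits(conf);
    if (conf.getUseNewMapper()) {
        org.apache.hadoop.mapreduce.InputSplit[] newSplits =
                (org.apache.hadoop.mapreduce.InputSplit[]) splits;
        // handle new-API splits
    } else {
        InputSplit[] oldSplits = (InputSplit[]) splits;
        // handle old-API splits
    }
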
@@ -111,7 +138,7 @@
HadoopMapperOperatorDescriptor mapOp = null;
PartitionConstraint constraint;
if(selfRead) {
- InputSplit [] splits = getInputSplits(conf,maxMappers);
+ Object[] splits = getInputSplits(conf, maxMappers);
mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,classFactory);
mapOp.setPartitionConstraint(new PartitionCountConstraint(splits.length));
System.out.println("No of mappers :" + splits.length);
@@ -202,15 +229,26 @@
return mrOutputOperator;
}
- private InputSplit[] getInputSplits(JobConf conf, int desiredMaxMappers) throws Exception {
- InputSplit[] splits = getInputSplits(conf);
+ // Sums split lengths for either API flavor; splits must match conf.getUseNewMapper().
+ private long getInputSize(Object[] splits, JobConf conf) throws IOException, InterruptedException {
+ long totalInputSize = 0;
+ if (conf.getUseNewMapper()) {
+ for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[]) splits) {
+ totalInputSize += split.getLength();
+ }
+ } else {
+ for (InputSplit split : (InputSplit[]) splits) {
+ totalInputSize += split.getLength();
+ }
+ }
+ return totalInputSize;
+ }
+
+ private Object[] getInputSplits(JobConf conf, int desiredMaxMappers) throws Exception {
+ Object[] splits = getInputSplits(conf);
System.out.println(" initial split count :" + splits.length);
System.out.println(" desired mappers :" + desiredMaxMappers);
if (splits.length > desiredMaxMappers) {
- long totalInputSize = 0;
- for (InputSplit split : splits) {
- totalInputSize += split.getLength();
- }
+ long totalInputSize = getInputSize(splits, conf);
long goalSize = (totalInputSize/desiredMaxMappers);
System.out.println(" total input length :" + totalInputSize);
System.out.println(" goal size :" + goalSize);
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
index cc9c012..4110f04 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
@@ -40,7 +40,7 @@
}
public static File getHyracksArchive(String applicationName, Set<String> libJars) {
- String target = applicationName + ".zip";
+ String target = applicationName + ".zip";
// Create a buffer for reading the files
byte[] buf = new byte[1024];
Set<String> fileNames = new HashSet<String>();
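
getHyracksArchive bundles the required lib jars into a single <applicationName>.zip for deployment. A self-contained sketch of that packaging step using java.util.zip (an illustration of the technique with a hypothetical helper name, not the method's exact body; assumes java.io.*, java.util.Set, and java.util.zip.* imports):

    static File zipLibs(String applicationName, Set<String> libJars) throws IOException {
        File target = new File(applicationName + ".zip");
        byte[] buf = new byte[1024]; // same buffer size as above
        try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(target))) {
            for (String jar : libJars) {
                try (FileInputStream in = new FileInputStream(jar)) {
                    out.putNextEntry(new ZipEntry(new File(jar).getName()));
                    int len;
                    while ((len = in.read(buf)) > 0) {
                        out.write(buf, 0, len);
                    }
                    out.closeEntry();
                }
            }
        }
        return target;
    }
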