Refactored the compatibility layer to support submitting jobs against existing applications; made minor changes to the Hadoop operators.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_hadoop_compat_changes@460 123451ca-8445-de46-9d55-352943316053
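
For orientation: the core of the client-side change below is that application creation is decoupled from job submission, so a job specification can be submitted against an application that already exists on the cluster controller. A rough usage sketch of the refactored HyracksClient follows; the properties file path and application names are purely illustrative, and spec (a JobSpecification) and userLibJars (a Set<String> of jar paths) are assumed to be built elsewhere:

    // Sketch only; "conf/cluster.properties", "existingApp" and "freshApp" are placeholders.
    Properties clusterProps = Utilities.getProperties("conf/cluster.properties", '=');
    HyracksClient client = new HyracksClient(clusterProps);

    // Submit a job spec against an application already deployed on the cluster controller.
    HyracksRunningJob job = client.submitJob("existingApp", spec);

    // Or package system + user libraries into a fresh application first, then submit.
    HyracksRunningJob job2 = client.submitJob("freshApp", spec, userLibJars);

    client.waitForCompleton(job.getJobId()); // method name as spelled in the source
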
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index c3167cb..a1c14c5 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -71,16 +71,8 @@
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
jobConf = getJobConf();
populateCache(jobConf);
- try {
- mapper = createMapper();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
conf = new JobConf(jobConf);
conf.setClassLoader(jobConf.getClassLoader());
- if (!jobConf.getUseNewMapper()) {
- ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
- }
reporter = createReporter();
}
@@ -143,6 +135,20 @@
}
this.writer = writer;
}
+
+ protected void initializeMapper() throws HyracksDataException {
+ super.initializeMapper();
+ try {
+ mapper = createMapper(conf);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ if (!conf.getUseNewMapper()) {
+ ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
+ }
+ }
+
+
}
private class ReaderMapperOperator extends MapperBaseOperator {
@@ -167,10 +173,10 @@
} else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath()
.toString());
- conf.setLong("map.input.start",
- ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
- conf.setLong("map.input.length",
- ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
+ conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
+ .getStart());
+ conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
+ .getLength());
}
} catch (Exception e) {
e.printStackTrace();
@@ -180,10 +186,22 @@
}
}
+ protected void initializeMapper() throws HyracksDataException {
+ super.initializeMapper();
+ updateConfWithSplit(conf);
+ try {
+ mapper = createMapper(conf);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ if (!conf.getUseNewMapper()) {
+ ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
+ }
+ }
+
public void mapInput() throws HyracksDataException, InterruptedException, ClassNotFoundException {
try {
initializeMapper();
- updateConfWithSplit(conf);
conf.setClassLoader(this.getClass().getClassLoader());
Object reader;
Object key = null;
@@ -209,7 +227,7 @@
};;;
OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
- .getOutputCommitter(new TaskAttemptContext(jobConf, new TaskAttemptID()));
+ .getOutputCommitter(new TaskAttemptContext(conf, new TaskAttemptID()));
StatusReporter statusReporter = new StatusReporter() {
@Override
public void setStatus(String arg0) {
@@ -229,7 +247,7 @@
return null;
}
};;;
- context = new org.apache.hadoop.mapreduce.Mapper().new Context(jobConf, new TaskAttemptID(),
+ context = new org.apache.hadoop.mapreduce.Mapper().new Context(conf, new TaskAttemptID(),
newReader, recordWriter, outputCommitter, statusReporter,
(org.apache.hadoop.mapreduce.InputSplit) inputSplit);
newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplit, context);
@@ -298,9 +316,9 @@
String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
try {
if (hadoopClassFactory == null) {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
- (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+ .forName(mapOutputKeyClassName), (Class<? extends Writable>) Class
+ .forName(mapOutputValueClassName));
} else {
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
(Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputKeyClassName),
@@ -312,21 +330,21 @@
return recordDescriptor;
}
- private Object createMapper() throws Exception {
+ private Object createMapper(JobConf conf) throws Exception {
Object mapper;
if (mapperClass != null) {
- return ReflectionUtils.newInstance(mapperClass, jobConf);
+ return ReflectionUtils.newInstance(mapperClass, conf);
} else {
String mapperClassName = null;
if (jobConf.getUseNewMapper()) {
- JobContext jobContext = new JobContext(jobConf, null);
+ JobContext jobContext = new JobContext(conf, null);
mapperClass = jobContext.getMapperClass();
mapperClassName = mapperClass.getName();
} else {
- mapperClass = super.getJobConf().getMapperClass();
+ mapperClass = conf.getMapperClass();
mapperClassName = mapperClass.getName();
}
- mapper = getHadoopClassFactory().createMapper(mapperClassName, jobConf);
+ mapper = getHadoopClassFactory().createMapper(mapperClassName,conf);
}
return mapper;
}
@@ -344,8 +362,8 @@
} else {
Class inputFormatClass = conf.getInputFormat().getClass();
InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
- return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf,
- super.createReporter());
+ return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf, super
+ .createReporter());
}
}
@@ -383,8 +401,8 @@
}
return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
} else {
- return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
- recordDescProvider.getInputRecordDescriptor(this.odId, 0));
+ return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
+ .getInputRecordDescriptor(this.odId, 0));
}
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 2d44c79..195b7c4 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -23,13 +23,14 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
@@ -59,66 +60,64 @@
private Object reducer;
private DataWritingOutputCollector<K3, V3> output;
private Reporter reporter;
- private ReducerContext reducerContext;
+ private ReducerContext reducerContext;
RawKeyValueIterator rawKeyValueIterator = new RawKeyValueIterator() {
-
+
@Override
public boolean next() throws IOException {
return false;
}
-
+
@Override
public DataInputBuffer getValue() throws IOException {
return null;
}
-
+
@Override
public Progress getProgress() {
return null;
}
-
+
@Override
public DataInputBuffer getKey() throws IOException {
return null;
}
-
+
@Override
public void close() throws IOException {
-
+
}
};
-
+
+
class ReducerContext extends org.apache.hadoop.mapreduce.Reducer.Context {
private HadoopReducerOperatorDescriptor.ValueIterator iterator;
-
+
@SuppressWarnings("unchecked")
- ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException,
- InterruptedException, ClassNotFoundException {
-
- reducer.super(conf, new TaskAttemptID(), rawKeyValueIterator, null, null, null, null, null, null, Class
- .forName("org.apache.hadoop.io.NullWritable"), Class
- .forName("org.apache.hadoop.io.NullWritable"));
+ ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException, InterruptedException, ClassNotFoundException{
+
+ reducer.super(conf,new TaskAttemptID(),rawKeyValueIterator,null,null,null,null,null,null,Class.forName("org.apache.hadoop.io.NullWritable"),Class.forName("org.apache.hadoop.io.NullWritable"));
}
-
- public void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) {
+
+ public void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) {
iterator = iter;
}
-
+
@Override
public Iterable<V2> getValues() throws IOException, InterruptedException {
- return new Iterable<V2>() {
- @Override
- public Iterator<V2> iterator() {
- return iterator;
- }
- };
+ return new Iterable<V2>() {
+ @Override
+ public Iterator<V2> iterator() {
+ return iterator;
+ }
+ };
}
-
+
/** Start processing next unique key. */
@Override
- public boolean nextKey() throws IOException, InterruptedException {
+ public boolean nextKey() throws IOException,InterruptedException {
boolean hasMore = iterator.hasNext();
- if (hasMore) {
+ if(hasMore){
nextKeyValue();
}
return hasMore;
@@ -134,25 +133,26 @@
}
public Object getCurrentKey() {
- return iterator.getKey();
+ return iterator.getKey();
}
@Override
public Object getCurrentValue() {
- return iterator.getValue();
+ return iterator.getValue();
}
-
+
/**
* Generate an output key/value pair.
*/
@Override
- public void write(Object key, Object value) throws IOException, InterruptedException {
- output.collect(key, value);
+ public void write(Object key, Object value
+ ) throws IOException, InterruptedException {
+ output.collect(key, value);
}
}
-
- public ReducerAggregator(Object reducer) throws HyracksDataException {
+
+ public ReducerAggregator(Object reducer) throws HyracksDataException{
this.reducer = reducer;
initializeReducer();
output = new DataWritingOutputCollector<K3, V3>();
@@ -201,17 +201,17 @@
i.reset(reader);
output.setWriter(writer);
try {
- if (jobConf.getUseNewReducer()) {
+ if(jobConf.getUseNewReducer()){
try {
reducerContext.setIterator(i);
- ((org.apache.hadoop.mapreduce.Reducer) reducer).run(reducerContext);
+ ((org.apache.hadoop.mapreduce.Reducer)reducer).run(reducerContext);
} catch (InterruptedException e) {
e.printStackTrace();
throw new HyracksDataException(e);
}
- } else {
- ((org.apache.hadoop.mapred.Reducer) reducer).reduce(i.getKey(), i, output, reporter);
- }
+ } else {
+ ((org.apache.hadoop.mapred.Reducer)reducer).reduce(i.getKey(), i, output, reporter);
+ }
} catch (IOException e) {
e.printStackTrace();
}
@@ -221,28 +221,28 @@
public void close() throws HyracksDataException {
// -- - close - --
try {
- if (!jobConf.getUseNewMapper()) {
- ((org.apache.hadoop.mapred.Reducer) reducer).close();
- }
+ if(!jobConf.getUseNewMapper()) {
+ ((org.apache.hadoop.mapred.Reducer)reducer).close();
+ }
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
-
+
private void initializeReducer() throws HyracksDataException {
jobConf.setClassLoader(this.getClass().getClassLoader());
- if (!jobConf.getUseNewReducer()) {
- ((org.apache.hadoop.mapred.Reducer) reducer).configure(getJobConf());
+ if(!jobConf.getUseNewReducer()) {
+ ((org.apache.hadoop.mapred.Reducer)reducer).configure(getJobConf());
} else {
try {
- reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer) reducer, jobConf);
+ reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer)reducer,jobConf);
} catch (IOException e) {
e.printStackTrace();
throw new HyracksDataException(e);
} catch (InterruptedException e) {
e.printStackTrace();
throw new HyracksDataException(e);
- } catch (RuntimeException e) {
+ } catch (RuntimeException e){
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
@@ -259,7 +259,7 @@
public K2 getKey() {
return key;
}
-
+
public V2 getValue() {
return value;
}
@@ -310,11 +310,13 @@
private static final long serialVersionUID = 1L;
private Class reducerClass;
private IComparatorFactory comparatorFactory;
+ private boolean useAsCombiner = false;
public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IComparatorFactory comparatorFactory,
- IHadoopClassFactory classFactory) {
+ IHadoopClassFactory classFactory,boolean useAsCombiner) {
super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
this.comparatorFactory = comparatorFactory;
+ this.useAsCombiner = useAsCombiner;
}
private Object createReducer() throws Exception {
@@ -322,14 +324,22 @@
return ReflectionUtils.newInstance(reducerClass, getJobConf());
} else {
Object reducer;
- if (getJobConf().getUseNewReducer()) {
- JobContext jobContext = new JobContext(getJobConf(), null);
- reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
- .getReducerClass();
+ if(!useAsCombiner) {
+ if(getJobConf().getUseNewReducer()){
+ JobContext jobContext = new JobContext(getJobConf(), null);
+ reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?,?,?,?>> )jobContext.getReducerClass();
+ } else {
+ reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
+ }
} else {
- reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
+ if(getJobConf().getUseNewReducer()){
+ JobContext jobContext = new JobContext(getJobConf(), null);
+ reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?,?,?,?>> )jobContext.getCombinerClass();
+ } else {
+ reducerClass = (Class<? extends Reducer>) getJobConf().getCombinerClass();
+ }
}
- reducer = getHadoopClassFactory().createReducer(reducerClass.getName(), getJobConf());
+ reducer = getHadoopClassFactory().createReducer(reducerClass.getName(),getJobConf());
return reducer;
}
}
@@ -367,24 +377,23 @@
}
public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory classFactory) {
- String outputKeyClassName = null;
+ String outputKeyClassName =null;
String outputValueClassName = null;
-
- if (conf.getUseNewMapper()) {
- JobContext context = new JobContext(conf, null);
+
+ if(conf.getUseNewMapper()) {
+ JobContext context = new JobContext(conf,null);
outputKeyClassName = context.getOutputKeyClass().getName();
outputValueClassName = context.getOutputValueClass().getName();
} else {
outputKeyClassName = conf.getOutputKeyClass().getName();
outputValueClassName = conf.getOutputValueClass().getName();
}
-
+
RecordDescriptor recordDescriptor = null;
try {
if (classFactory == null) {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) Class.forName(outputKeyClassName),
- (Class<? extends Writable>) Class.forName(outputValueClassName));
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+ .forName(outputKeyClassName), (Class<? extends Writable>) Class.forName(outputValueClassName));
} else {
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
(Class<? extends Writable>) classFactory.loadClass(outputKeyClassName),
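
The new useAsCombiner flag above lets the same operator resolve either the job's reducer class or its combiner class. A minimal sketch of how a job builder might instantiate both variants; spec, conf, comparatorFactory and classFactory are assumed to be defined elsewhere:

    // Reduce phase: resolves the reducer class from the JobConf (new or old mapred API).
    HadoopReducerOperatorDescriptor reducer =
            new HadoopReducerOperatorDescriptor(spec, conf, comparatorFactory, classFactory, false);

    // Map-side combine: same operator, but resolves the combiner class instead.
    HadoopReducerOperatorDescriptor combiner =
            new HadoopReducerOperatorDescriptor(spec, conf, null, classFactory, true);
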
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index 4dcfa61..9f3d532 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -22,11 +22,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -43,14 +43,15 @@
public class HadoopWriteOperatorDescriptor extends AbstractFileWriteOperatorDescriptor {
- private class HadoopFileWriter implements IRecordWriter {
+ private class HadoopFileWriter implements IRecordWriter {
Object recordWriter;
JobConf conf;
Path finalOutputFile;
Path tempOutputFile;
-
- HadoopFileWriter(Object recordWriter, Path tempOutputFile, Path outputFile, JobConf conf) {
+
+
+ HadoopFileWriter(Object recordWriter,Path tempOutputFile,Path outputFile,JobConf conf) {
this.recordWriter = recordWriter;
this.conf = conf;
this.finalOutputFile = outputFile;
@@ -59,23 +60,26 @@
@Override
public void write(Object[] record) throws Exception {
- if (conf.getUseNewMapper()) {
- ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]);
- } else {
- ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]);
+ if(recordWriter != null){
+ if (conf.getUseNewMapper()){
+ ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).write(record[0], record[1]);
+ } else {
+ ((org.apache.hadoop.mapred.RecordWriter)recordWriter).write(record[0], record[1]);
+ }
}
}
@Override
public void close() {
try {
- if (conf.getUseNewMapper()) {
- ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new TaskAttemptContext(conf,
- new TaskAttemptID()));
- } else {
- ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null);
- }
- FileSystem.get(conf).rename(tempOutputFile, finalOutputFile);
+ if(recordWriter != null) {
+ if (conf.getUseNewMapper()){
+ ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).close(new TaskAttemptContext(conf, new TaskAttemptID()));
+ } else {
+ ((org.apache.hadoop.mapred.RecordWriter)recordWriter).close(null);
+ }
+ FileSystem.get(conf).rename( tempOutputFile, finalOutputFile);
+ }
} catch (Exception e) {
e.printStackTrace();
}
@@ -120,83 +124,81 @@
} catch (IOException ioe) {
ioe.printStackTrace();
}
- Path path = new Path(fileSplit.getLocalFile().getFile().getPath());
Path tempOutputFile = null;
Path finalOutputFile = null;
checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
- Object recordWriter = null;
- Object outputFormat = null;
- String taskAttempId = new TaskAttemptID().toString();
- conf.set("mapred.task.id", taskAttempId);
- outputPath = new Path(conf.get("mapred.output.dir"));
- outputTempPath = new Path(outputPath, "_temporary");
- if (outputPath != null && !fileSystem.exists(outputPath)) {
- fileSystem.mkdirs(outputTempPath);
+ Object recordWriter = null;
+ if(! (conf.getOutputFormat() instanceof NullOutputFormat)) {
+ boolean isMap = conf.getNumReduceTasks() == 0;
+ TaskAttemptID taskAttempId = new TaskAttemptID("0",index,isMap,index,index);
+ conf.set("mapred.task.id",taskAttempId.toString());
+ outputPath = new Path(conf.get("mapred.output.dir"));
+ outputTempPath = new Path(outputPath,"_temporary");
+ String suffix = new String("part-r-00000");
+ suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
+ suffix = suffix + index;
+ tempOutputFile = new Path(outputTempPath,new Path("_" + taskAttempId.toString()));
+ if (conf.getNumReduceTasks() == 0 ) {
+ suffix=suffix.replace("-r-", "-m-");
+ }
+ tempOutputFile = new Path(tempOutputFile,suffix);
+ finalOutputFile = new Path(outputPath,suffix);
+ if(conf.getUseNewMapper()){
+ org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat)ReflectionUtils.newInstance((new JobContext(conf,null)).getOutputFormatClass(),conf);
+ recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttempId));
+ }else {
+ recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf,suffix, new Progressable() {
+ @Override
+ public void progress() {}
+ });
+ }
}
- String suffix = new String("part-r-00000");
- suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
- suffix = suffix + index;
- tempOutputFile = new Path(outputTempPath, "_" + taskAttempId + "/" + suffix);
- if (conf.getNumReduceTasks() == 0) {
- suffix.replace("-r-", "-m-");
- }
- finalOutputFile = new Path(outputPath, suffix);
- if (conf.getUseNewMapper()) {
- org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
- .newInstance((new JobContext(conf, null)).getOutputFormatClass(), conf);
- recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, new TaskAttemptID()));
- } else {
- recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf, suffix, new Progressable() {
- @Override
- public void progress() {
- }
- });
- }
-
+
return new HadoopFileWriter(recordWriter, tempOutputFile, finalOutputFile, conf);
}
+
Path outputPath;
Path outputTempPath;
-
+
protected Reporter createReporter() {
- return new Reporter() {
- @Override
- public Counter getCounter(Enum<?> name) {
- return null;
- }
+ return new Reporter() {
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return null;
+ }
- @Override
- public Counter getCounter(String group, String name) {
- return null;
- }
+ @Override
+ public Counter getCounter(String group, String name) {
+ return null;
+ }
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- return null;
- }
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
- @Override
- public void incrCounter(Enum<?> key, long amount) {
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
- }
+ }
- @Override
- public void incrCounter(String group, String counter, long amount) {
+ @Override
+ public void incrCounter(String group, String counter, long amount) {
- }
+ }
- @Override
- public void progress() {
+ @Override
+ public void progress() {
- }
+ }
- @Override
- public void setStatus(String status) {
+ @Override
+ public void setStatus(String status) {
- }
- };
- }
+ }
+ };
+}
private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
@@ -218,8 +220,8 @@
private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) throws ClassNotFoundException {
int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
Object outputFormat = null;
- if (conf.getUseNewMapper()) {
- outputFormat = ReflectionUtils.newInstance(new JobContext(conf, null).getOutputFormatClass(), conf);
+ if(conf.getUseNewMapper()) {
+ outputFormat = ReflectionUtils.newInstance(new JobContext(conf,null).getOutputFormatClass(), conf);
} else {
outputFormat = conf.getOutputFormat();
}
@@ -234,13 +236,11 @@
FileSplit[] outputFileSplits = new FileSplit[numOutputters];
String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
- System.out.println("absolute path:" + absolutePath);
for (int index = 0; index < numOutputters; index++) {
String suffix = new String("part-00000");
suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
suffix = suffix + index;
String outputPath = absolutePath + "/" + suffix;
- System.out.println("output path :" + outputPath);
outputFileSplits[index] = new FileSplit("localhost", outputPath);
}
return outputFileSplits;
@@ -252,5 +252,14 @@
super(jobSpec, getOutputSplits(jobConf, numMapTasks));
this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
checkIfCanWriteToHDFS(super.splits);
+ FileSystem fs = FileSystem.get(jobConf);
+ if (jobConf.get("mapred.output.dir") != null) {
+ Path output = new Path(jobConf.get("mapred.output.dir"));
+ Path outputTemp = new Path(output,"_temporary");
+ if(output != null && !fs.exists(outputTemp)) {
+ fs.mkdirs(outputTemp);
+ }
+ }
+ }
}
-}
+
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index e4daf0b..a363221 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -1,93 +1,91 @@
package edu.uci.ics.hyracks.hadoop.compat.client;
-import java.io.File;
-import java.util.List;
+import java.util.EnumSet;
+import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
-import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
-import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
-import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
public class HyracksClient {
- private HadoopAdapter hadoopAdapter;
- private static HyracksRMIConnection connection;
- private static final String applicationName = "CompatibilityLayer";
+ private static HyracksRMIConnection connection;
+ private static final String jobProfilingKey = "jobProfilingKey";
+ Set<String> systemLibs;
- public HyracksClient(String clusterConf) throws Exception {
- Properties properties = Utilities.getProperties(clusterConf, '=');
- String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
- String fileSystem = (String) properties.get(ConfigurationConstants.namenodeURL);
- initialize(clusterController, fileSystem);
- }
+ public HyracksClient(Properties clusterProperties) throws Exception {
+ initialize(clusterProperties);
+ }
- public HyracksClient(String clusterControllerAddr, String fileSystem) throws Exception {
- initialize(clusterControllerAddr, fileSystem);
- }
+ private void initialize(Properties properties) throws Exception {
+ String clusterController = (String) properties
+ .get(ConfigurationConstants.clusterControllerHost);
+ connection = new HyracksRMIConnection(clusterController, 1099);
+ systemLibs = new HashSet<String>();
+ for (String systemLib : ConfigurationConstants.systemLibs) {
+ String systemLibPath = properties.getProperty(systemLib);
+ if (systemLibPath != null) {
+ systemLibs.add(systemLibPath);
+ }
+ }
+ }
- private void initialize(String clusterControllerAddr, String namenodeUrl) throws Exception {
- connection = new HyracksRMIConnection(clusterControllerAddr, 1099);
- connection.destroyApplication(applicationName);
- hadoopAdapter = new HadoopAdapter(namenodeUrl);
- }
+ public HyracksClient(String clusterConf, char delimiter) throws Exception {
+ Properties properties = Utilities.getProperties(clusterConf, delimiter);
+ initialize(properties);
+ }
- public HyracksRunningJob submitJobs(List<JobConf> confs, Set<String> requiredLibs) throws Exception {
- JobSpecification spec = hadoopAdapter.getJobSpecification(confs);
- String appName = getApplicationNameHadoopJob(confs.get(0));
- return submitJob(appName,spec, requiredLibs);
- }
+ private Set<String> getRequiredLibs(Set<String> userLibs) {
+ Set<String> requiredLibs = new HashSet<String>();
+ for (String systemLib : systemLibs) {
+ requiredLibs.add(systemLib);
+ }
+ for (String userLib : userLibs) {
+ requiredLibs.add(userLib);
+ }
+ return requiredLibs;
+ }
- private String getApplicationNameHadoopJob(JobConf jobConf) {
- String jar = jobConf.getJar();
- if( jar != null){
- return jar.substring(jar.lastIndexOf("/") >=0 ? jar.lastIndexOf("/") +1 : 0);
- }else {
- return "" + System.currentTimeMillis();
- }
- }
-
- public HyracksRunningJob submitJob(JobConf conf, Set<String> requiredLibs) throws Exception {
- JobSpecification spec = hadoopAdapter.getJobSpecification(conf);
- String appName = getApplicationNameHadoopJob(conf);
- return submitJob(appName, spec, requiredLibs);
- }
+ public JobStatus getJobStatus(UUID jobId) throws Exception {
+ return connection.getJobStatus(jobId);
+ }
- public JobStatus getJobStatus(UUID jobId) throws Exception {
- return connection.getJobStatus(jobId);
- }
+ private void createApplication(String applicationName, Set<String> userLibs)
+ throws Exception {
+ connection.createApplication(applicationName, Utilities
+ .getHyracksArchive(applicationName, getRequiredLibs(userLibs)));
+ }
- public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> requiredLibs) throws Exception {
- UUID jobId = null;
- try {
- jobId = connection.createJob(applicationName, spec);
- } catch (Exception e){
- System.out.println(" application not found, creating application" + applicationName);
- connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
- jobId = connection.createJob(applicationName, spec);
- }
- connection.start(jobId);
- HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
- return runningJob;
- }
+ public HyracksRunningJob submitJob(String applicationName,
+ JobSpecification spec) throws Exception {
+ String jobProfilingVal = System.getenv(jobProfilingKey);
+ boolean doProfiling = ("true".equalsIgnoreCase(jobProfilingVal));
+ UUID jobId;
+ if (doProfiling) {
+ System.out.println("PROFILING");
+ jobId = connection.createJob(applicationName, spec, EnumSet
+ .of(JobFlag.PROFILE_RUNTIME));
+ } else {
+ jobId = connection.createJob(applicationName, spec);
+ }
+ connection.start(jobId);
+ HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
+ return runningJob;
+ }
- public HadoopAdapter getHadoopAdapter() {
- return hadoopAdapter;
- }
+ public HyracksRunningJob submitJob(String applicationName,
+ JobSpecification spec, Set<String> userLibs) throws Exception {
+ createApplication(applicationName, userLibs);
+ return submitJob(applicationName, spec);
+ }
- public void setHadoopAdapter(HadoopAdapter hadoopAdapter) {
- this.hadoopAdapter = hadoopAdapter;
- }
-
- public void waitForCompleton(UUID jobId) throws Exception {
- connection.waitForCompletion(jobId);
- }
-
+ public void waitForCompleton(UUID jobId) throws Exception {
+ connection.waitForCompletion(jobId);
+ }
}
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
index 0b96041..37f4d34 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
@@ -1,5 +1,6 @@
package edu.uci.ics.hyracks.hadoop.compat.driver;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -25,175 +26,183 @@
public class CompatibilityLayer {
- HyracksClient hyracksClient;
- DCacheHandler dCacheHander = null;
- Properties clusterConf;
- Set<String> systemLibs;
+ HyracksClient hyracksClient;
+ DCacheHandler dCacheHander = null;
+ Properties clusterConf;
+ HadoopAdapter hadoopAdapter;
- private static char configurationFileDelimiter = '=';
- private static final String dacheKeyPrefix = "dcache.key";
+ private static char configurationFileDelimiter = '=';
+ private static final String dacheKeyPrefix = "dcache.key";
- public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
- initialize(clConfig);
- }
+ public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
+ initialize(clConfig);
+ }
- public HyracksRunningJob submitJobs(String[] jobFiles, Set<String> userLibs) throws Exception {
- Set<String> requiredLibs = getRequiredLibs(userLibs);
- List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
- Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFiles[0]);
- String tempDir = "/tmp";
- if (dcacheTasks.size() > 0) {
- HadoopAdapter hadoopAdapter = hyracksClient.getHadoopAdapter();
- for (String key : dcacheTasks.keySet()) {
- String destPath = tempDir + "/" + key + System.currentTimeMillis();
- hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(dcacheTasks.get(key)), new Path(destPath));
- System.out.println(" source :" + dcacheTasks.get(key));
- System.out.println(" dest :" + destPath);
- System.out.println(" key :" + key);
- System.out.println(" value :" + destPath);
- dCacheHander.put(key, destPath);
- }
- }
- HyracksRunningJob hyraxRunningJob = hyracksClient.submitJobs(jobConfs, requiredLibs);
- return hyraxRunningJob;
- }
+ private void initialize(CompatibilityConfig clConfig) throws Exception {
+ clusterConf = Utilities.getProperties(clConfig.clusterConf,
+ configurationFileDelimiter);
+ hadoopAdapter = new HadoopAdapter(clusterConf
+ .getProperty(ConfigurationConstants.namenodeURL));
+ hyracksClient = new HyracksClient(clusterConf);
+ dCacheHander = new DCacheHandler(clusterConf
+ .getProperty(ConfigurationConstants.dcacheServerConfiguration));
+ }
- private Set<String> getRequiredLibs(Set<String> userLibs) {
- Set<String> requiredLibs = new HashSet<String>();
- for (String systemLib : systemLibs) {
- requiredLibs.add(systemLib);
- }
- for (String userLib : userLibs) {
- requiredLibs.add(userLib);
- }
- return requiredLibs;
- }
+ public HyracksRunningJob submitJob(JobConf conf,Set<String> userLibs) throws Exception {
+ List<JobConf> jobConfs = new ArrayList<JobConf>();
+ jobConfs.add(conf);
+ String applicationName = conf.getJobName() + System.currentTimeMillis();
+ JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+ HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
+ applicationName, spec, userLibs);
+ return hyracksRunningJob;
+ }
+
+ public HyracksRunningJob submitJobs(String applicationName,
+ String[] jobFiles, Set<String> userLibs) throws Exception {
+ List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+ populateDCache(jobFiles[0]);
+ JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+ HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
+ applicationName, spec, userLibs);
+ return hyracksRunningJob;
+ }
- private void initialize(CompatibilityConfig clConfig) throws Exception {
- clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
- systemLibs = new HashSet<String>();
- for (String systemLib : ConfigurationConstants.systemLibs) {
- String systemLibPath = clusterConf.getProperty(systemLib);
- if (systemLibPath != null) {
- systemLibs.add(systemLibPath);
- }
- }
- String clusterControllerHost = clusterConf.getProperty(ConfigurationConstants.clusterControllerHost);
- String dacheServerConfiguration = clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration);
- String fileSystem = clusterConf.getProperty(ConfigurationConstants.namenodeURL);
- hyracksClient = new HyracksClient(clusterControllerHost, fileSystem);
- try {
- dCacheHander = new DCacheHandler(dacheServerConfiguration);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+ public HyracksRunningJob submitJobs(String applicationName,
+ String[] jobFiles) throws Exception {
+ List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+ populateDCache(jobFiles[0]);
+ JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+ HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
+ applicationName, spec);
+ return hyracksRunningJob;
+ }
- private Map<String, String> initializeCustomProperties(Properties properties, String prefix) {
- Map<String, String> foundProperties = new HashMap<String, String>();
- Set<Entry<Object, Object>> entrySet = properties.entrySet();
- for (Entry entry : entrySet) {
- String key = (String) entry.getKey();
- String value = (String) entry.getValue();
- if ((key.startsWith(prefix))) {
- String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
- foundProperties.put(actualKey, value);
- }
- }
- return foundProperties;
- }
+ private void populateDCache(String jobFile) throws IOException {
+ Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
+ String tempDir = "/tmp";
+ if (dcacheTasks.size() > 0) {
+ for (String key : dcacheTasks.keySet()) {
+ String destPath = tempDir + "/" + key
+ + System.currentTimeMillis();
+ hadoopAdapter.getHDFSClient().copyToLocalFile(
+ new Path(dcacheTasks.get(key)), new Path(destPath));
+ System.out.println(" source :" + dcacheTasks.get(key));
+ System.out.println(" dest :" + destPath);
+ System.out.println(" key :" + key);
+ System.out.println(" value :" + destPath);
+ dCacheHander.put(key, destPath);
+ }
+ }
+ }
- public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
- Properties jobProperties = Utilities.getProperties(jobFile, ',');
- Map<String, String> dcacheTasks = new HashMap<String, String>();
- Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, dacheKeyPrefix);
- for (String key : dcacheKeys.keySet()) {
- String sourcePath = dcacheKeys.get(key);
- if (sourcePath != null) {
- dcacheTasks.put(key, sourcePath);
- }
- }
- return dcacheTasks;
- }
+ private String getApplicationNameForHadoopJob(JobConf jobConf) {
+ String jar = jobConf.getJar();
+ if (jar != null) {
+ return jar.substring(jar.lastIndexOf("/") >= 0 ? jar
+ .lastIndexOf("/") + 1 : 0);
+ } else {
+ return "" + System.currentTimeMillis();
+ }
+ }
- public void waitForCompletion(UUID jobId) throws Exception {
- hyracksClient.waitForCompleton(jobId);
- }
+ private Map<String, String> initializeCustomProperties(
+ Properties properties, String prefix) {
+ Map<String, String> foundProperties = new HashMap<String, String>();
+ Set<Entry<Object, Object>> entrySet = properties.entrySet();
+ for (Entry entry : entrySet) {
+ String key = (String) entry.getKey();
+ String value = (String) entry.getValue();
+ if ((key.startsWith(prefix))) {
+ String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
+ foundProperties.put(actualKey, value);
+ }
+ }
+ return foundProperties;
+ }
- public HyracksRunningJob submitHadoopJobToHyrax(JobConf jobConf, Set<String> userLibs) {
- HyracksRunningJob hyraxRunningJob = null;
- List<JobConf> jobConfs = new ArrayList<JobConf>();
- jobConfs.add(jobConf);
- try {
- hyraxRunningJob = hyracksClient.submitJobs(jobConfs, getRequiredLibs(userLibs));
- System.out.println(" Result in " + jobConf.get("mapred.output.dir"));
- } catch (Exception e) {
- e.printStackTrace();
- }
- return hyraxRunningJob;
- }
+ public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
+ Properties jobProperties = Utilities.getProperties(jobFile, ',');
+ Map<String, String> dcacheTasks = new HashMap<String, String>();
+ Map<String, String> dcacheKeys = initializeCustomProperties(
+ jobProperties, dacheKeyPrefix);
+ for (String key : dcacheKeys.keySet()) {
+ String sourcePath = dcacheKeys.get(key);
+ if (sourcePath != null) {
+ dcacheTasks.put(key, sourcePath);
+ }
+ }
+ return dcacheTasks;
+ }
- public HyracksRunningJob submitJob(String appName, JobSpecification jobSpec, Set<String> userLibs) {
- HyracksRunningJob hyraxRunningJob = null;
- try {
- hyraxRunningJob = hyracksClient.submitJob(appName, jobSpec, getRequiredLibs(userLibs));
- } catch (Exception e) {
- e.printStackTrace();
- }
- return hyraxRunningJob;
- }
+ public void waitForCompletion(UUID jobId) throws Exception {
+ hyracksClient.waitForCompleton(jobId);
+ }
- private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception {
- List<JobConf> jobConfs = new ArrayList<JobConf>();
- for (String jobFile : jobFiles) {
- jobConfs.add(constructHadoopJobConf(jobFile));
- }
- return jobConfs;
- }
+ private List<JobConf> constructHadoopJobConfs(String[] jobFiles)
+ throws Exception {
+ List<JobConf> jobConfs = new ArrayList<JobConf>();
+ for (String jobFile : jobFiles) {
+ jobConfs.add(constructHadoopJobConf(jobFile));
+ }
+ return jobConfs;
+ }
- private JobConf constructHadoopJobConf(String jobFile) {
- Properties jobProperties = Utilities.getProperties(jobFile, '=');
- JobConf conf = hyracksClient.getHadoopAdapter().getConf();
- for (Entry entry : jobProperties.entrySet()) {
- conf.set((String) entry.getKey(), (String) entry.getValue());
- System.out.println((String) entry.getKey() + " : " + (String) entry.getValue());
- }
- return conf;
- }
+ private JobConf constructHadoopJobConf(String jobFile) {
+ Properties jobProperties = Utilities.getProperties(jobFile, '=');
+ JobConf conf = new JobConf(hadoopAdapter.getConf());
+ for (Entry entry : jobProperties.entrySet()) {
+ conf.set((String) entry.getKey(), (String) entry.getValue());
+ System.out.println((String) entry.getKey() + " : "
+ + (String) entry.getValue());
+ }
+ return conf;
+ }
- private String[] getJobs(CompatibilityConfig clConfig) {
- return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(",");
- }
+ private String[] getJobs(CompatibilityConfig clConfig) {
+ return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles
+ .split(",");
+ }
- public static void main(String args[]) throws Exception {
- long startTime = System.nanoTime();
- CompatibilityConfig clConfig = new CompatibilityConfig();
- CmdLineParser cp = new CmdLineParser(clConfig);
- try {
- cp.parseArgument(args);
- } catch (Exception e) {
- System.err.println(e.getMessage());
- cp.printUsage(System.err);
- return;
- }
- CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
- String[] jobFiles = compatLayer.getJobs(clConfig);
- String[] tempUserLibs = clConfig.userLibs == null ? new String[0] : clConfig.userLibs.split(",");
- Set<String> userLibs = new HashSet<String>();
- for(String userLib : tempUserLibs) {
- userLibs.add(userLib);
- }
- HyracksRunningJob hyraxRunningJob = null;
- try {
- hyraxRunningJob = compatLayer.submitJobs(jobFiles, userLibs);
- compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
- } catch (Exception e) {
- e.printStackTrace();
- }
- hyraxRunningJob.waitForCompletion();
- long end_time = System.nanoTime();
- System.out.println("TOTAL TIME (from Launch to Completion):" + ((end_time - startTime) / (float) 1000000000.0)
- + " seconds.");
- }
-
+ public static void main(String args[]) throws Exception {
+ long startTime = System.nanoTime();
+ CompatibilityConfig clConfig = new CompatibilityConfig();
+ CmdLineParser cp = new CmdLineParser(clConfig);
+ try {
+ cp.parseArgument(args);
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ cp.printUsage(System.err);
+ return;
+ }
+ CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
+ String applicationName = clConfig.applicationName;
+ String[] jobFiles = compatLayer.getJobs(clConfig);
+ String[] userLibraries = null;
+ if (clConfig.userLibs != null) {
+ userLibraries = clConfig.userLibs.split(",");
+ }
+ try {
+ HyracksRunningJob hyraxRunningJob = null;
+ if (userLibraries != null) {
+ Set<String> userLibs = new HashSet<String>();
+ for (String userLib : userLibraries) {
+ userLibs.add(userLib);
+ }
+ hyraxRunningJob = compatLayer.submitJobs(applicationName,
+ jobFiles, userLibs);
+ } else {
+ hyraxRunningJob = compatLayer.submitJobs(applicationName,
+ jobFiles);
+ }
+ compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
+ long end_time = System.nanoTime();
+ System.out.println("TOTAL TIME (from Launch to Completion):"
+ + ((end_time - startTime) / (float) 1000000000.0)
+ + " seconds.");
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
}
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
index 1dd266f..6d94bc7 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
@@ -4,17 +4,20 @@
public class CompatibilityConfig {
- @Option(name = "-cluster", required = true, usage = "Defines the path to the configuration file that provides the following info: +"
- + " (1) Address of HyracksClusterController service" + " (2) Address of Hadoop namenode service")
- public String clusterConf;
+ @Option(name = "-cluster", required = true, usage = "Defines the path to the configuration file that provides the following info: +"
+ + " (1) Address of HyracksClusterController service"
+ + " (2) Address of Hadoop namenode service")
+ public String clusterConf;
- @Option(name = "-jobFiles", usage = "Comma separated list of jobFiles. "
- + "Each job file defines the hadoop job + " + "The order in the list defines the sequence in which"
- + "the jobs are to be executed")
- public String jobFiles;
+ @Option(name = "-jobFiles", usage = "Comma separated list of jobFiles. "
+ + "Each job file defines the hadoop job + "
+ + "The order in the list defines the sequence in which"
+ + "the jobs are to be executed")
+ public String jobFiles;
- @Option(name = "-userLibs", usage = " A comma separated list of jar files that are required to be addedd to classpath when running "
- + " mappers/reducers etc ")
- public String userLibs;
+ @Option(name = "-applicationName", usage = " The application as part of which the job executes")
+ public String applicationName;
+ @Option(name = "-userLibs", usage = " A comma separated list of jar files that are required to be addedd to classpath when running ")
+ public String userLibs;
}
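
With the new -applicationName option, the driver can target a pre-existing application by name. A hypothetical invocation of the compat layer's main(); only the option names come from CompatibilityConfig above, the values are placeholders:

    String[] args = {
            "-cluster", "conf/cluster.properties",
            "-applicationName", "existingApp",
            "-jobFiles", "job1.properties,job2.properties",
            "-userLibs", "myUdfs.jar"
    };
    edu.uci.ics.hyracks.hadoop.compat.driver.CompatibilityLayer.main(args);
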
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
index d0df7f1..f2f7d03 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -47,310 +47,360 @@
public class HadoopAdapter {
- public static final String FS_DEFAULT_NAME = "fs.default.name";
- private JobConf jobConf;
- private Map<OperatorDescriptorId,Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId,Integer>();
- public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT";
- public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
- public static final int DEFAULT_MAX_MAPPERS = 40;
- public static final int DEFAULT_MAX_REDUCERS= 40;
- public static final String MAX_MAPPERS_KEY = "maxMappers";
- public static final String MAX_REDUCERS_KEY = "maxReducers";
- public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
-
- private int maxMappers = DEFAULT_MAX_MAPPERS;
- private int maxReducers = DEFAULT_MAX_REDUCERS;
- private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
-
- class NewHadoopConstants {
- public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
- public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
- public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
- public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
- public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
- public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
- }
-
- public HadoopAdapter(String namenodeUrl) {
- jobConf = new JobConf(true);
- jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
- if(System.getenv(MAX_MAPPERS_KEY) != null) {
- maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY));
- }
- if(System.getenv(MAX_REDUCERS_KEY) != null) {
- maxReducers= Integer.parseInt(System.getenv(MAX_REDUCERS_KEY));
- }
- if(System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) {
- exSortFrame= Integer.parseInt(System.getenv(EX_SORT_FRAME_LIMIT_KEY));
- }
- }
+ public static final String FS_DEFAULT_NAME = "fs.default.name";
+ private JobConf jobConf;
+ private Map<OperatorDescriptorId, Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId, Integer>();
+ public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT";
+ public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
+ public static final int DEFAULT_MAX_MAPPERS = 40;
+ public static final int DEFAULT_MAX_REDUCERS = 40;
+ public static final String MAX_MAPPERS_KEY = "maxMappers";
+ public static final String MAX_REDUCERS_KEY = "maxReducers";
+ public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
- private String getEnvironmentVariable(String key, String def) {
- String ret = System.getenv(key);
- return ret != null ? ret : def;
- }
-
- public JobConf getConf() {
- return jobConf;
- }
+ private int maxMappers = DEFAULT_MAX_MAPPERS;
+ private int maxReducers = DEFAULT_MAX_REDUCERS;
+ private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
- public static VersionedProtocol getProtocol(Class protocolClass, InetSocketAddress inetAddress, JobConf jobConf)
- throws IOException {
- VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass, ClientProtocol.versionID, inetAddress,
- jobConf);
- return versionedProtocol;
- }
+ class NewHadoopConstants {
+ public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
+ public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
+ public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
+ public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
+ public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
+ public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
+ }
- private static RecordDescriptor getHadoopRecordDescriptor(String className1, String className2) {
- RecordDescriptor recordDescriptor = null;
- try {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
- .forName(className1), (Class<? extends Writable>) Class.forName(className2));
- } catch (ClassNotFoundException cnfe) {
- cnfe.printStackTrace();
- }
- return recordDescriptor;
- }
+ public HadoopAdapter(String namenodeUrl) {
+ jobConf = new JobConf(true);
+ jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
+ if (System.getenv(MAX_MAPPERS_KEY) != null) {
+ maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY));
+ }
+ if (System.getenv(MAX_REDUCERS_KEY) != null) {
+ maxReducers = Integer.parseInt(System.getenv(MAX_REDUCERS_KEY));
+ }
+ if (System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) {
+ exSortFrame = Integer.parseInt(System
+ .getenv(EX_SORT_FRAME_LIMIT_KEY));
+ }
+ }
- private Object[] getInputSplits(JobConf conf) throws IOException, ClassNotFoundException, InterruptedException {
- if (conf.getUseNewMapper()) {
- return getNewInputSplits(conf);
- } else {
- return getOldInputSplits(conf);
- }
- }
-
- private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(JobConf conf) throws ClassNotFoundException, IOException, InterruptedException {
- org.apache.hadoop.mapreduce.InputSplit[] splits = null;
- JobContext context = new JobContext(conf,null);
- org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils.newInstance(context.getInputFormatClass(),conf);
- List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat.getSplits(context);
- return inputSplits.toArray(new org.apache.hadoop.mapreduce.InputSplit[]{});
- }
-
- private InputSplit[] getOldInputSplits(JobConf conf) throws IOException {
- InputFormat inputFormat = conf.getInputFormat();
- return inputFormat.getSplits(conf, conf.getNumMapTasks());
- }
-
- private void configurePartitionCountConstraint(JobSpecification spec, IOperatorDescriptor operator,int instanceCount){
- PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, instanceCount);
- operatorInstanceCount.put(operator.getOperatorId(),instanceCount);
- }
+ private String getEnvironmentVariable(String key, String def) {
+ String ret = System.getenv(key);
+ return ret != null ? ret : def;
+ }
- public HadoopMapperOperatorDescriptor getMapper(JobConf conf,JobSpecification spec, IOperatorDescriptor previousOp)
- throws Exception {
- boolean selfRead = previousOp == null;
- IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
- HadoopMapperOperatorDescriptor mapOp = null;
- if(selfRead) {
- Object [] splits = getInputSplits(conf,maxMappers);
- mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,classFactory);
- configurePartitionCountConstraint(spec,mapOp,splits.length);
- System.out.println("No of mappers :" + splits.length);
- } else {
- configurePartitionCountConstraint(spec,mapOp,getInstanceCount(previousOp));
- mapOp = new HadoopMapperOperatorDescriptor(spec,conf,classFactory);
- spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0, mapOp, 0);
- }
- return mapOp;
- }
+ public JobConf getConf() {
+ return jobConf;
+ }
- public HadoopReducerOperatorDescriptor getReducer(JobConf conf, JobSpecification spec) {
- HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(spec, conf, null,
- new ClasspathBasedHadoopClassFactory());
- return reduceOp;
- }
+ public static VersionedProtocol getProtocol(Class protocolClass,
+ InetSocketAddress inetAddress, JobConf jobConf) throws IOException {
+ VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass,
+ ClientProtocol.versionID, inetAddress, jobConf);
+ return versionedProtocol;
+ }
- public FileSystem getHDFSClient() {
- FileSystem fileSystem = null;
- try {
- fileSystem = FileSystem.get(jobConf);
- } catch (IOException ioe) {
- ioe.printStackTrace();
- }
- return fileSystem;
- }
+ private static RecordDescriptor getHadoopRecordDescriptor(
+ String className1, String className2) {
+ RecordDescriptor recordDescriptor = null;
+ try {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) Class.forName(className1),
+ (Class<? extends Writable>) Class.forName(className2));
+ } catch (ClassNotFoundException cnfe) {
+ cnfe.printStackTrace();
+ }
+ return recordDescriptor;
+ }
- public JobSpecification getJobSpecification(List<JobConf> jobConfs) throws Exception {
- JobSpecification spec = null;
- if (jobConfs.size() == 1) {
- spec = getJobSpecification(jobConfs.get(0));
- } else {
- spec = getPipelinedSpec(jobConfs);
- }
- return spec;
- }
+ private Object[] getInputSplits(JobConf conf) throws IOException,
+ ClassNotFoundException, InterruptedException {
+ if (conf.getUseNewMapper()) {
+ return getNewInputSplits(conf);
+ } else {
+ return getOldInputSplits(conf);
+ }
+ }
- private IOperatorDescriptor configureOutput( IOperatorDescriptor previousOperator, JobConf conf,
- JobSpecification spec) throws Exception {
- int instanceCountPreviousOperator = operatorInstanceCount.get(previousOperator.getOperatorId());
- int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : instanceCountPreviousOperator;
- HadoopWriteOperatorDescriptor writer = null;
- writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
- configurePartitionCountConstraint(spec,writer,numOutputters);
- spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, writer, 0);
- return writer;
- }
+ private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(
+ JobConf conf) throws ClassNotFoundException, IOException,
+ InterruptedException {
+ org.apache.hadoop.mapreduce.InputSplit[] splits = null;
+ JobContext context = new JobContext(conf, null);
+ org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils
+ .newInstance(context.getInputFormatClass(), conf);
+ List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat
+ .getSplits(context);
+ return inputSplits
+ .toArray(new org.apache.hadoop.mapreduce.InputSplit[] {});
+ }
+ private InputSplit[] getOldInputSplits(JobConf conf) throws IOException {
+ InputFormat inputFormat = conf.getInputFormat();
+ return inputFormat.getSplits(conf, conf.getNumMapTasks());
+ }
- private int getInstanceCount(IOperatorDescriptor operator) {
- return operatorInstanceCount.get(operator.getOperatorId());
- }
+ private void configurePartitionCountConstraint(JobSpecification spec,
+ IOperatorDescriptor operator, int instanceCount) {
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, operator,
+ instanceCount);
+ operatorInstanceCount.put(operator.getOperatorId(), instanceCount);
+ }
- private IOperatorDescriptor addCombiner(IOperatorDescriptor previousOperator, JobConf jobConf,
- JobSpecification spec) throws Exception {
- boolean useCombiner = (jobConf.getCombinerClass() != null);
- IOperatorDescriptor mapSideOutputOp = previousOperator;
- if (useCombiner) {
- System.out.println("Using Combiner:" + jobConf.getCombinerClass().getName());
- IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(jobConf, spec);
- configurePartitionCountConstraint(spec,mapSideCombineSortOp,getInstanceCount(previousOperator));
-
- HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(jobConf, spec);
- configurePartitionCountConstraint(spec,mapSideCombineReduceOp,getInstanceCount(previousOperator));
- spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapSideCombineSortOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
- mapSideOutputOp = mapSideCombineSortOp;
- }
- return mapSideOutputOp;
- }
-
- private int getNumReduceTasks(JobConf jobConf) {
- int numReduceTasks = Math.min(maxReducers,jobConf.getNumReduceTasks());
- return numReduceTasks;
- }
-
- private IOperatorDescriptor addReducer(IOperatorDescriptor previousOperator, JobConf jobConf,
- JobSpecification spec) throws Exception {
- IOperatorDescriptor mrOutputOperator = previousOperator;
- if (jobConf.getNumReduceTasks() != 0) {
- IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
- HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec);
- int numReduceTasks = getNumReduceTasks(jobConf);
- System.out.println("No of Reducers :" + numReduceTasks);
- configurePartitionCountConstraint(spec,sorter,numReduceTasks);
- configurePartitionCountConstraint(spec,reducer,numReduceTasks);
-
- IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(jobConf, spec);
- spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, reducer, 0);
- mrOutputOperator = reducer;
- }
- return mrOutputOperator;
- }
-
- private long getInputSize(Object[] splits,JobConf conf) throws IOException, InterruptedException {
- long totalInputSize =0;
- if(conf.getUseNewMapper()) {
- for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[])splits) {
- totalInputSize += split.getLength();
- }
- } else {
- for (InputSplit split : (InputSplit[])splits) {
- totalInputSize += split.getLength();
- }
- }
- return totalInputSize;
- }
-
- private Object[] getInputSplits(JobConf conf, int desiredMaxMappers) throws Exception {
- Object[] splits = getInputSplits(conf);
- System.out.println(" initial split count :" + splits.length);
- System.out.println(" desired mappers :" + desiredMaxMappers);
- if (splits.length > desiredMaxMappers) {
- long totalInputSize = getInputSize(splits,conf);
- long goalSize = (totalInputSize/desiredMaxMappers);
- System.out.println(" total input length :" + totalInputSize);
- System.out.println(" goal size :" + goalSize);
- conf.setLong("mapred.min.split.size", goalSize);
- conf.setNumMapTasks(desiredMaxMappers);
- splits = getInputSplits(conf);
- System.out.println(" revised split count :" + splits.length);
- }
- return splits;
- }
-
- public JobSpecification getPipelinedSpec(List<JobConf> jobConfs) throws Exception {
- JobSpecification spec = new JobSpecification();
- Iterator<JobConf> iterator = jobConfs.iterator();
- JobConf firstMR = iterator.next();
- IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec,firstMR);
- while (iterator.hasNext())
- for (JobConf currentJobConf : jobConfs) {
- mrOutputOp = configureMapReduce(mrOutputOp, spec , currentJobConf);
- }
- configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
- return spec;
- }
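+ /**
+ * Creates the mapper operator for a job. When no upstream operator is
+ * supplied, the mapper reads its own input splits and its partition count
+ * equals the number of splits; otherwise it is connected one-to-one to the
+ * upstream operator and inherits its instance count.
+ */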
+ public HadoopMapperOperatorDescriptor getMapper(JobConf conf,
+ JobSpecification spec, IOperatorDescriptor previousOp)
+ throws Exception {
+ boolean selfRead = previousOp == null;
+ IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
+ HadoopMapperOperatorDescriptor mapOp = null;
+ if (selfRead) {
+ Object[] splits = getInputSplits(conf, maxMappers);
+ mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,
+ classFactory);
+ configurePartitionCountConstraint(spec, mapOp, splits.length);
+ } else {
+ mapOp = new HadoopMapperOperatorDescriptor(spec, conf, classFactory);
+ configurePartitionCountConstraint(spec, mapOp,
+ getInstanceCount(previousOp));
+ spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0,
+ mapOp, 0);
+ }
+ return mapOp;
+ }
- public JobSpecification getJobSpecification(JobConf conf) throws Exception {
- JobSpecification spec = new JobSpecification();
- IOperatorDescriptor mrOutput = configureMapReduce(null,spec, conf);
- IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec);
- spec.addRoot(printer);
- System.out.println(spec);
- return spec;
- }
-
- private IOperatorDescriptor configureMapReduce(IOperatorDescriptor previousOuputOp, JobSpecification spec, JobConf conf) throws Exception {
- IOperatorDescriptor mapper = getMapper(conf,spec,previousOuputOp);
- IOperatorDescriptor mapSideOutputOp = addCombiner(mapper,conf,spec);
- IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec);
- return reducer;
- }
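+ /**
+ * Wraps the job's reduce (or, when useAsCombiner is true, combine) phase
+ * in a HadoopReducerOperatorDescriptor backed by the classpath-based class
+ * factory.
+ */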
+ public HadoopReducerOperatorDescriptor getReducer(JobConf conf,
+ JobSpecification spec, boolean useAsCombiner) {
+ HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(
+ spec, conf, null, new ClasspathBasedHadoopClassFactory(),
+ useAsCombiner);
+ return reduceOp;
+ }
- public static InMemorySortOperatorDescriptor getInMemorySorter(JobConf conf, JobSpecification spec) {
- InMemorySortOperatorDescriptor inMemorySortOp = null;
- RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
- .getMapOutputValueClass().getName());
- Class<? extends RawComparator> rawComparatorClass = null;
- WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
- WritableComparable.class));
- WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
- writableComparator.getClass());
- inMemorySortOp = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
- new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
- return inMemorySortOp;
- }
+ public FileSystem getHDFSClient() {
+ FileSystem fileSystem = null;
+ try {
+ fileSystem = FileSystem.get(jobConf);
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ return fileSystem;
+ }
- public static ExternalSortOperatorDescriptor getExternalSorter(JobConf conf, JobSpecification spec) {
- ExternalSortOperatorDescriptor externalSortOp = null;
- RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
- .getMapOutputValueClass().getName());
- Class<? extends RawComparator> rawComparatorClass = null;
- WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
- WritableComparable.class));
- WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
- writableComparator.getClass());
- externalSortOp = new ExternalSortOperatorDescriptor(spec,conf.getInt(HYRACKS_EX_SORT_FRAME_LIMIT,DEFAULT_EX_SORT_FRAME_LIMIT),new int[] { 0 },
- new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
- return externalSortOp;
- }
-
- public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(JobConf conf,
- JobSpecification spec) {
+ public JobSpecification getJobSpecification(List<JobConf> jobConfs)
+ throws Exception {
+ JobSpecification spec = null;
+ if (jobConfs.size() == 1) {
+ spec = getJobSpecification(jobConfs.get(0));
+ } else {
+ spec = getPipelinedSpec(jobConfs);
+ }
+ return spec;
+ }
- Class mapOutputKeyClass = conf.getMapOutputKeyClass();
- Class mapOutputValueClass = conf.getMapOutputValueClass();
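+ /**
+ * Appends the HadoopWriteOperatorDescriptor that materializes the job's
+ * output. The number of writers equals the number of reduce tasks, or the
+ * previous operator's instance count for map-only jobs.
+ */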
+ private IOperatorDescriptor configureOutput(
+ IOperatorDescriptor previousOperator, JobConf conf,
+ JobSpecification spec) throws Exception {
+ int instanceCountPreviousOperator = operatorInstanceCount
+ .get(previousOperator.getOperatorId());
+ int numOutputters = conf.getNumReduceTasks() != 0 ? conf
+ .getNumReduceTasks() : instanceCountPreviousOperator;
+ HadoopWriteOperatorDescriptor writer = null;
+ writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
+ configurePartitionCountConstraint(spec, writer, numOutputters);
+ spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator,
+ 0, writer, 0);
+ return writer;
+ }
- MToNHashPartitioningConnectorDescriptor connectorDescriptor = null;
- ITuplePartitionComputerFactory factory = null;
- conf.getMapOutputKeyClass();
- if (conf.getPartitionerClass() != null && !conf.getPartitionerClass().getName().startsWith("org.apache.hadoop")) {
- Class<? extends Partitioner> partitioner = conf.getPartitionerClass();
- factory = new HadoopPartitionerTuplePartitionComputerFactory(partitioner, DatatypeHelper
- .createSerializerDeserializer(mapOutputKeyClass), DatatypeHelper
- .createSerializerDeserializer(mapOutputValueClass));
- } else {
- RecordDescriptor recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(mapOutputKeyClass,
- mapOutputValueClass);
- ISerializerDeserializer mapOutputKeySerializerDerserializer = DatatypeHelper
- .createSerializerDeserializer(mapOutputKeyClass);
- factory = new HadoopHashTuplePartitionComputerFactory(mapOutputKeySerializerDerserializer);
- }
- connectorDescriptor = new MToNHashPartitioningConnectorDescriptor(spec, factory);
- return connectorDescriptor;
- }
+ private int getInstanceCount(IOperatorDescriptor operator) {
+ return operatorInstanceCount.get(operator.getOperatorId());
+ }
+
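+ /**
+ * If the job defines a combiner class, inserts a map-side external sort
+ * followed by a combiner-mode reducer, both sized to the mapper's instance
+ * count and connected one-to-one.
+ */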
+ private IOperatorDescriptor addCombiner(
+ IOperatorDescriptor previousOperator, JobConf jobConf,
+ JobSpecification spec) throws Exception {
+ boolean useCombiner = (jobConf.getCombinerClass() != null);
+ IOperatorDescriptor mapSideOutputOp = previousOperator;
+ if (useCombiner) {
+ System.out.println("Using Combiner:"
+ + jobConf.getCombinerClass().getName());
+ IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(
+ jobConf, spec);
+ configurePartitionCountConstraint(spec, mapSideCombineSortOp,
+ getInstanceCount(previousOperator));
+
+ HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(
+ jobConf, spec, true);
+ configurePartitionCountConstraint(spec, mapSideCombineReduceOp,
+ getInstanceCount(previousOperator));
+ spec.connect(new OneToOneConnectorDescriptor(spec),
+ previousOperator, 0, mapSideCombineSortOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec),
+ mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
+ mapSideOutputOp = mapSideCombineReduceOp;
+ }
+ return mapSideOutputOp;
+ }
+
+ private int getNumReduceTasks(JobConf jobConf) {
+ int numReduceTasks = Math.min(maxReducers, jobConf.getNumReduceTasks());
+ return numReduceTasks;
+ }
+
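+ /**
+ * For jobs with reduce tasks, adds an external sort and a reducer fed by
+ * an M-to-N hash-partitioning connector. The reduce-side partition count
+ * is capped at maxReducers.
+ */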
+ private IOperatorDescriptor addReducer(
+ IOperatorDescriptor previousOperator, JobConf jobConf,
+ JobSpecification spec) throws Exception {
+ IOperatorDescriptor mrOutputOperator = previousOperator;
+ if (jobConf.getNumReduceTasks() != 0) {
+ IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
+ HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec,
+ false);
+ int numReduceTasks = getNumReduceTasks(jobConf);
+ configurePartitionCountConstraint(spec, sorter, numReduceTasks);
+ configurePartitionCountConstraint(spec, reducer, numReduceTasks);
+
+ IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(
+ jobConf, spec);
+ spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter,
+ 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
+ reducer, 0);
+ mrOutputOperator = reducer;
+ }
+ return mrOutputOperator;
+ }
+
+ private long getInputSize(Object[] splits, JobConf conf)
+ throws IOException, InterruptedException {
+ long totalInputSize = 0;
+ if (conf.getUseNewMapper()) {
+ for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[]) splits) {
+ totalInputSize += split.getLength();
+ }
+ } else {
+ for (InputSplit split : (InputSplit[]) splits) {
+ totalInputSize += split.getLength();
+ }
+ }
+ return totalInputSize;
+ }
+
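+ /**
+ * Computes input splits, and if their number exceeds desiredMaxMappers,
+ * raises mapred.min.split.size to totalInputSize / desiredMaxMappers and
+ * recomputes the splits.
+ */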
+ private Object[] getInputSplits(JobConf conf, int desiredMaxMappers)
+ throws Exception {
+ Object[] splits = getInputSplits(conf);
+ if (splits.length > desiredMaxMappers) {
+ long totalInputSize = getInputSize(splits, conf);
+ long goalSize = (totalInputSize / desiredMaxMappers);
+ conf.setLong("mapred.min.split.size", goalSize);
+ conf.setNumMapTasks(desiredMaxMappers);
+ splits = getInputSplits(conf);
+ }
+ return splits;
+ }
+
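+ /**
+ * Chains several MapReduce jobs into a single Hyracks JobSpecification:
+ * each job's output operator feeds the next job's mapper, and a writer is
+ * attached after the last job.
+ */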
+ public JobSpecification getPipelinedSpec(List<JobConf> jobConfs)
+ throws Exception {
+ JobSpecification spec = new JobSpecification();
+ Iterator<JobConf> iterator = jobConfs.iterator();
+ JobConf firstMR = iterator.next();
+ IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec, firstMR);
+ while (iterator.hasNext()) {
+ JobConf currentJobConf = iterator.next();
+ mrOutputOp = configureMapReduce(mrOutputOp, spec,
+ currentJobConf);
+ }
+ configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
+ return spec;
+ }
+
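+ /**
+ * Builds the complete specification (map, optional combine, optional
+ * reduce, write) for a single job and registers the writer as the root of
+ * the specification.
+ */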
+ public JobSpecification getJobSpecification(JobConf conf) throws Exception {
+ JobSpecification spec = new JobSpecification();
+ IOperatorDescriptor mrOutput = configureMapReduce(null, spec, conf);
+ IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec);
+ spec.addRoot(printer);
+ System.out.println(spec);
+ return spec;
+ }
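+
+ /*
+ * Illustrative usage only (not part of this patch): a minimal sketch of
+ * driving this adapter, assuming an instance of this class named "adapter"
+ * and a fully configured (hypothetical) word-count JobConf:
+ *
+ *   JobConf wordCountConf = new JobConf();
+ *   wordCountConf.setJobName("word-count");
+ *   JobSpecification spec = adapter.getJobSpecification(wordCountConf);
+ *   // spec can then be submitted to the Hyracks cluster controller.
+ */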
+
+ private IOperatorDescriptor configureMapReduce(
+ IOperatorDescriptor previousOutputOp, JobSpecification spec,
+ JobConf conf) throws Exception {
+ IOperatorDescriptor mapper = getMapper(conf, spec, previousOutputOp);
+ IOperatorDescriptor mapSideOutputOp = addCombiner(mapper, conf, spec);
+ IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec);
+ return reducer;
+ }
+
+ public static InMemorySortOperatorDescriptor getInMemorySorter(
+ JobConf conf, JobSpecification spec) {
+ InMemorySortOperatorDescriptor inMemorySortOp = null;
+ RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
+ .getMapOutputKeyClass().getName(), conf
+ .getMapOutputValueClass().getName());
+ WritableComparator writableComparator = WritableComparator.get(conf
+ .getMapOutputKeyClass().asSubclass(WritableComparable.class));
+ WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+ writableComparator.getClass());
+ inMemorySortOp = new InMemorySortOperatorDescriptor(spec,
+ new int[] { 0 },
+ new IBinaryComparatorFactory[] { comparatorFactory },
+ recordDescriptor);
+ return inMemorySortOp;
+ }
+
+ public static ExternalSortOperatorDescriptor getExternalSorter(
+ JobConf conf, JobSpecification spec) {
+ ExternalSortOperatorDescriptor externalSortOp = null;
+ RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
+ .getMapOutputKeyClass().getName(), conf
+ .getMapOutputValueClass().getName());
+ WritableComparator writableComparator = WritableComparator.get(conf
+ .getMapOutputKeyClass().asSubclass(WritableComparable.class));
+ WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+ writableComparator.getClass());
+ externalSortOp = new ExternalSortOperatorDescriptor(spec, conf.getInt(
+ HYRACKS_EX_SORT_FRAME_LIMIT, DEFAULT_EX_SORT_FRAME_LIMIT),
+ new int[] { 0 },
+ new IBinaryComparatorFactory[] { comparatorFactory },
+ recordDescriptor);
+ return externalSortOp;
+ }
+
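+ /**
+ * Builds the connector that shuffles map output to the reduce side. A
+ * user-supplied (non-org.apache.hadoop) Partitioner is honored if set;
+ * otherwise tuples are hash-partitioned on the map output key.
+ */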
+ public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(
+ JobConf conf, JobSpecification spec) {
+
+ Class mapOutputKeyClass = conf.getMapOutputKeyClass();
+ Class mapOutputValueClass = conf.getMapOutputValueClass();
+
+ MToNHashPartitioningConnectorDescriptor connectorDescriptor = null;
+ ITuplePartitionComputerFactory factory = null;
+ if (conf.getPartitionerClass() != null
+ && !conf.getPartitionerClass().getName().startsWith(
+ "org.apache.hadoop")) {
+ Class<? extends Partitioner> partitioner = conf
+ .getPartitionerClass();
+ factory = new HadoopPartitionerTuplePartitionComputerFactory(
+ partitioner, DatatypeHelper
+ .createSerializerDeserializer(mapOutputKeyClass),
+ DatatypeHelper
+ .createSerializerDeserializer(mapOutputValueClass));
+ } else {
+ RecordDescriptor recordDescriptor = DatatypeHelper
+ .createKeyValueRecordDescriptor(mapOutputKeyClass,
+ mapOutputValueClass);
+ ISerializerDeserializer mapOutputKeySerializerDeserializer = DatatypeHelper
+ .createSerializerDeserializer(mapOutputKeyClass);
+ factory = new HadoopHashTuplePartitionComputerFactory(
+ mapOutputKeySerializerDeserializer);
+ }
+ connectorDescriptor = new MToNHashPartitioningConnectorDescriptor(spec,
+ factory);
+ return connectorDescriptor;
+ }
}