Remove Unused / Historical Hyracks Modules
Change-Id: Iaa058eb7c73696e1ead2c05c1ee34dbe9055ce52
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1714
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/pom.xml
deleted file mode 100644
index cd0b30a..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/pom.xml
+++ /dev/null
@@ -1,85 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>hyracks-dataflow-hadoop</artifactId>
- <name>hyracks-dataflow-hadoop</name>
-
- <parent>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks</artifactId>
- <version>0.2.18-SNAPSHOT</version>
- </parent>
-
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- <comments>A business-friendly OSS license</comments>
- </license>
- </licenses>
-
- <properties>
- <root.dir>${basedir}/../..</root.dir>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-api</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-dataflow-common</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-hdfs-2.x</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.dcache</groupId>
- <artifactId>dcache-client</artifactId>
- <version>0.0.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-dataflow-std</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
- </dependencies>
-</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
deleted file mode 100644
index 226250e..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import edu.uci.ics.dcache.client.DCacheClient;
-import org.apache.hyracks.api.dataflow.IDataWriter;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-public abstract class AbstractHadoopOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- protected transient JobConf jobConf;
-
- protected static class DataWritingOutputCollector<K, V> implements OutputCollector<K, V> {
- private IDataWriter<Object[]> writer;
-
- public DataWritingOutputCollector() {
- }
-
- public DataWritingOutputCollector(IDataWriter<Object[]> writer) {
- this.writer = writer;
- }
-
- @Override
- public void collect(Object key, Object value) throws IOException {
- writer.writeData(new Object[] { key, value });
- }
-
- public void setWriter(IDataWriter<Object[]> writer) {
- this.writer = writer;
- }
- }
-
- public static String MAPRED_CACHE_FILES = "mapred.cache.files";
- public static String MAPRED_CACHE_LOCALFILES = "mapred.cache.localFiles";
-
- private static final long serialVersionUID = 1L;
- private final Map<String, String> jobConfMap;
- private IHadoopClassFactory hadoopClassFactory;
-
- public AbstractHadoopOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity,
- RecordDescriptor recordDescriptor, JobConf jobConf, IHadoopClassFactory hadoopOperatorFactory) {
- super(spec, inputArity, 1);
- jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
- this.hadoopClassFactory = hadoopOperatorFactory;
- recordDescriptors[0] = recordDescriptor;
- }
-
- public Map<String, String> getJobConfMap() {
- return jobConfMap;
- }
-
- public IHadoopClassFactory getHadoopClassFactory() {
- return hadoopClassFactory;
- }
-
- public void setHadoopClassFactory(IHadoopClassFactory hadoopClassFactory) {
- this.hadoopClassFactory = hadoopClassFactory;
- }
-
- protected Reporter createReporter() {
- return new Reporter() {
- @Override
- public Counter getCounter(Enum<?> name) {
- return null;
- }
-
- @Override
- public Counter getCounter(String group, String name) {
- return null;
- }
-
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- return null;
- }
-
- @Override
- public void incrCounter(Enum<?> key, long amount) {
-
- }
-
- @Override
- public void incrCounter(String group, String counter, long amount) {
-
- }
-
- @Override
- public void progress() {
-
- }
-
- @Override
- public void setStatus(String status) {
-
- }
-
- @Override
- public float getProgress() {
- return 0.0f;
- }
- };
- }
-
- public JobConf getJobConf() {
- if (jobConf == null) {
- jobConf = DatatypeHelper.map2JobConf(jobConfMap);
- jobConf.setClassLoader(this.getClass().getClassLoader());
- }
- return jobConf;
- }
-
- public void populateCache(JobConf jobConf) {
- try {
- String cache = jobConf.get(MAPRED_CACHE_FILES);
- System.out.println("cache:" + cache);
- if (cache == null) {
- return;
- }
- String localCache = jobConf.get(MAPRED_CACHE_LOCALFILES);
- System.out.println("localCache:" + localCache);
- if (localCache != null) {
- return;
- }
- localCache = "";
- StringTokenizer cacheTokenizer = new StringTokenizer(cache, ",");
- while (cacheTokenizer.hasMoreTokens()) {
- if (!"".equals(localCache)) {
- localCache += ",";
- }
- try {
- localCache += DCacheClient.get().get(cacheTokenizer.nextToken());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- jobConf.set(MAPRED_CACHE_LOCALFILES, localCache);
- System.out.println("localCache:" + localCache);
- } catch (Exception e) {
-
- }
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
deleted file mode 100644
index b328f29..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOpenableDataWriter;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.comm.io.SerializingDataWriter;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
-import org.apache.hyracks.dataflow.hadoop.util.InputSplitsProxy;
-import org.apache.hyracks.dataflow.hadoop.util.MRContextUtil;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-import org.apache.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
-import org.apache.hyracks.hdfs.ContextFactory;
-
-public class HadoopMapperOperatorDescriptor<K1, V1, K2, V2> extends AbstractHadoopOperatorDescriptor {
-
- private class MapperBaseOperator {
- protected OutputCollector<K2, V2> output;
- protected Reporter reporter;
- protected Object mapper;
- // protected Mapper<K1, V1, K2, V2> mapper;
- protected int partition;
- protected JobConf conf;
- protected IOpenableDataWriter<Object[]> writer;
- protected boolean newMapreduceLib = false;
- org.apache.hadoop.mapreduce.Mapper.Context context;
-
- public MapperBaseOperator(int partition) {
- this.partition = partition;
- }
-
- protected void initializeMapper() throws HyracksDataException {
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- jobConf = getJobConf();
- populateCache(jobConf);
- conf = new JobConf(jobConf);
- conf.setClassLoader(jobConf.getClassLoader());
- reporter = createReporter();
- }
-
- protected void map(Object[] data) throws HyracksDataException {
- try {
- if (!conf.getUseNewMapper()) {
- ((org.apache.hadoop.mapred.Mapper) mapper).map((K1) data[0], (V1) data[1], output, reporter);
- } else
- throw new IllegalStateException(
- " Incorrect map method called for MapReduce code written using mapreduce package");
- } catch (IOException e) {
- throw new HyracksDataException(e);
- } catch (RuntimeException re) {
- System.out.println(" Runtime exceptione encoutered for row :" + data[0] + ": " + data[1]);
- re.printStackTrace();
- }
- }
-
- protected void closeMapper() throws HyracksDataException {
- try {
- if (!conf.getUseNewMapper()) {
- ((org.apache.hadoop.mapred.Mapper) mapper).close();
- } else {
- // do nothing. closing the mapper is handled internally by
- // run method on context.
- }
- } catch (IOException ioe) {
- throw new HyracksDataException(ioe);
- }
- }
-
- }
-
- private class MapperOperator extends MapperBaseOperator implements IOpenableDataWriterOperator {
-
- public MapperOperator(int partition) {
- super(partition);
-        }
-
- @Override
- public void close() throws HyracksDataException {
- super.closeMapper();
- writer.close();
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
- public void open() throws HyracksDataException {
- initializeMapper();
- writer.open();
- output = new DataWritingOutputCollector<K2, V2>(writer);
- }
-
- @Override
- public void writeData(Object[] data) throws HyracksDataException {
- super.map(data);
- }
-
- public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
- if (index != 0) {
- throw new IllegalArgumentException();
- }
- this.writer = writer;
- }
-
- protected void initializeMapper() throws HyracksDataException {
- super.initializeMapper();
- try {
- mapper = createMapper(conf);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- if (!conf.getUseNewMapper()) {
- ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
- }
- }
-
- @Override
- public void flush() throws HyracksDataException {
- }
-
- }
-
- private class ReaderMapperOperator extends MapperBaseOperator {
-
- public ReaderMapperOperator(int partition, IOpenableDataWriter writer) throws HyracksDataException {
- super(partition);
- output = new DataWritingOutputCollector<K2, V2>(writer);
- this.writer = writer;
- this.writer.open();
- }
-
- protected void updateConfWithSplit(JobConf conf) {
- try {
- if (inputSplits == null) {
- inputSplits = inputSplitsProxy.toInputSplits(conf);
- }
- Object splitRead = inputSplits[partition];
- if (splitRead instanceof FileSplit) {
- conf.set("map.input.file", ((FileSplit) splitRead).getPath().toString());
- conf.setLong("map.input.start", ((FileSplit) splitRead).getStart());
- conf.setLong("map.input.length", ((FileSplit) splitRead).getLength());
- } else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
- conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath()
- .toString());
- conf.setLong("map.input.start",
- ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
- conf.setLong("map.input.length",
- ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
- }
- } catch (Exception e) {
- e.printStackTrace();
- // we do not throw the exception here as we are setting
- // additional parameters that may not be
- // required by the mapper. If they are indeed required, the
- // configure method invoked on the mapper
- // shall report an exception because of the missing parameters.
- }
- }
-
- protected void initializeMapper() throws HyracksDataException {
- super.initializeMapper();
- updateConfWithSplit(conf);
- try {
- mapper = createMapper(conf);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- if (!conf.getUseNewMapper()) {
- ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
- }
- }
-
- public void mapInput() throws HyracksDataException, InterruptedException, ClassNotFoundException {
- try {
- initializeMapper();
- conf.setClassLoader(this.getClass().getClassLoader());
- Object reader;
- Object key = null;
- Object value = null;
- Object inputSplit = inputSplits[partition];
- reader = getRecordReader(conf, inputSplit);
- final Object[] data = new Object[2];
- if (conf.getUseNewMapper()) {
- org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader) reader;
- org.apache.hadoop.mapreduce.RecordWriter recordWriter = new RecordWriter() {
-
- @Override
- public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void write(Object key, Object value) throws IOException, InterruptedException {
- data[0] = key;
- data[1] = value;
- writer.writeData(data);
- }
-                    };
-
- OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
- .getOutputCommitter(new ContextFactory().createContext(conf, new TaskAttemptID()));
- StatusReporter statusReporter = new StatusReporter() {
- @Override
- public void setStatus(String arg0) {
- }
-
- @Override
- public void progress() {
- }
-
- @Override
- public Counter getCounter(String arg0, String arg1) {
- return null;
- }
-
- @Override
- public Counter getCounter(Enum<?> arg0) {
- return null;
- }
-
- @Override
- public float getProgress() {
- // TODO Auto-generated method stub
- return 0;
- }
-                };
- context = new MRContextUtil().createMapContext(conf, new TaskAttemptID(), newReader, recordWriter,
- outputCommitter, statusReporter, (org.apache.hadoop.mapreduce.InputSplit) inputSplit);
- newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplit, context);
- ((org.apache.hadoop.mapreduce.Mapper) mapper).run(context);
- } else {
- Class inputKeyClass = null;
- Class inputValueClass = null;
-
- RecordReader oldReader = (RecordReader) reader;
- if (reader instanceof SequenceFileRecordReader) {
- inputKeyClass = ((SequenceFileRecordReader) oldReader).getKeyClass();
- inputValueClass = ((SequenceFileRecordReader) oldReader).getValueClass();
- } else {
- inputKeyClass = oldReader.createKey().getClass();
- inputValueClass = oldReader.createValue().getClass();
- }
- key = oldReader.createKey();
- value = oldReader.createValue();
- while (oldReader.next(key, value)) {
- data[0] = key;
- data[1] = value;
- super.map(data);
- }
- oldReader.close();
- }
-
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
-
- }
-
- public void close() throws HyracksDataException {
- super.closeMapper();
- writer.close();
- }
- }
-
- private static final long serialVersionUID = 1L;
- private Class mapperClass;
- private InputSplitsProxy inputSplitsProxy;
- private transient Object[] inputSplits;
- private boolean selfRead = false;
-
- private void initializeSplitInfo(Object[] splits) throws IOException {
- jobConf = super.getJobConf();
- InputFormat inputFormat = jobConf.getInputFormat();
- inputSplitsProxy = new InputSplitsProxy(jobConf, splits);
- }
-
- public HadoopMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf jobConf,
- IHadoopClassFactory hadoopClassFactory) throws IOException {
- super(spec, 1, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
- }
-
- public HadoopMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf jobConf, Object[] splits,
- IHadoopClassFactory hadoopClassFactory) throws IOException {
- super(spec, 0, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
- initializeSplitInfo(splits);
- this.selfRead = true;
- }
-
- public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory hadoopClassFactory) {
- RecordDescriptor recordDescriptor = null;
- String mapOutputKeyClassName = conf.getMapOutputKeyClass().getName();
- String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
- try {
- if (hadoopClassFactory == null) {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
- (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
- } else {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputKeyClassName),
- (Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputValueClassName));
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return recordDescriptor;
- }
-
- private Object createMapper(JobConf conf) throws Exception {
- Object mapper;
- if (mapperClass != null) {
- return ReflectionUtils.newInstance(mapperClass, conf);
- } else {
- String mapperClassName = null;
- if (jobConf.getUseNewMapper()) {
- JobContext jobContext = new ContextFactory().createJobContext(conf);
- mapperClass = jobContext.getMapperClass();
- mapperClassName = mapperClass.getName();
- } else {
- mapperClass = conf.getMapperClass();
- mapperClassName = mapperClass.getName();
- }
- mapper = getHadoopClassFactory().createMapper(mapperClassName, conf);
- }
- return mapper;
- }
-
- private Object getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException,
- InterruptedException {
- if (conf.getUseNewMapper()) {
- JobContext context = new ContextFactory().createJobContext(conf);
- org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils
- .newInstance(context.getInputFormatClass(), conf);
- TaskAttemptContext taskAttemptContext = new ContextFactory().createContext(conf, new TaskAttemptID());
- return inputFormat.createRecordReader((org.apache.hadoop.mapreduce.InputSplit) inputSplit,
- taskAttemptContext);
- } else {
- Class inputFormatClass = conf.getInputFormat().getClass();
- InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
- return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf,
- super.createReporter());
- }
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-
- JobConf conf = getJobConf();
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- try {
- if (selfRead) {
- RecordDescriptor recordDescriptor = null;
- if (inputSplits == null) {
- inputSplits = inputSplitsProxy.toInputSplits(conf);
- }
- Object reader = getRecordReader(conf, inputSplits[partition]);
- if (conf.getUseNewMapper()) {
- org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader) reader;
- newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplits[partition],
- new ContextFactory().createContext(conf, new TaskAttemptID()));
- newReader.nextKeyValue();
- Object key = newReader.getCurrentKey();
- Class keyClass = null;
- if (key == null) {
- keyClass = Class.forName("org.apache.hadoop.io.NullWritable");
- }
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) keyClass, (Class<? extends Writable>) newReader
- .getCurrentValue().getClass());
- } else {
- RecordReader oldReader = (RecordReader) reader;
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) oldReader.createKey().getClass(),
- (Class<? extends Writable>) oldReader.createValue().getClass());
- }
- return createSelfReadingMapper(ctx, recordDescriptor, partition);
- } else {
- return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
- recordDescProvider.getInputRecordDescriptor(this.activityNodeId, 0));
- }
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- private IOperatorNodePushable createSelfReadingMapper(final IHyracksTaskContext ctx,
- final RecordDescriptor recordDescriptor, final int partition) {
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
- @Override
- public void initialize() throws HyracksDataException {
- SerializingDataWriter writer = new SerializingDataWriter(ctx, recordDescriptor, this.writer);
- ReaderMapperOperator readMapOp = new ReaderMapperOperator(partition, writer);
- try {
- readMapOp.mapInput();
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- readMapOp.close();
- }
- }
- };
- }
-
- public Class<? extends Mapper> getMapperClass() {
- return mapperClass;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
deleted file mode 100644
index 7764265..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.dataflow.hadoop.util.InputSplitsProxy;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.hdfs.ContextFactory;
-
-public class HadoopReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
-
- private String inputFormatClassName;
- private Map<String, String> jobConfMap;
- private InputSplitsProxy inputSplitsProxy;
- private transient JobConf jobConf;
-
- public JobConf getJobConf() {
- if (jobConf == null) {
- jobConf = DatatypeHelper.map2JobConf(jobConfMap);
- }
- return jobConf;
- }
-
- public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec, Object[] splits) throws IOException {
- super(spec, 0, 1);
- this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
- InputFormat inputFormat = jobConf.getInputFormat();
- RecordReader recordReader;
- try {
- recordReader = getRecordReader(DatatypeHelper.map2JobConf(jobConfMap), splits[0]);
- } catch (Exception e) {
- throw new IOException(e);
- }
- recordDescriptors[0] = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) recordReader
- .createKey().getClass(), (Class<? extends Writable>) recordReader.createValue().getClass());
- PartitionConstraintHelper.addPartitionCountConstraint(spec, this, splits.length);
- inputSplitsProxy = new InputSplitsProxy(jobConf, splits);
- this.inputFormatClassName = inputFormat.getClass().getName();
- }
-
- private RecordReader getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException,
- InterruptedException {
- RecordReader hadoopRecordReader = null;
- if (conf.getUseNewMapper()) {
- JobContext context = new ContextFactory().createJobContext(conf);
- org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils
- .newInstance(context.getInputFormatClass(), conf);
- TaskAttemptContext taskAttemptContext = new ContextFactory().createContext(jobConf, null);
- hadoopRecordReader = (RecordReader) inputFormat.createRecordReader(
- (org.apache.hadoop.mapreduce.InputSplit) inputSplit, taskAttemptContext);
- } else {
- Class inputFormatClass = conf.getInputFormat().getClass();
- InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
- hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(
- (org.apache.hadoop.mapred.InputSplit) inputSplit, conf, createReporter());
- }
- return hadoopRecordReader;
- }
-
- public Object[] getInputSplits() throws InstantiationException, IllegalAccessException, IOException {
- return inputSplitsProxy.toInputSplits(getJobConf());
- }
-
- protected Reporter createReporter() {
- return new Reporter() {
- @Override
- public Counter getCounter(Enum<?> name) {
- return null;
- }
-
- @Override
- public Counter getCounter(String group, String name) {
- return null;
- }
-
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- return null;
- }
-
- @Override
- public void incrCounter(Enum<?> key, long amount) {
-
- }
-
- @Override
- public void incrCounter(String group, String counter, long amount) {
-
- }
-
- @Override
- public void progress() {
-
- }
-
- @Override
- public void setStatus(String status) {
-
- }
-
- @Override
- public float getProgress() {
- // TODO Auto-generated method stub
- return 0;
- }
- };
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
- throws HyracksDataException {
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
- @Override
- public void initialize() throws HyracksDataException {
- try {
- JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- conf.setClassLoader(this.getClass().getClassLoader());
- RecordReader hadoopRecordReader;
- Object key;
- Object value;
- Object[] splits = inputSplitsProxy.toInputSplits(conf);
- Object inputSplit = splits[partition];
-
- if (conf.getUseNewMapper()) {
- JobContext context = new ContextFactory().createJobContext(conf);
- org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils
- .newInstance(context.getInputFormatClass(), conf);
- TaskAttemptContext taskAttemptContext = new ContextFactory().createContext(jobConf, null);
- hadoopRecordReader = (RecordReader) inputFormat.createRecordReader(
- (org.apache.hadoop.mapreduce.InputSplit) inputSplit, taskAttemptContext);
- } else {
- Class inputFormatClass = conf.getInputFormat().getClass();
- InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
- hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(
- (org.apache.hadoop.mapred.InputSplit) inputSplit, conf, createReporter());
- }
-
- Class inputKeyClass;
- Class inputValueClass;
- if (hadoopRecordReader instanceof SequenceFileRecordReader) {
- inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
- inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
- } else {
- inputKeyClass = hadoopRecordReader.createKey().getClass();
- inputValueClass = hadoopRecordReader.createValue().getClass();
- }
-
- key = hadoopRecordReader.createKey();
- value = hadoopRecordReader.createValue();
- FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
- RecordDescriptor outputRecordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) hadoopRecordReader.createKey().getClass(),
- (Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
- int nFields = outputRecordDescriptor.getFieldCount();
- ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
- writer.open();
- try {
- while (hadoopRecordReader.next(key, value)) {
- tb.reset();
- switch (nFields) {
- case 2:
- tb.addField(outputRecordDescriptor.getFields()[0], key);
- case 1:
- tb.addField(outputRecordDescriptor.getFields()[1], value);
- }
- FrameUtils
- .appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(),
- 0, tb.getSize());
- }
- appender.write(writer, true);
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- writer.close();
- }
- hadoopRecordReader.close();
- } catch (InstantiationException e) {
- throw new HyracksDataException(e);
- } catch (IllegalAccessException e) {
- throw new HyracksDataException(e);
- } catch (ClassNotFoundException e) {
- throw new HyracksDataException(e);
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- };
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
deleted file mode 100644
index 6913876..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ /dev/null
@@ -1,422 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IDataReader;
-import org.apache.hyracks.api.dataflow.IDataWriter;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IComparator;
-import org.apache.hyracks.api.dataflow.value.IComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.hadoop.data.KeyComparatorFactory;
-import org.apache.hyracks.dataflow.hadoop.data.RawComparingComparatorFactory;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
-import org.apache.hyracks.dataflow.hadoop.util.MRContextUtil;
-import org.apache.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-import org.apache.hyracks.dataflow.std.group.DeserializedPreclusteredGroupOperator;
-import org.apache.hyracks.dataflow.std.group.IGroupAggregator;
-import org.apache.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
-import org.apache.hyracks.hdfs.ContextFactory;
-
-public class HadoopReducerOperatorDescriptor<K2, V2, K3, V3> extends AbstractHadoopOperatorDescriptor {
- private class ReducerAggregator implements IGroupAggregator {
- private Object reducer;
- private DataWritingOutputCollector<K3, V3> output;
- private Reporter reporter;
- private ReducerContext reducerContext;
- RawKeyValueIterator rawKeyValueIterator = new RawKeyValueIterator() {
-
- @Override
- public boolean next() throws IOException {
- return false;
- }
-
- @Override
- public DataInputBuffer getValue() throws IOException {
- return null;
- }
-
- @Override
- public Progress getProgress() {
- return null;
- }
-
- @Override
- public DataInputBuffer getKey() throws IOException {
- return null;
- }
-
- @Override
- public void close() throws IOException {
-
- }
- };
-
- class ReducerContext extends org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer.Context {
- private HadoopReducerOperatorDescriptor.ValueIterator iterator;
-
- @SuppressWarnings("unchecked")
- ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException,
- InterruptedException, ClassNotFoundException {
- ((org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer) reducer).super(new MRContextUtil()
- .createReduceContext(conf, new TaskAttemptID(), rawKeyValueIterator, null, null, null, null,
- null, null, Class.forName("org.apache.hadoop.io.NullWritable"),
- Class.forName("org.apache.hadoop.io.NullWritable")));
- }
-
- public void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) {
- iterator = iter;
- }
-
- @Override
- public Iterable<V2> getValues() throws IOException, InterruptedException {
- return new Iterable<V2>() {
- @Override
- public Iterator<V2> iterator() {
- return iterator;
- }
- };
- }
-
- /** Start processing next unique key. */
- @Override
- public boolean nextKey() throws IOException, InterruptedException {
- boolean hasMore = iterator.hasNext();
- if (hasMore) {
- nextKeyValue();
- }
- return hasMore;
- }
-
- /**
- * Advance to the next key/value pair.
- */
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- iterator.next();
- return true;
- }
-
- public Object getCurrentKey() {
- return iterator.getKey();
- }
-
- @Override
- public Object getCurrentValue() {
- return iterator.getValue();
- }
-
- /**
- * Generate an output key/value pair.
- */
- @Override
- public void write(Object key, Object value) throws IOException, InterruptedException {
- output.collect(key, value);
- }
-
- }
-
- public ReducerAggregator(Object reducer) throws HyracksDataException {
- this.reducer = reducer;
- initializeReducer();
- output = new DataWritingOutputCollector<K3, V3>();
- reporter = new Reporter() {
- @Override
- public void progress() {
-
- }
-
- @Override
- public void setStatus(String arg0) {
-
- }
-
- @Override
- public void incrCounter(String arg0, String arg1, long arg2) {
-
- }
-
- @Override
- public void incrCounter(Enum<?> arg0, long arg1) {
-
- }
-
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- return null;
- }
-
- @Override
- public Counter getCounter(String arg0, String arg1) {
- return null;
- }
-
- @Override
- public Counter getCounter(Enum<?> arg0) {
- return null;
- }
-
- @Override
- public float getProgress() {
- // TODO Auto-generated method stub
- return 0;
- }
- };
- }
-
- @Override
- public void aggregate(IDataReader<Object[]> reader, IDataWriter<Object[]> writer) throws HyracksDataException {
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- ValueIterator i = new ValueIterator();
- i.reset(reader);
- output.setWriter(writer);
- try {
- if (jobConf.getUseNewReducer()) {
- try {
- reducerContext.setIterator(i);
- ((org.apache.hadoop.mapreduce.Reducer) reducer).run(reducerContext);
- } catch (InterruptedException e) {
- e.printStackTrace();
- throw new HyracksDataException(e);
- }
- } else {
- ((org.apache.hadoop.mapred.Reducer) reducer).reduce(i.getKey(), i, output, reporter);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- // -- - close - --
- try {
- if (!jobConf.getUseNewMapper()) {
- ((org.apache.hadoop.mapred.Reducer) reducer).close();
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- private void initializeReducer() throws HyracksDataException {
- jobConf.setClassLoader(this.getClass().getClassLoader());
- if (!jobConf.getUseNewReducer()) {
- ((org.apache.hadoop.mapred.Reducer) reducer).configure(getJobConf());
- } else {
- try {
- reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer) reducer, jobConf);
- } catch (IOException e) {
- e.printStackTrace();
- throw new HyracksDataException(e);
- } catch (InterruptedException e) {
- e.printStackTrace();
- throw new HyracksDataException(e);
- } catch (RuntimeException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- private class ValueIterator implements Iterator<V2> {
- private IDataReader<Object[]> reader;
- private K2 key;
- private V2 value;
-
- public K2 getKey() {
- return key;
- }
-
- public V2 getValue() {
- return value;
- }
-
- @Override
- public boolean hasNext() {
- if (value == null) {
- Object[] tuple;
- try {
- tuple = reader.readData();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- if (tuple != null) {
- value = (V2) tuple[1];
- }
- }
- return value != null;
- }
-
- @Override
- public V2 next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- V2 v = value;
- value = null;
- return v;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- void reset(IDataReader<Object[]> reader) {
- this.reader = reader;
- try {
- Object[] tuple = reader.readData();
- key = (K2) tuple[0];
- value = (V2) tuple[1];
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private static final long serialVersionUID = 1L;
- private Class reducerClass;
- private IComparatorFactory comparatorFactory;
- private boolean useAsCombiner = false;
-
- public HadoopReducerOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf conf,
- IComparatorFactory comparatorFactory, IHadoopClassFactory classFactory, boolean useAsCombiner) {
- super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
- this.comparatorFactory = comparatorFactory;
- this.useAsCombiner = useAsCombiner;
- }
-
- private Object createReducer() throws Exception {
- if (reducerClass != null) {
- return ReflectionUtils.newInstance(reducerClass, getJobConf());
- } else {
- Object reducer;
- if (!useAsCombiner) {
- if (getJobConf().getUseNewReducer()) {
- JobContext jobContext = new ContextFactory().createJobContext(getJobConf());
- reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
- .getReducerClass();
- } else {
- reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
- }
- } else {
- if (getJobConf().getUseNewReducer()) {
- JobContext jobContext = new ContextFactory().createJobContext(getJobConf());
- reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
- .getCombinerClass();
- } else {
- reducerClass = (Class<? extends Reducer>) getJobConf().getCombinerClass();
- }
- }
- reducer = getHadoopClassFactory().createReducer(reducerClass.getName(), getJobConf());
- return reducer;
- }
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- try {
- if (this.comparatorFactory == null) {
- String comparatorClassName = getJobConf().getOutputValueGroupingComparator().getClass().getName();
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- RawComparator rawComparator = null;
- if (comparatorClassName != null) {
- Class comparatorClazz = getHadoopClassFactory().loadClass(comparatorClassName);
- this.comparatorFactory = new KeyComparatorFactory(comparatorClazz);
-
- } else {
- String mapOutputKeyClass = getJobConf().getMapOutputKeyClass().getName();
- if (getHadoopClassFactory() != null) {
- rawComparator = WritableComparator.get(getHadoopClassFactory().loadClass(mapOutputKeyClass));
- } else {
- rawComparator = WritableComparator.get((Class<? extends WritableComparable>) Class
- .forName(mapOutputKeyClass));
- }
- this.comparatorFactory = new RawComparingComparatorFactory(rawComparator.getClass());
- }
- }
- IOpenableDataWriterOperator op = new DeserializedPreclusteredGroupOperator(new int[] { 0 },
- new IComparator[] { comparatorFactory.createComparator() }, new ReducerAggregator(createReducer()));
- return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
- getActivityId(), 0));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory classFactory) {
- String outputKeyClassName = null;
- String outputValueClassName = null;
-
- if (conf.getUseNewMapper()) {
- JobContext context = new ContextFactory().createJobContext(conf);
- outputKeyClassName = context.getOutputKeyClass().getName();
- outputValueClassName = context.getOutputValueClass().getName();
- } else {
- outputKeyClassName = conf.getOutputKeyClass().getName();
- outputValueClassName = conf.getOutputValueClass().getName();
- }
-
- RecordDescriptor recordDescriptor = null;
- try {
- if (classFactory == null) {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) Class.forName(outputKeyClassName),
- (Class<? extends Writable>) Class.forName(outputValueClassName));
- } else {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) classFactory.loadClass(outputKeyClassName),
- (Class<? extends Writable>) classFactory.loadClass(outputValueClassName));
- }
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- return recordDescriptor;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
deleted file mode 100644
index 5709082..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IRecordWriter;
-import org.apache.hyracks.hdfs.ContextFactory;
-
-public class HadoopWriteOperatorDescriptor extends AbstractFileWriteOperatorDescriptor {
-
- private class HadoopFileWriter implements IRecordWriter {
-
- Object recordWriter;
- JobConf conf;
- Path finalOutputFile;
- Path tempOutputFile;
- Path tempDir;
-
- HadoopFileWriter(Object recordWriter, int index, JobConf conf) throws Exception {
- this.recordWriter = recordWriter;
- this.conf = conf;
- initialize(index, conf);
- }
-
- private void initialize(int index, JobConf conf) throws Exception {
- if (!(conf.getOutputFormat() instanceof NullOutputFormat)) {
- boolean isMap = conf.getNumReduceTasks() == 0;
- TaskAttemptID taskAttempId = new TaskAttemptID("0", index, isMap, index, index);
- conf.set("mapred.task.id", taskAttempId.toString());
- String suffix = new String("part-00000");
- suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
- suffix = suffix + index;
- outputPath = new Path(conf.get("mapred.output.dir"));
- tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tempDir.getFileSystem(conf);
- if (!fileSys.mkdirs(tempDir)) {
- throw new IOException("Mkdirs failed to create " + tempDir.toString());
- }
- tempOutputFile = new Path(tempDir, new Path("_" + taskAttempId.toString()));
- tempOutputFile = new Path(tempOutputFile, suffix);
- finalOutputFile = new Path(outputPath, suffix);
- if (conf.getUseNewMapper()) {
- org.apache.hadoop.mapreduce.JobContext jobContext = new ContextFactory().createJobContext(conf);
- org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
- .newInstance(jobContext.getOutputFormatClass(), conf);
- recordWriter = newOutputFormat.getRecordWriter(new ContextFactory().createContext(conf,
- taskAttempId));
- } else {
- recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf, suffix,
- new Progressable() {
- @Override
- public void progress() {
- }
- });
- }
- }
- }
-
- @Override
- public void write(Object[] record) throws Exception {
- if (recordWriter != null) {
- if (conf.getUseNewMapper()) {
- ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]);
- } else {
- ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]);
- }
- }
- }
-
- @Override
- public void close() {
- try {
- if (recordWriter != null) {
- if (conf.getUseNewMapper()) {
- ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new ContextFactory()
- .createContext(conf, new TaskAttemptID()));
- } else {
- ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null);
- }
- if (outputPath != null) {
- FileSystem fileSystem = FileSystem.get(conf);
- fileSystem.rename(tempOutputFile, finalOutputFile);
- fileSystem.delete(tempDir, true);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- private static final long serialVersionUID = 1L;
- Map<String, String> jobConfMap;
-
- @Override
- protected IRecordWriter createRecordWriter(FileSplit fileSplit, int index) throws Exception {
- JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
- conf.setClassLoader(this.getClass().getClassLoader());
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- FileSystem fileSystem = FileSystem.get(conf);
- Object recordWriter = null;
- return new HadoopFileWriter(recordWriter, index, conf);
- }
-
- Path outputPath;
- Path outputTempPath;
-
- protected Reporter createReporter() {
- return new Reporter() {
- @Override
- public Counter getCounter(Enum<?> name) {
- return null;
- }
-
- @Override
- public Counter getCounter(String group, String name) {
- return null;
- }
-
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- return null;
- }
-
- @Override
- public void incrCounter(Enum<?> key, long amount) {
-
- }
-
- @Override
- public void incrCounter(String group, String counter, long amount) {
-
- }
-
- @Override
- public void progress() {
-
- }
-
- @Override
- public void setStatus(String status) {
-
- }
-
- @Override
- public float getProgress() {
- // TODO Auto-generated method stub
- return 0;
- }
- };
- }
-
- private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
- JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
- try {
- FileSystem fileSystem = FileSystem.get(conf);
- for (FileSplit fileSplit : fileSplits) {
- Path path = new Path(fileSplit.getLocalFile().getFile().getPath());
- if (fileSystem.exists(path)) {
- throw new Exception(" Output path : already exists : " + path);
- }
- }
- } catch (IOException ioe) {
- ioe.printStackTrace();
- throw ioe;
- }
- return true;
- }
-
- private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) throws ClassNotFoundException {
- int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
- Object outputFormat = null;
- if (conf.getUseNewMapper()) {
- outputFormat = ReflectionUtils.newInstance(new ContextFactory().createJobContext(conf)
- .getOutputFormatClass(), conf);
- } else {
- outputFormat = conf.getOutputFormat();
- }
- if (outputFormat instanceof NullOutputFormat) {
- FileSplit[] outputFileSplits = new FileSplit[numOutputters];
- for (int i = 0; i < numOutputters; i++) {
- String outputPath = "/tmp/" + System.currentTimeMillis() + i;
- outputFileSplits[i] = new FileSplit("localhost", new FileReference(new File(outputPath)));
- }
- return outputFileSplits;
- } else {
-
- FileSplit[] outputFileSplits = new FileSplit[numOutputters];
- String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
- for (int index = 0; index < numOutputters; index++) {
- String suffix = String.format("part-%05d", index); // e.g. part-00007
- String outputPath = absolutePath + "/" + suffix;
- outputFileSplits[index] = new FileSplit("localhost", outputPath);
- }
- return outputFileSplits;
- }
- }
-
- public HadoopWriteOperatorDescriptor(IOperatorDescriptorRegistry jobSpec, JobConf jobConf, int numMapTasks)
- throws Exception {
- super(jobSpec, getOutputSplits(jobConf, numMapTasks));
- this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
- checkIfCanWriteToHDFS(super.splits);
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java
deleted file mode 100644
index 62c37ed..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import java.io.ObjectStreamException;
-import java.io.Serializable;
-
-public class AbstractClassBasedDelegate<T> implements Serializable {
- private static final long serialVersionUID = 1L;
- private Class<? extends T> klass;
- protected transient T instance;
-
- public AbstractClassBasedDelegate(Class<? extends T> klass) {
- this.klass = klass;
- init();
- }
-
- protected Object readResolve() throws ObjectStreamException {
- init();
- return this;
- }
-
- private void init() {
- try {
- instance = klass.newInstance();
- } catch (InstantiationException | IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
-}
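
For context, the class above survives Java serialization by rebuilding its transient delegate in readResolve(). A quick round-trip sketch (assuming the class above on the classpath; StringBuilder stands in for a real delegate class):

    import java.io.*;

    public class DelegateRoundTrip {
        public static void main(String[] args) throws Exception {
            AbstractClassBasedDelegate<StringBuilder> d =
                    new AbstractClassBasedDelegate<>(StringBuilder.class);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(d); // only the Class object is written; instance is transient
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                Object copy = ois.readObject(); // readResolve() runs here and re-instantiates the delegate
                System.out.println(copy.getClass().getSimpleName());
            }
        }
    }
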
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java
deleted file mode 100644
index fe8a612..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import java.io.DataInputStream;
-
-import org.apache.hadoop.io.Writable;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
-public class HadoopHashTuplePartitionComputerFactory<K extends Writable> implements ITuplePartitionComputerFactory {
- private static final long serialVersionUID = 1L;
- private final ISerializerDeserializer<K> keyIO;
-
- public HadoopHashTuplePartitionComputerFactory(ISerializerDeserializer<K> keyIO) {
- this.keyIO = keyIO;
- }
-
- @Override
- public ITuplePartitionComputer createPartitioner() {
- return new ITuplePartitionComputer() {
- private final ByteBufferInputStream bbis = new ByteBufferInputStream();
- private final DataInputStream dis = new DataInputStream(bbis);
-
- @Override
- public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
- int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, 0);
- bbis.setByteBuffer(accessor.getBuffer(), keyStart);
- K key = keyIO.deserialize(dis);
- int h = key.hashCode() & Integer.MAX_VALUE; // mask the sign bit; unary minus would overflow for Integer.MIN_VALUE
- return h % nParts;
- }
- };
- }
-}
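
The computer above reduces a key to a bucket via hashCode modulo nParts, after forcing the hash non-negative. A standalone sketch of that arithmetic; the sign-bit mask also covers Integer.MIN_VALUE, which unary minus cannot:

    public class HashPartitioning {
        // Map a key to one of nParts buckets; masking the sign bit keeps the result non-negative.
        static int partition(Object key, int nParts) {
            return (key.hashCode() & Integer.MAX_VALUE) % nParts;
        }

        public static void main(String[] args) {
            System.out.println(partition("hello", 4)); // a stable bucket in [0, 4)
        }
    }
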
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java
deleted file mode 100644
index b20c6e0..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import java.io.DataInputStream;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
-public class HadoopNewPartitionerTuplePartitionComputerFactory<K extends Writable, V extends Writable> extends
- AbstractClassBasedDelegate<Partitioner<K, V>> implements ITuplePartitionComputerFactory {
- private static final long serialVersionUID = 1L;
- private final ISerializerDeserializer<K> keyIO;
- private final ISerializerDeserializer<V> valueIO;
-
- public HadoopNewPartitionerTuplePartitionComputerFactory(Class<? extends Partitioner<K, V>> klass,
- ISerializerDeserializer<K> keyIO, ISerializerDeserializer<V> valueIO) {
- super(klass);
- this.keyIO = keyIO;
- this.valueIO = valueIO;
- }
-
- @Override
- public ITuplePartitionComputer createPartitioner() {
- return new ITuplePartitionComputer() {
- private final ByteBufferInputStream bbis = new ByteBufferInputStream();
- private final DataInputStream dis = new DataInputStream(bbis);
-
- @Override
- public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
- int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, 0);
- bbis.setByteBuffer(accessor.getBuffer(), keyStart);
- K key = keyIO.deserialize(dis);
- int valueStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, 1);
- bbis.setByteBuffer(accessor.getBuffer(), valueStart);
- V value = valueIO.deserialize(dis);
- return instance.getPartition(key, value, nParts);
- }
- };
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java
deleted file mode 100644
index 127eed9..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import java.io.DataInputStream;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.Partitioner;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
-public class HadoopPartitionerTuplePartitionComputerFactory<K extends Writable, V extends Writable> extends
- AbstractClassBasedDelegate<Partitioner<K, V>> implements ITuplePartitionComputerFactory {
- private static final long serialVersionUID = 1L;
- private final ISerializerDeserializer<K> keyIO;
- private final ISerializerDeserializer<V> valueIO;
-
- public HadoopPartitionerTuplePartitionComputerFactory(Class<? extends Partitioner<K, V>> klass,
- ISerializerDeserializer<K> keyIO, ISerializerDeserializer<V> valueIO) {
- super(klass);
- this.keyIO = keyIO;
- this.valueIO = valueIO;
- }
-
- @Override
- public ITuplePartitionComputer createPartitioner() {
- return new ITuplePartitionComputer() {
- private final ByteBufferInputStream bbis = new ByteBufferInputStream();
- private final DataInputStream dis = new DataInputStream(bbis);
-
- @Override
- public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
- int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, 0);
- bbis.setByteBuffer(accessor.getBuffer(), keyStart);
- K key = keyIO.deserialize(dis);
- int valueStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, 1);
- bbis.setByteBuffer(accessor.getBuffer(), valueStart);
- V value = valueIO.deserialize(dis);
- return instance.getPartition(key, value, nParts);
- }
- };
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java
deleted file mode 100644
index b74fdde..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import org.apache.hadoop.io.RawComparator;
-
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.dataflow.common.util.ReflectionUtils;
-
-public class KeyBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
- private static final long serialVersionUID = 1L;
-
- private Class<? extends RawComparator<T>> cmpClass;
-
- public KeyBinaryComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
- this.cmpClass = cmpClass;
- }
-
- @Override
- public IBinaryComparator createBinaryComparator() {
- final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
- return new IBinaryComparator() {
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return instance.compare(b1, s1, l1, b2, s2, l2);
- }
- };
- }
-}
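
Factories of this shape let Hyracks order records in serialized form, with no deserialization. A sketch of the underlying raw comparison using Hadoop's Text.Comparator (hadoop-common assumed on the classpath):

    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Text;

    public class RawCompareDemo {
        public static void main(String[] args) throws Exception {
            DataOutputBuffer b1 = new DataOutputBuffer();
            DataOutputBuffer b2 = new DataOutputBuffer();
            new Text("apple").write(b1);  // serialized form: vint length + UTF-8 bytes
            new Text("banana").write(b2);
            Text.Comparator cmp = new Text.Comparator();
            int r = cmp.compare(b1.getData(), 0, b1.getLength(), b2.getData(), 0, b2.getLength());
            System.out.println(r < 0); // true: "apple" sorts before "banana" without deserializing
        }
    }
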
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java
deleted file mode 100644
index 01024bd..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import org.apache.hadoop.io.RawComparator;
-
-import org.apache.hyracks.api.dataflow.value.IComparator;
-import org.apache.hyracks.api.dataflow.value.IComparatorFactory;
-import org.apache.hyracks.dataflow.common.util.ReflectionUtils;
-
-public class KeyComparatorFactory<T> implements IComparatorFactory<T> {
- private static final long serialVersionUID = 1L;
- private Class<? extends RawComparator<T>> cmpClass;
-
- public KeyComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
- this.cmpClass = cmpClass;
- }
-
- @Override
- public IComparator<T> createComparator() {
- final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
- return new IComparator<T>() {
- @Override
- public int compare(T o1, T o2) {
- return instance.compare(o1, o2);
- }
- };
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/RawComparingComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/RawComparingComparatorFactory.java
deleted file mode 100644
index d2f0ead..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/RawComparingComparatorFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparable;
-
-import org.apache.hyracks.api.dataflow.value.IComparator;
-import org.apache.hyracks.api.dataflow.value.IComparatorFactory;
-import org.apache.hyracks.dataflow.common.util.ReflectionUtils;
-
-public class RawComparingComparatorFactory<T> implements IComparatorFactory<WritableComparable<T>> {
- private Class<? extends RawComparator> klass;
-
- public RawComparingComparatorFactory(Class<? extends RawComparator> klass) {
- this.klass = klass;
- }
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public IComparator<WritableComparable<T>> createComparator() {
- final RawComparator instance = ReflectionUtils.createInstance(klass);
- return new IComparator<WritableComparable<T>>() {
- @Override
- public int compare(WritableComparable<T> o1, WritableComparable<T> o2) {
- return instance.compare(o1, o2);
- }
- };
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
deleted file mode 100644
index 01f8755..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import org.apache.hadoop.io.RawComparator;
-
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.dataflow.common.util.ReflectionUtils;
-
-public class WritableComparingBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
- private static final long serialVersionUID = 1L;
-
- private Class<? extends RawComparator<T>> cmpClass;
-
- public WritableComparingBinaryComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
- this.cmpClass = cmpClass;
- }
-
- @Override
- public IBinaryComparator createBinaryComparator() {
- final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
- return new IBinaryComparator() {
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return instance.compare(b1, s1, l1, b2, s2, l2);
- }
- };
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
deleted file mode 100644
index b248bff..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import org.apache.hyracks.dataflow.hadoop.data.HadoopNewPartitionerTuplePartitionComputerFactory;
-import org.apache.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
-import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import org.apache.hyracks.hdfs.ContextFactory;
-
-public class HadoopHelper {
- public static final int KEY_FIELD_INDEX = 0;
- public static final int VALUE_FIELD_INDEX = 1;
- public static final int BLOCKID_FIELD_INDEX = 2;
- private static final int[] KEY_SORT_FIELDS = new int[] { 0 };
-
- private MarshalledWritable<Configuration> mConfig;
- private Configuration config;
- private Job job;
-
- public HadoopHelper(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
- this.mConfig = mConfig;
- ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
- config = mConfig.get();
- config.setClassLoader(getClass().getClassLoader());
- job = new Job(config);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
- }
- }
-
- public RecordDescriptor getMapOutputRecordDescriptor() throws HyracksDataException {
- try {
- return new RecordDescriptor(
- new ISerializerDeserializer[] {
- DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
- .getMapOutputKeyClass()),
- DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
- .getMapOutputValueClass()), IntegerSerializerDeserializer.INSTANCE });
-
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- public RecordDescriptor getMapOutputRecordDescriptorWithoutExtraFields() throws HyracksDataException {
- try {
- return new RecordDescriptor(
- new ISerializerDeserializer[] {
- DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
- .getMapOutputKeyClass()),
- DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
- .getMapOutputValueClass()) });
-
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- public TaskAttemptContext createTaskAttemptContext(TaskAttemptID taId) throws HyracksDataException {
- ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(config.getClassLoader());
- return new ContextFactory().createContext(config, taId);
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
- }
- }
-
- public JobContext createJobContext() {
- ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(config.getClassLoader());
- return new ContextFactory().createJobContext(config);
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
- }
- }
-
- public <K1, V1, K2, V2> Mapper<K1, V1, K2, V2> getMapper() throws HyracksDataException {
- try {
- return (Mapper<K1, V1, K2, V2>) HadoopTools.newInstance(job.getMapperClass());
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- throw new HyracksDataException(e);
- }
- }
-
- public <K2, V2, K3, V3> Reducer<K2, V2, K3, V3> getReducer() throws HyracksDataException {
- try {
- return (Reducer<K2, V2, K3, V3>) HadoopTools.newInstance(job.getReducerClass());
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- throw new HyracksDataException(e);
- }
- }
-
- public <K2, V2> Reducer<K2, V2, K2, V2> getCombiner() throws HyracksDataException {
- try {
- return (Reducer<K2, V2, K2, V2>) HadoopTools.newInstance(job.getCombinerClass());
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- throw new HyracksDataException(e);
- }
- }
-
- public <K, V> InputFormat<K, V> getInputFormat() throws HyracksDataException {
- try {
- return (InputFormat<K, V>) ReflectionUtils.newInstance(job.getInputFormatClass(), config);
- } catch (ClassNotFoundException e) {
- throw new HyracksDataException(e);
- }
- }
-
- public <K, V> List<InputSplit> getInputSplits() throws HyracksDataException {
- ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
- InputFormat<K, V> fmt = getInputFormat();
- JobContext jCtx = new ContextFactory().createJobContext(config);
- try {
- return fmt.getSplits(jCtx);
- } catch (IOException | InterruptedException e) {
- throw new HyracksDataException(e);
- }
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
- }
- }
-
- public IBinaryComparatorFactory[] getSortComparatorFactories() {
- WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
- .getSortComparator().getClass());
-
- return new IBinaryComparatorFactory[] { comparatorFactory };
- }
-
- public IBinaryComparatorFactory[] getGroupingComparatorFactories() {
- WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
- .getGroupingComparator().getClass());
-
- return new IBinaryComparatorFactory[] { comparatorFactory };
- }
-
- public RawComparator<?> getRawGroupingComparator() {
- return job.getGroupingComparator();
- }
-
- public int getSortFrameLimit(IHyracksCommonContext ctx) {
- int sortMemory = job.getConfiguration().getInt("io.sort.mb", 100);
- return (int) (((long) sortMemory * 1024 * 1024) / ctx.getInitialFrameSize());
- }
-
- public Job getJob() {
- return job;
- }
-
- public MarshalledWritable<Configuration> getMarshalledConfiguration() {
- return mConfig;
- }
-
- public Configuration getConfiguration() {
- return config;
- }
-
- public ITuplePartitionComputerFactory getTuplePartitionComputer() throws HyracksDataException {
- int nReducers = job.getNumReduceTasks();
- try {
- return new HadoopNewPartitionerTuplePartitionComputerFactory<Writable, Writable>(
- (Class<? extends Partitioner<Writable, Writable>>) job.getPartitionerClass(),
- (ISerializerDeserializer<Writable>) DatatypeHelper
- .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputKeyClass()),
- (ISerializerDeserializer<Writable>) DatatypeHelper
- .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputValueClass()));
- } catch (ClassNotFoundException e) {
- throw new HyracksDataException(e);
- }
- }
-
- public int[] getSortFields() {
- return KEY_SORT_FIELDS;
- }
-
- public <K> ISerializerDeserializer<K> getMapOutputKeySerializerDeserializer() {
- return (ISerializerDeserializer<K>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
- .getMapOutputKeyClass());
- }
-
- public <V> ISerializerDeserializer<V> getMapOutputValueSerializerDeserializer() {
- return (ISerializerDeserializer<V>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
- .getMapOutputValueClass());
- }
-
- public FileSystem getFilesystem() throws HyracksDataException {
- try {
- return FileSystem.get(config);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- public <K, V> OutputFormat<K, V> getOutputFormat() throws HyracksDataException {
- try {
- return (OutputFormat<K, V>) ReflectionUtils.newInstance(job.getOutputFormatClass(), config);
- } catch (ClassNotFoundException e) {
- throw new HyracksDataException(e);
- }
- }
-
- public boolean hasCombiner() throws HyracksDataException {
- try {
- return job.getCombinerClass() != null;
- } catch (ClassNotFoundException e) {
- throw new HyracksDataException(e);
- }
- }
-}
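
One detail worth spelling out: getSortFrameLimit converts the MapReduce io.sort.mb budget into a Hyracks frame count. With the 100 MB default and an assumed 32 KB initial frame size:

    // Sketch of the arithmetic in getSortFrameLimit; the 32 KB frame size is an assumed example.
    long sortBytes = 100L * 1024 * 1024;                    // io.sort.mb = 100
    int initialFrameSize = 32 * 1024;                       // ctx.getInitialFrameSize()
    int framesLimit = (int) (sortBytes / initialFrameSize); // = 3200 frames for the sorter
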
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
deleted file mode 100644
index fb86385..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-public class HadoopTools {
- public static Object newInstance(String className) throws ClassNotFoundException, InstantiationException,
- IllegalAccessException {
- ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(HadoopTools.class.getClassLoader());
- Class<?> clazz = Class.forName(className, true, HadoopTools.class.getClassLoader());
- return newInstance(clazz);
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
- }
- }
-
- public static Object newInstance(Class<?> clazz) throws InstantiationException, IllegalAccessException {
- return clazz.newInstance();
- }
-}
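
HadoopTools and HadoopHelper both pin the thread's context class loader before touching Hadoop classes and restore it on exit. The pattern in isolation, as a minimal sketch:

    import java.util.concurrent.Callable;

    public final class ClassLoaderScope {
        // Run an action under a specific context class loader, restoring the previous one even on failure.
        public static <T> T with(ClassLoader cl, Callable<T> action) throws Exception {
            Thread t = Thread.currentThread();
            ClassLoader prev = t.getContextClassLoader();
            t.setContextClassLoader(cl);
            try {
                return action.call();
            } finally {
                t.setContextClassLoader(prev);
            }
        }
    }
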
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
deleted file mode 100644
index 4892935..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.util.BitSet;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hyracks.api.comm.IFrameReader;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.IPartitionCollector;
-import org.apache.hyracks.api.comm.IPartitionWriterFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
-import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
-import org.apache.hyracks.dataflow.std.connectors.PartitionDataWriter;
-
-public class HashPartitioningShuffleConnectorDescriptor extends AbstractMToNConnectorDescriptor {
- private static final long serialVersionUID = 1L;
-
- private final MarshalledWritable<Configuration> mConfig;
-
- public HashPartitioningShuffleConnectorDescriptor(IConnectorDescriptorRegistry spec, MarshalledWritable<Configuration> mConfig) {
- super(spec);
- this.mConfig = mConfig;
- }
-
- @Override
- public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
- IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
- throws HyracksDataException {
- HadoopHelper helper = new HadoopHelper(mConfig);
- ITuplePartitionComputerFactory tpcf = helper.getTuplePartitionComputer();
- return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
- }
-
- @Override
- public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
- int receiverIndex, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
- BitSet expectedPartitions = new BitSet();
- expectedPartitions.set(0, nProducerPartitions);
- NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
- expectedPartitions);
- IFrameReader frameReader = new ShuffleFrameReader(ctx, channelReader, mConfig);
- return new PartitionCollector(ctx, getConnectorId(), receiverIndex, expectedPartitions, frameReader,
- channelReader);
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
deleted file mode 100644
index 9cafaea..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IInputSplitProvider {
- public InputSplit next() throws HyracksDataException;
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
deleted file mode 100644
index 87f7cd7..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IInputSplitProviderFactory extends Serializable {
- public IInputSplitProvider createInputSplitProvider(int id) throws HyracksDataException;
-}
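
MapperOperatorDescriptor below drains a provider until next() returns null, so each partition needs its own slice of the splits. A hypothetical round-robin factory (names and fields are illustrative; a real factory must ship serializable split descriptions rather than live InputSplits):

    import java.util.List;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Hypothetical sketch: partition p consumes splits p, p + n, p + 2n, ...
    public class RoundRobinSplitProviderFactory implements IInputSplitProviderFactory {
        private static final long serialVersionUID = 1L;
        private final transient List<InputSplit> splits; // illustrative only: live splits are not Serializable
        private final int nPartitions;

        public RoundRobinSplitProviderFactory(List<InputSplit> splits, int nPartitions) {
            this.splits = splits;
            this.nPartitions = nPartitions;
        }

        @Override
        public IInputSplitProvider createInputSplitProvider(final int id) throws HyracksDataException {
            return new IInputSplitProvider() {
                private int next = id;

                @Override
                public InputSplit next() {
                    if (next >= splits.size()) {
                        return null; // exhausted: the mapper's drain loop stops on null
                    }
                    InputSplit s = splits.get(next);
                    next += nPartitions;
                    return s;
                }
            };
        }
    }
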
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
deleted file mode 100644
index d0e6ce2..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-public class InputFileSplit extends InputSplit implements Writable {
- private Path file;
- private long start;
- private long length;
- private int blockId;
- private String[] hosts;
- private long scheduleTime;
-
- public InputFileSplit() {
- }
-
- public InputFileSplit(int blockId, Path file, long start, long length, String[] hosts, long scheduleTime) {
- this.blockId = blockId;
- this.file = file;
- this.start = start;
- this.length = length;
- this.hosts = hosts;
- this.scheduleTime = scheduleTime;
- }
-
- public int blockId() {
- return blockId;
- }
-
- public long scheduleTime() {
- return this.scheduleTime;
- }
-
- public Path getPath() {
- return file;
- }
-
- /** The position of the first byte in the file to process. */
- public long getStart() {
- return start;
- }
-
- /** The number of bytes in the file to process. */
- @Override
- public long getLength() {
- return length;
- }
-
- @Override
- public String toString() {
- return file + ":" + start + "+" + length;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, file.toString());
- out.writeLong(start);
- out.writeLong(length);
- out.writeInt(blockId);
- out.writeLong(this.scheduleTime);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- file = new Path(Text.readString(in));
- start = in.readLong();
- length = in.readLong();
- hosts = null;
- this.blockId = in.readInt();
- this.scheduleTime = in.readLong();
- }
-
- @Override
- public String[] getLocations() throws IOException {
- if (this.hosts == null) {
- return new String[] {};
- } else {
- return this.hosts;
- }
- }
-
- public FileSplit toFileSplit() {
- return new FileSplit(file, start, length, hosts);
- }
-}
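
InputFileSplit hand-rolls its wire format in write/readFields. A round-trip sketch (paths and host names are made up); note that hosts are never serialized, so locations come back empty:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class SplitRoundTrip {
        public static void main(String[] args) throws Exception {
            InputFileSplit split = new InputFileSplit(7, new Path("/data/part-00000"), 0L, 1024L,
                    new String[] { "host1" }, System.currentTimeMillis());
            DataOutputBuffer out = new DataOutputBuffer();
            split.write(out);

            InputFileSplit copy = new InputFileSplit();
            DataInputBuffer in = new DataInputBuffer();
            in.reset(out.getData(), out.getLength());
            copy.readFields(in);
            System.out.println(copy.getPath() + " block=" + copy.blockId()); // /data/part-00000 block=7
            System.out.println(copy.getLocations().length);                  // 0: hosts never hit the wire
        }
    }
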
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/KVIterator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
deleted file mode 100644
index 3f02279..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.util.Progress;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class KVIterator implements RawKeyValueIterator {
- private final HadoopHelper helper;
- private FrameTupleAccessor accessor;
- private DataInputBuffer kBuffer;
- private DataInputBuffer vBuffer;
- private List<IFrame> buffers;
- private int bSize;
- private int bPtr;
- private int tIdx;
- private boolean eog;
-
- public KVIterator(HadoopHelper helper, RecordDescriptor recordDescriptor) {
- this.helper = helper;
- accessor = new FrameTupleAccessor(recordDescriptor);
- kBuffer = new DataInputBuffer();
- vBuffer = new DataInputBuffer();
- }
-
- void reset(List<IFrame> buffers, int bSize) {
- this.buffers = buffers;
- this.bSize = bSize;
- bPtr = 0;
- tIdx = 0;
- eog = false;
- if (bSize > 0) {
- accessor.reset(buffers.get(0).getBuffer());
- tIdx = -1;
- } else {
- eog = true;
- }
- }
-
- @Override
- public DataInputBuffer getKey() throws IOException {
- return kBuffer;
- }
-
- @Override
- public DataInputBuffer getValue() throws IOException {
- return vBuffer;
- }
-
- @Override
- public boolean next() throws IOException {
- while (true) {
- if (eog) {
- return false;
- }
- ++tIdx;
- if (accessor.getTupleCount() <= tIdx) {
- ++bPtr;
- if (bPtr >= bSize) {
- eog = true;
- continue;
- }
- tIdx = -1;
- accessor.reset(buffers.get(bPtr).getBuffer());
- continue;
- }
- kBuffer.reset(accessor.getBuffer().array(),
- accessor.getAbsoluteFieldStartOffset(tIdx, HadoopHelper.KEY_FIELD_INDEX),
- accessor.getFieldLength(tIdx, HadoopHelper.KEY_FIELD_INDEX));
- // the value buffer must be positioned at the value field's offset, not the key's
- vBuffer.reset(accessor.getBuffer().array(),
- accessor.getAbsoluteFieldStartOffset(tIdx, HadoopHelper.VALUE_FIELD_INDEX),
- accessor.getFieldLength(tIdx, HadoopHelper.VALUE_FIELD_INDEX));
- break;
- }
- return true;
- }
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public Progress getProgress() {
- return null;
- }
-}
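
RawKeyValueIterator is pull-based: callers advance with next() and read the raw buffers back into Writables. A consumer sketch, assuming Text keys and values:

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.RawKeyValueIterator;

    public class DrainIterator {
        // Pull each raw key/value pair and deserialize it back into Writables.
        static void drain(RawKeyValueIterator it) throws IOException {
            Text key = new Text();
            Text value = new Text();
            while (it.next()) {
                key.readFields(it.getKey());
                value.readFields(it.getValue());
                System.out.println(key + " -> " + value);
            }
        }
    }
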
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
deleted file mode 100644
index 85cd34d..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.hadoop.util.MRContextUtil;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.sort.Algorithm;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
-
-public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
- extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
- private final int jobId;
- private final MarshalledWritable<Configuration> config;
- private final IInputSplitProviderFactory factory;
-
- public MapperOperatorDescriptor(IOperatorDescriptorRegistry spec, int jobId,
- MarshalledWritable<Configuration> config, IInputSplitProviderFactory factory) throws HyracksDataException {
- super(spec, 0, 1);
- this.jobId = jobId;
- this.config = config;
- this.factory = factory;
- HadoopHelper helper = new HadoopHelper(config);
- recordDescriptors[0] = helper.getMapOutputRecordDescriptor();
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
- final HadoopHelper helper = new HadoopHelper(config);
- final Configuration conf = helper.getConfiguration();
- final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
- final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
- final IInputSplitProvider isp = factory.createInputSplitProvider(partition);
- final TaskAttemptID taId = new TaskAttemptID("foo", jobId, true, partition, 0);
- final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
-
- final int framesLimit = helper.getSortFrameLimit(ctx);
- final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
-
- class SortingRecordWriter extends RecordWriter<K2, V2> {
- private final ArrayTupleBuilder tb;
- private final IFrame frame;
- private final FrameTupleAppender fta;
- private ExternalSortRunGenerator runGen;
- private int blockId;
-
- public SortingRecordWriter() throws HyracksDataException {
- tb = new ArrayTupleBuilder(2);
- frame = new VSizeFrame(ctx);
- fta = new FrameTupleAppender(frame);
- }
-
- public void initBlock(int blockId) throws HyracksDataException {
- runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
- helper.getMapOutputRecordDescriptorWithoutExtraFields(), Algorithm.MERGE_SORT, framesLimit);
- this.blockId = blockId;
- }
-
- @Override
- public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
- }
-
- @Override
- public void write(K2 key, V2 value) throws IOException, InterruptedException {
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- key.write(dos);
- tb.addFieldEndOffset();
- value.write(dos);
- tb.addFieldEndOffset();
- if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- runGen.nextFrame(frame.getBuffer());
- fta.reset(frame, true);
- if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
- + frame.getBuffer().capacity() + ")");
- }
- }
- }
-
- public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
- if (fta.getTupleCount() > 0) {
- runGen.nextFrame(frame.getBuffer());
- fta.reset(frame, true);
- }
- runGen.close();
- IFrameWriter delegatingWriter = new IFrameWriter() {
- private final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
- private final FrameTupleAccessor fta = new FrameTupleAccessor(
- helper.getMapOutputRecordDescriptorWithoutExtraFields());
- private final ArrayTupleBuilder tb = new ArrayTupleBuilder(3);
-
- @Override
- public void open() throws HyracksDataException {
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- fta.reset(buffer);
- int n = fta.getTupleCount();
- for (int i = 0; i < n; ++i) {
- tb.reset();
- tb.addField(fta, i, 0);
- tb.addField(fta, i, 1);
- try {
- tb.getDataOutput().writeInt(blockId);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- tb.addFieldEndOffset();
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- appender.write(writer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- appender.write(writer, true);
- }
-
- @Override
- public void fail() throws HyracksDataException {
-
- }
- };
- if (helper.hasCombiner()) {
- Reducer<K2, V2, K2, V2> combiner = helper.getCombiner();
- TaskAttemptID ctaId = new TaskAttemptID("foo", jobId, true, partition, 0);
- TaskAttemptContext ctaskAttemptContext = helper.createTaskAttemptContext(ctaId); // the combiner gets its own attempt id
- final IFrameWriter outputWriter = delegatingWriter;
- RecordWriter<K2, V2> recordWriter = new RecordWriter<K2, V2>() {
- private final FrameTupleAppender fta = new FrameTupleAppender(new VSizeFrame(ctx));
- private final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-
- {
- outputWriter.open();
- }
-
- @Override
- public void write(K2 key, V2 value) throws IOException, InterruptedException {
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- key.write(dos);
- tb.addFieldEndOffset();
- value.write(dos);
- tb.addFieldEndOffset();
- if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- fta.write(outputWriter, true);
- if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- fta.write(outputWriter, true);
- }
- };
- delegatingWriter = new ReduceWriter<K2, V2, K2, V2>(ctx, helper,
- new int[] { HadoopHelper.KEY_FIELD_INDEX }, helper.getGroupingComparatorFactories(),
- helper.getMapOutputRecordDescriptorWithoutExtraFields(), combiner, recordWriter, ctaId,
- ctaskAttemptContext);
- }
- IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getSorter(), runGen.getRuns(),
- new int[] { 0 }, comparators, null, helper.getMapOutputRecordDescriptorWithoutExtraFields(),
- framesLimit, delegatingWriter);
- merger.process();
- }
- }
-
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
- @SuppressWarnings("unchecked")
- @Override
- public void initialize() throws HyracksDataException {
- try {
- writer.open();
- SortingRecordWriter recordWriter = new SortingRecordWriter();
- InputSplit split = null;
- int blockId = 0;
- while ((split = isp.next()) != null) {
- try {
- RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split,
- taskAttemptContext);
- ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
- recordReader.initialize(split, taskAttemptContext);
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
- }
- recordWriter.initBlock(blockId);
- Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil().createMapContext(conf, taId,
- recordReader, recordWriter, null, null, split);
- mapper.run(mCtx);
- recordReader.close();
- recordWriter.sortAndFlushBlock(writer);
- ++blockId;
-                        } catch (IOException | InterruptedException e) {
-                            throw new HyracksDataException(e);
-                        }
- }
- } catch (Throwable th) {
- writer.fail();
- throw th;
- } finally {
- writer.close();
- }
- }
- };
- }
-}
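Note on the removed map-side runtime above: it relies on one recurring Hyracks idiom. It attempts to append a serialized tuple to the current frame; when the frame is full, it flushes the frame downstream, resets the appender, and retries, failing only if a single record exceeds the frame capacity. A minimal restatement of that idiom, with the appender, frame, and downstream writer passed in as assumed parameters:

    import org.apache.hyracks.api.comm.IFrame;
    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;

    // Illustrative helper, not part of the removed code.
    final class AppendOrFlush {
        static void append(FrameTupleAppender appender, IFrame frame, IFrameWriter downstream,
                int[] fieldEndOffsets, byte[] bytes, int offset, int size) throws HyracksDataException {
            if (!appender.append(fieldEndOffsets, bytes, offset, size)) {
                appender.write(downstream, true); // flush the full frame and clear the appender
                if (!appender.append(fieldEndOffsets, bytes, offset, size)) {
                    // a record larger than a whole frame cannot be split here
                    throw new HyracksDataException("Record size (" + size + ") larger than frame size ("
                            + frame.getBuffer().capacity() + ")");
                }
            }
        }
    }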
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
deleted file mode 100644
index df53c39..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.Writable;
-
-public class MarshalledWritable<T extends Writable> implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private byte[] bytes;
-
- public MarshalledWritable() {
- }
-
- public void set(T o) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- dos.writeUTF(o.getClass().getName());
- o.write(dos);
- dos.close();
- bytes = baos.toByteArray();
- }
-
-    @SuppressWarnings("unchecked") // the cast to T is checked only by the stored class name
-    public T get() throws Exception {
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- DataInputStream dis = new DataInputStream(bais);
- String className = dis.readUTF();
- T o = (T) HadoopTools.newInstance(className);
- o.readFields(dis);
- return o;
- }
-}
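The removed MarshalledWritable carried an arbitrary Hadoop Writable across Java serialization boundaries by storing its class name (via writeUTF) followed by the Writable's own bytes. A hypothetical round trip, with Text chosen purely for illustration:

    import org.apache.hadoop.io.Text;

    public class MarshalledWritableDemo {
        public static void main(String[] args) throws Exception {
            MarshalledWritable<Text> mw = new MarshalledWritable<>();
            mw.set(new Text("hello"));   // records the class name plus the value's bytes
            Text copy = mw.get();        // readUTF, reflective instantiation, then readFields
            System.out.println(copy);    // prints "hello"
        }
    }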
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
deleted file mode 100644
index e64978f..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.counters.GenericCounter;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.hadoop.util.MRContextUtil;
-
-public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
- private final IHyracksTaskContext ctx;
- private final HadoopHelper helper;
- private final int[] groupFields;
- private final FrameTupleAccessor accessor0;
- private final FrameTupleAccessor accessor1;
- private final IFrame copyFrame;
- private final IBinaryComparator[] comparators;
- private final KVIterator kvi;
- private final Reducer<K2, V2, K3, V3> reducer;
- private final RecordWriter<K3, V3> recordWriter;
- private final TaskAttemptID taId;
- private final TaskAttemptContext taskAttemptContext;
-
- private boolean first;
- private boolean groupStarted;
- private List<IFrame> group;
- private int bPtr;
- private FrameTupleAppender fta;
- private Counter keyCounter;
- private Counter valueCounter;
-
- public ReduceWriter(IHyracksTaskContext ctx, HadoopHelper helper, int[] groupFields,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
- Reducer<K2, V2, K3, V3> reducer, RecordWriter<K3, V3> recordWriter, TaskAttemptID taId,
- TaskAttemptContext taskAttemptContext) throws HyracksDataException {
- this.ctx = ctx;
- this.helper = helper;
- this.groupFields = groupFields;
- accessor0 = new FrameTupleAccessor(recordDescriptor);
- accessor1 = new FrameTupleAccessor(recordDescriptor);
- copyFrame = new VSizeFrame(ctx);
- accessor1.reset(copyFrame.getBuffer());
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- this.reducer = reducer;
- this.recordWriter = recordWriter;
- this.taId = taId;
- this.taskAttemptContext = taskAttemptContext;
-
- kvi = new KVIterator(helper, recordDescriptor);
- }
-
- @Override
- public void open() throws HyracksDataException {
- first = true;
- groupStarted = false;
- group = new ArrayList<>();
- bPtr = 0;
- group.add(new VSizeFrame(ctx));
- fta = new FrameTupleAppender();
- keyCounter = new GenericCounter();
- valueCounter = new GenericCounter();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor0.reset(buffer);
- int nTuples = accessor0.getTupleCount();
- for (int i = 0; i < nTuples; ++i) {
- if (first) {
- groupInit();
- first = false;
- } else {
- if (i == 0) {
- accessor1.reset(copyFrame.getBuffer());
- switchGroupIfRequired(accessor1, accessor1.getTupleCount() - 1, accessor0, i);
- } else {
- switchGroupIfRequired(accessor0, i - 1, accessor0, i);
- }
- }
- accumulate(accessor0, i);
- }
- copyFrame.ensureFrameSize(buffer.capacity());
- FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
- }
-
- private void accumulate(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- if (!fta.append(accessor, tIndex)) {
- ++bPtr;
- if (group.size() <= bPtr) {
- group.add(new VSizeFrame(ctx));
- }
- fta.reset(group.get(bPtr), true);
- if (!fta.append(accessor, tIndex)) {
- throw new HyracksDataException("Record size ("
- + (accessor.getTupleEndOffset(tIndex) - accessor.getTupleStartOffset(tIndex))
- + ") larger than frame size (" + group.get(bPtr).getBuffer().capacity() + ")");
- }
- }
- }
-
- private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
- FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
- if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
- reduce();
- groupInit();
- }
- }
-
- private void groupInit() throws HyracksDataException {
- groupStarted = true;
- bPtr = 0;
- fta.reset(group.get(0), true);
- }
-
- private void reduce() throws HyracksDataException {
- kvi.reset(group, bPtr + 1);
- try {
- Reducer<K2, V2, K3, V3>.Context rCtx = new MRContextUtil().createReduceContext(helper.getConfiguration(),
- taId, kvi, keyCounter, valueCounter, recordWriter, null, null, (RawComparator<K2>) helper
- .getRawGroupingComparator(), (Class<K2>) helper.getJob().getMapOutputKeyClass(),
- (Class<V2>) helper.getJob().getMapOutputValueClass());
- reducer.run(rCtx);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- groupStarted = false;
- }
-
- private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx)
- throws HyracksDataException {
- for (int i = 0; i < comparators.length; ++i) {
- int fIdx = groupFields[i];
- int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
- int l1 = a1.getFieldLength(t1Idx, fIdx);
- int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
- int l2 = a2.getFieldLength(t2Idx, fIdx);
- if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (groupStarted) {
- reduce();
- }
- try {
- recordWriter.close(taskAttemptContext);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
-}
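ReduceWriter implemented sort-based grouping: consecutive tuples are compared on the grouping fields, tuples accumulate in frames until a boundary is seen, and close() flushes the final group. The same control flow, restated over an in-memory sorted list (all names here are illustrative, not from the removed code):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.function.Consumer;

    final class GroupedReduce {
        static <T> void run(List<T> sorted, Comparator<T> groupCmp, Consumer<List<T>> reduce) {
            List<T> group = new ArrayList<>();
            for (T t : sorted) {
                if (!group.isEmpty() && groupCmp.compare(group.get(group.size() - 1), t) != 0) {
                    reduce.accept(group);      // group boundary: run the reducer on the finished group
                    group = new ArrayList<>();
                }
                group.add(t);
            }
            if (!group.isEmpty()) {
                reduce.accept(group);          // flush the last group, as close() does above
            }
        }
    }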
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
deleted file mode 100644
index d1bfbf7..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-
-public class ReducerOperatorDescriptor<K2 extends Writable, V2 extends Writable, K3 extends Writable, V3 extends Writable>
- extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
-
- private final int jobId;
-
- private MarshalledWritable<Configuration> mConfig;
-
- public ReducerOperatorDescriptor(IOperatorDescriptorRegistry spec, int jobId, MarshalledWritable<Configuration> mConfig) {
- super(spec, 1, 0);
- this.jobId = jobId;
- this.mConfig = mConfig;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- final HadoopHelper helper = new HadoopHelper(mConfig);
- final Reducer<K2, V2, K3, V3> reducer = helper.getReducer();
- final RecordDescriptor recordDescriptor = helper.getMapOutputRecordDescriptor();
- final int[] groupFields = helper.getSortFields();
- IBinaryComparatorFactory[] groupingComparators = helper.getGroupingComparatorFactories();
-
- final TaskAttemptID taId = new TaskAttemptID("foo", jobId, false, partition, 0);
- final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
- final RecordWriter recordWriter;
- try {
- recordWriter = helper.getOutputFormat().getRecordWriter(taskAttemptContext);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
-
- final ReduceWriter<K2, V2, K3, V3> rw = new ReduceWriter<K2, V2, K3, V3>(ctx, helper, groupFields,
- groupingComparators, recordDescriptor, reducer, recordWriter, taId, taskAttemptContext);
-
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- @Override
- public void open() throws HyracksDataException {
- rw.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- rw.nextFrame(buffer);
- }
-
- @Override
- public void close() throws HyracksDataException {
- rw.close();
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
- };
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
deleted file mode 100644
index fb9c6fb..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hyracks.api.comm.FrameHelper;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameReader;
-import org.apache.hyracks.api.comm.NoShrinkVSizeFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
-import org.apache.hyracks.dataflow.std.structures.RunAndMaxFrameSizePair;
-
-public class ShuffleFrameReader implements IFrameReader {
- private final IHyracksTaskContext ctx;
- private final NonDeterministicChannelReader channelReader;
- private final HadoopHelper helper;
- private final RecordDescriptor recordDescriptor;
- private final IFrame vframe;
- private List<RunFileWriter> runFileWriters;
- private List<Integer> runFileMaxFrameSize;
- private RunFileReader reader;
-
- public ShuffleFrameReader(IHyracksTaskContext ctx, NonDeterministicChannelReader channelReader,
- MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
- this.ctx = ctx;
- this.channelReader = channelReader;
- this.helper = new HadoopHelper(mConfig);
- this.recordDescriptor = helper.getMapOutputRecordDescriptor();
- this.vframe = new NoShrinkVSizeFrame(ctx);
- }
-
- @Override
- public void open() throws HyracksDataException {
- channelReader.open();
- int nSenders = channelReader.getSenderPartitionCount();
-        runFileWriters = new ArrayList<>();
- runFileMaxFrameSize = new ArrayList<>();
- RunInfo[] infos = new RunInfo[nSenders];
- FrameTupleAccessor accessor = new FrameTupleAccessor(recordDescriptor);
- while (true) {
- int entry = channelReader.findNextSender();
- if (entry < 0) {
- break;
- }
- RunInfo info = infos[entry];
- ByteBuffer netBuffer = channelReader.getNextBuffer(entry);
- netBuffer.clear();
- int nBlocks = FrameHelper.deserializeNumOfMinFrame(netBuffer);
-
- if (nBlocks > 1) {
- netBuffer = getCompleteBuffer(nBlocks, netBuffer, entry);
- }
-
- accessor.reset(netBuffer, 0, netBuffer.limit());
- int nTuples = accessor.getTupleCount();
- for (int i = 0; i < nTuples; ++i) {
- int tBlockId = IntegerPointable.getInteger(accessor.getBuffer().array(),
- accessor.getAbsoluteFieldStartOffset(i, HadoopHelper.BLOCKID_FIELD_INDEX));
- if (info == null) {
- info = new RunInfo();
- info.reset(tBlockId);
- infos[entry] = info;
- } else if (info.blockId != tBlockId) {
- info.close();
- info.reset(tBlockId);
- }
- info.write(accessor, i);
- }
-
- if (nBlocks == 1) {
- channelReader.recycleBuffer(entry, netBuffer);
- }
- }
- for (int i = 0; i < infos.length; ++i) {
- RunInfo info = infos[i];
- if (info != null) {
- info.close();
- }
- }
-
- FileReference outFile = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
- int framesLimit = helper.getSortFrameLimit(ctx);
- IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
- IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- List<RunAndMaxFrameSizePair> runs = new LinkedList<>();
- for (int i = 0; i < runFileWriters.size(); i++) {
- runs.add(new RunAndMaxFrameSizePair(runFileWriters.get(i).createDeleteOnCloseReader(), runFileMaxFrameSize
- .get(i)));
- }
- RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
- ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators, null,
- recordDescriptor, framesLimit, rfw);
- merger.process();
-
- reader = rfw.createDeleteOnCloseReader();
- reader.open();
- }
-
- private ByteBuffer getCompleteBuffer(int nBlocks, ByteBuffer netBuffer, int entry) throws HyracksDataException {
- vframe.reset();
- vframe.ensureFrameSize(vframe.getMinSize() * nBlocks);
- FrameUtils.copyWholeFrame(netBuffer, vframe.getBuffer());
- channelReader.recycleBuffer(entry, netBuffer);
- for (int i = 1; i < nBlocks; ++i) {
- netBuffer = channelReader.getNextBuffer(entry);
- netBuffer.clear();
- vframe.getBuffer().put(netBuffer);
- channelReader.recycleBuffer(entry, netBuffer);
- }
- if (vframe.getBuffer().hasRemaining()) { // bigger frame
- FrameHelper.clearRemainingFrame(vframe.getBuffer(), vframe.getBuffer().position());
- }
- vframe.getBuffer().flip();
- return vframe.getBuffer();
- }
-
- @Override
- public boolean nextFrame(IFrame frame) throws HyracksDataException {
- return reader.nextFrame(frame);
- }
-
- @Override
- public void close() throws HyracksDataException {
- reader.close();
- }
-
- private class RunInfo {
- private final IFrame buffer;
- private final FrameTupleAppender fta;
-
- private FileReference file;
- private RunFileWriter rfw;
- private int blockId;
- private int maxFrameSize = ctx.getInitialFrameSize();
-
- public RunInfo() throws HyracksDataException {
- buffer = new VSizeFrame(ctx);
- fta = new FrameTupleAppender();
- }
-
- public void reset(int blockId) throws HyracksDataException {
- this.blockId = blockId;
- this.maxFrameSize = ctx.getInitialFrameSize();
- fta.reset(buffer, true);
- try {
- file = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
- rfw = new RunFileWriter(file, ctx.getIOManager());
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- public void write(FrameTupleAccessor accessor, int tIdx) throws HyracksDataException {
- if (!fta.append(accessor, tIdx)) {
- flush();
- if (!fta.append(accessor, tIdx)) {
- throw new HyracksDataException("Record size ("
- + (accessor.getTupleEndOffset(tIdx) - accessor.getTupleStartOffset(tIdx))
- + ") larger than frame size (" + fta.getBuffer().capacity() + ")");
- }
- }
- }
-
- public void close() throws HyracksDataException {
- flush();
- rfw.close();
- runFileWriters.add(rfw);
- runFileMaxFrameSize.add(maxFrameSize);
- }
-
- private void flush() throws HyracksDataException {
- if (fta.getTupleCount() <= 0) {
- return;
- }
-            maxFrameSize = Math.max(maxFrameSize, buffer.getFrameSize());
- rfw.nextFrame((ByteBuffer) buffer.getBuffer().clear());
- fta.reset(buffer, true);
- }
- }
-}
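ShuffleFrameReader followed a spill-then-merge pattern: each incoming block becomes a sorted run file, and open() finishes by merging all runs into one sorted stream that nextFrame() then serves. A sketch of that final merge step, with the wiring passed in as assumed parameters:

    import java.util.List;

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
    import org.apache.hyracks.dataflow.std.structures.RunAndMaxFrameSizePair;

    final class MergeRuns {
        // Illustrative helper mirroring the tail of open() above: k-way merge of the
        // spilled runs, sorted on field 0 (the key), into 'target'.
        static void merge(IHyracksTaskContext ctx, List<RunAndMaxFrameSizePair> runs,
                IBinaryComparator[] comparators, RecordDescriptor recDesc, int framesLimit,
                IFrameWriter target) throws HyracksDataException {
            ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 },
                    comparators, null, recDesc, framesLimit, target);
            merger.process(); // writes the merged, globally sorted frames to 'target'
        }
    }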
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
deleted file mode 100644
index 438b817..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class ClasspathBasedHadoopClassFactory implements IHadoopClassFactory {
-
- @Override
- public Object createMapper(String mapClassName, JobConf conf) throws Exception {
- Class clazz = loadClass(mapClassName);
- return ReflectionUtils.newInstance(clazz, conf);
- }
-
- @Override
- public Object createReducer(String reduceClassName, JobConf conf) throws Exception {
- Class clazz = loadClass(reduceClassName);
- return ReflectionUtils.newInstance(clazz, conf);
- }
-
- @Override
- public Class loadClass(String className) throws Exception {
-        return Class.forName(className);
- }
-
-}
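A hypothetical caller of the removed factory (the mapper class name is made up for illustration):

    import org.apache.hadoop.mapred.JobConf;

    public class FactoryDemo {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf();
            IHadoopClassFactory factory = new ClasspathBasedHadoopClassFactory();
            // ReflectionUtils.newInstance also injects 'conf' into JobConfigurable classes
            Object mapper = factory.createMapper("org.example.WordCountMapper", conf);
        }
    }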
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DatatypeHelper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DatatypeHelper.java
deleted file mode 100644
index ff4d282..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DatatypeHelper.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-@SuppressWarnings("deprecation")
-public class DatatypeHelper {
- private static final class WritableSerializerDeserializer<T extends Writable> implements ISerializerDeserializer<T> {
- private static final long serialVersionUID = 1L;
-
- private Class<T> clazz;
-
- private WritableSerializerDeserializer(Class<T> clazz) {
- this.clazz = clazz;
- }
-
- private T createInstance() throws HyracksDataException {
- // TODO remove "if", create a new WritableInstanceOperations class
- // that deals with Writables that don't have public constructors
- if (NullWritable.class.equals(clazz)) {
- return (T) NullWritable.get();
- }
- try {
- return clazz.newInstance();
-            } catch (InstantiationException | IllegalAccessException e) {
-                throw new HyracksDataException(e);
-            }
- }
-
- @Override
- public T deserialize(DataInput in) throws HyracksDataException {
- T o = createInstance();
- try {
- o.readFields(in);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- return o;
- }
-
- @Override
- public void serialize(T instance, DataOutput out) throws HyracksDataException {
- try {
- instance.write(out);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- public static ISerializerDeserializer<? extends Writable> createSerializerDeserializer(
- Class<? extends Writable> fClass) {
- return new WritableSerializerDeserializer(fClass);
- }
-
- public static RecordDescriptor createKeyValueRecordDescriptor(Class<? extends Writable> keyClass,
- Class<? extends Writable> valueClass) {
- ISerializerDeserializer[] fields = new ISerializerDeserializer[2];
- fields[0] = createSerializerDeserializer(keyClass);
- fields[1] = createSerializerDeserializer(valueClass);
- return new RecordDescriptor(fields);
- }
-
- public static RecordDescriptor createOneFieldRecordDescriptor(Class<? extends Writable> fieldClass) {
- ISerializerDeserializer[] fields = new ISerializerDeserializer[1];
- fields[0] = createSerializerDeserializer(fieldClass);
- return new RecordDescriptor(fields);
- }
-
- public static JobConf map2JobConf(Map<String, String> jobConfMap) {
- JobConf jobConf;
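-        // the lock is assumed to guard Configuration's non-thread-safe static initialization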
- synchronized (Configuration.class) {
- jobConf = new JobConf();
- for (Entry<String, String> entry : jobConfMap.entrySet()) {
- jobConf.set(entry.getKey(), entry.getValue());
- }
- }
- return jobConf;
- }
-
- public static Map<String, String> jobConf2Map(JobConf jobConf) {
- Map<String, String> jobConfMap = new HashMap<String, String>();
- for (Entry<String, String> entry : jobConf) {
- jobConfMap.put(entry.getKey(), entry.getValue());
- }
- return jobConfMap;
- }
-}
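DatatypeHelper bridged Hadoop's Writable serialization into Hyracks record descriptors. A hypothetical two-field schema built with it:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;

    public class DescriptorDemo {
        public static void main(String[] args) {
            // a (Text key, IntWritable value) schema; both fields (de)serialize
            // through the Writable protocol wrapped by DatatypeHelper
            RecordDescriptor kvDesc =
                    DatatypeHelper.createKeyValueRecordDescriptor(Text.class, IntWritable.class);
            System.out.println(kvDesc.getFields().length); // prints 2
        }
    }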
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java
deleted file mode 100644
index ab32ade..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import org.apache.hyracks.api.dataflow.IDataWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.map.IDeserializedMapper;
-
-public class DuplicateKeyMapper implements IDeserializedMapper {
-
- @Override
- public void map(Object[] data, IDataWriter<Object[]> writer) throws HyracksDataException {
- writer.writeData(new Object[] { data[0], data[1], data[0] });
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
deleted file mode 100644
index 07c4c89..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.mapred.JobConf;
-
-public interface IHadoopClassFactory extends Serializable {
-
- public Object createMapper(String mapClassName, JobConf conf) throws Exception;
-
- public Object createReducer(String reduceClassName, JobConf conf) throws Exception;
-
- public Class loadClass(String className) throws Exception;
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
deleted file mode 100644
index eed2885..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class InputSplitsProxy implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final Class[] isClasses;
- private final byte[] bytes;
-
- public InputSplitsProxy(JobConf conf, Object[] inputSplits) throws IOException {
- isClasses = new Class[inputSplits.length];
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- if (conf.getUseNewMapper()) {
- for (int i = 0; i < inputSplits.length; ++i) {
- isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit) inputSplits[i]).getClass();
- ((Writable) inputSplits[i]).write(dos);
- }
- } else {
- for (int i = 0; i < inputSplits.length; ++i) {
- isClasses[i] = ((org.apache.hadoop.mapred.InputSplit) inputSplits[i]).getClass();
- ((Writable) inputSplits[i]).write(dos);
- }
- }
- dos.close();
- bytes = baos.toByteArray();
- }
-
- public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException, IOException {
- Object[] splits = new Object[isClasses.length];
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
- for (int i = 0; i < splits.length; ++i) {
- splits[i] = ReflectionUtils.newInstance(isClasses[i], jobConf);
-            // old- and new-API splits are both Writable, so one readFields call covers both cases
-            ((Writable) splits[i]).readFields(dis);
- }
- return splits;
- }
-}
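InputSplitsProxy made Hadoop input splits Java-serializable by recording each split's class and concatenating their Writable bytes. A hypothetical round trip (jobConf and splits assumed to exist):

    // capture: remembers each split's class and serializes it via write(DataOutput)
    InputSplitsProxy proxy = new InputSplitsProxy(jobConf, splits);
    // rebuild: reflective newInstance per recorded class, then readFields in the same order
    Object[] restored = proxy.toInputSplits(jobConf);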
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/MRContextUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/MRContextUtil.java
deleted file mode 100644
index 047a6fc..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/MRContextUtil.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
-import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
-import org.apache.hadoop.mapreduce.task.MapContextImpl;
-import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * A wrapper for creating Hadoop Mapper.Context and Reducer.Context task contexts.
- */
-public class MRContextUtil {
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public Mapper.Context createMapContext(Configuration conf, TaskAttemptID taskid, RecordReader reader,
- RecordWriter writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
- return new WrappedMapper().getMapContext(new MapContextImpl(conf, taskid, reader, writer, committer, reporter,
- split));
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public Reducer.Context createReduceContext(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input,
- Counter inputKeyCounter, Counter inputValueCounter, RecordWriter output, OutputCommitter committer,
- StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass)
- throws HyracksDataException {
- try {
- return new WrappedReducer().getReducerContext(new ReduceContextImpl(conf, taskid, input, inputKeyCounter,
- inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass));
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-}
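For reference, the removed map-side runtime invoked this utility as follows (committer and reporter were left null there; the variables are assumed from that context):

    Mapper.Context mCtx = new MRContextUtil().createMapContext(conf, taId, recordReader,
            recordWriter, null, null, split);
    mapper.run(mCtx);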
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java
deleted file mode 100644
index ac5aec2..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.util;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-import org.apache.hyracks.api.dataflow.IDataWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.map.IDeserializedMapper;
-
-public class PreappendLongWritableMapper implements IDeserializedMapper {
-
- @Override
- public void map(Object[] data, IDataWriter<Object[]> writer) throws HyracksDataException {
- writer.writeData(new Object[] { new LongWritable(0), new Text(String.valueOf(data[0])) });
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/pom.xml b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/pom.xml
deleted file mode 100644
index a247eff..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/pom.xml
+++ /dev/null
@@ -1,88 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>hyracks-yarn-am</artifactId>
- <parent>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-yarn</artifactId>
- <version>0.2.1-SNAPSHOT</version>
- </parent>
-
- <properties>
- <root.dir>${basedir}/../../..</root.dir>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>appassembler-maven-plugin</artifactId>
- <executions>
- <execution>
- <configuration>
- <programs>
- <program>
- <mainClass>org.apache.hyracks.yarn.am.HyracksYarnApplicationMaster</mainClass>
- <name>hyracks-yarn-am</name>
- </program>
- </programs>
- <repositoryLayout>flat</repositoryLayout>
- <repositoryName>lib</repositoryName>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>assemble</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.2-beta-5</version>
- <executions>
- <execution>
- <configuration>
- <descriptors>
- <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
- </descriptors>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>attached</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>args4j</groupId>
- <artifactId>args4j</artifactId>
- <version>2.0.16</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-yarn-common</artifactId>
- <version>0.2.1-SNAPSHOT</version>
- </dependency>
- </dependencies>
-</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml
deleted file mode 100644
index d4a2ea1..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<assembly>
- <id>binary-assembly</id>
- <formats>
- <format>zip</format>
- <format>dir</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>target/appassembler/bin</directory>
- <outputDirectory>bin</outputDirectory>
- <fileMode>0755</fileMode>
- </fileSet>
- <fileSet>
- <directory>target/appassembler/lib</directory>
- <outputDirectory>lib</outputDirectory>
- </fileSet>
- </fileSets>
-</assembly>
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/HyracksYarnApplicationMaster.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/HyracksYarnApplicationMaster.java
deleted file mode 100644
index 86f1539..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/HyracksYarnApplicationMaster.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.kohsuke.args4j.CmdLineParser;
-
-import org.apache.hyracks.yarn.am.manifest.AbstractProcess;
-import org.apache.hyracks.yarn.am.manifest.ContainerSpecification;
-import org.apache.hyracks.yarn.am.manifest.HyracksCluster;
-import org.apache.hyracks.yarn.am.manifest.ManifestParser;
-import org.apache.hyracks.yarn.am.manifest.NodeController;
-import org.apache.hyracks.yarn.common.protocols.amrm.AMRMConnection;
-
-public class HyracksYarnApplicationMaster {
- private final Options options;
-
- private final Timer timer;
-
- private final List<ResourceRequest> asks;
-
- private final Map<Resource, Set<AskRecord>> resource2AskMap;
-
- private final Map<AbstractProcess, AskRecord> proc2AskMap;
-
- private final AtomicInteger lastResponseId;
-
- private final ApplicationAttemptId appAttemptId;
-
- private YarnConfiguration config;
-
- private AMRMConnection amrmc;
-
- private RegisterApplicationMasterResponse registration;
-
- private HyracksCluster hcManifest;
-
- private HyracksYarnApplicationMaster(Options options) {
- this.options = options;
- timer = new Timer(true);
- asks = new ArrayList<ResourceRequest>();
- resource2AskMap = new HashMap<Resource, Set<AskRecord>>();
- proc2AskMap = new HashMap<AbstractProcess, AskRecord>();
- lastResponseId = new AtomicInteger();
-
- String containerIdStr = System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
- ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
- appAttemptId = containerId.getApplicationAttemptId();
- }
-
- private void run() throws Exception {
- Configuration conf = new Configuration();
- config = new YarnConfiguration(conf);
- amrmc = new AMRMConnection(config);
-
- performRegistration();
- setupHeartbeats();
- parseManifest();
- setupAsks();
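-        // keep the AM process alive; the heartbeat TimerTask performs all allocation work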
- while (true) {
- Thread.sleep(1000);
- }
- }
-
- private synchronized void setupAsks() {
- setupAsk(hcManifest.getClusterController());
- for (NodeController nc : hcManifest.getNodeControllers()) {
- setupAsk(nc);
- }
- }
-
- private void setupAsk(AbstractProcess proc) {
- ContainerSpecification cSpec = proc.getContainerSpecification();
- ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
-
- rsrcRequest.setHostName(cSpec.getHostname());
-
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(0);
- rsrcRequest.setPriority(pri);
-
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(cSpec.getMemory());
- rsrcRequest.setCapability(capability);
-
- rsrcRequest.setNumContainers(1);
-
- AskRecord ar = new AskRecord();
- ar.req = rsrcRequest;
- ar.proc = proc;
-
- Set<AskRecord> arSet = resource2AskMap.get(capability);
- if (arSet == null) {
- arSet = new HashSet<AskRecord>();
- resource2AskMap.put(capability, arSet);
- }
- arSet.add(ar);
- proc2AskMap.put(proc, ar);
-
- System.err.println(proc + " -> [" + rsrcRequest.getHostName() + ", " + rsrcRequest.getNumContainers() + ", "
- + rsrcRequest.getPriority() + ", " + rsrcRequest.getCapability().getMemory() + "]");
-
- asks.add(rsrcRequest);
- }
-
- private void parseManifest() throws Exception {
- String str = FileUtils.readFileToString(new File("manifest.xml"));
- hcManifest = ManifestParser.parse(str);
- }
-
- private void setupHeartbeats() {
- long heartbeatInterval = config.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
-        System.err.println("Configured heartbeat interval: " + heartbeatInterval);
-        heartbeatInterval = Math.min(heartbeatInterval, 1000);
-        System.err.println("Effective heartbeat interval (capped at 1000 ms): " + heartbeatInterval);
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- AllocateRequest hb = Records.newRecord(AllocateRequest.class);
- populateAllocateRequest(hb);
- hb.setApplicationAttemptId(amrmc.getApplicationAttemptId());
- hb.setProgress(0);
- try {
- AllocateResponse allocateResponse = amrmc.getAMRMProtocol().allocate(hb);
- List<Container> allocatedContainers = allocateResponse.getAMResponse().getAllocatedContainers();
- List<ContainerStatus> completedContainers = allocateResponse.getAMResponse()
- .getCompletedContainersStatuses();
- processAllocation(allocatedContainers, completedContainers);
- } catch (YarnRemoteException e) {
- e.printStackTrace();
- }
- }
- }, 0, heartbeatInterval);
- }
-
- private synchronized void populateAllocateRequest(AllocateRequest hb) {
- hb.addAllAsks(asks);
- hb.addAllReleases(new ArrayList<ContainerId>());
- hb.setResponseId(lastResponseId.incrementAndGet());
- hb.setApplicationAttemptId(appAttemptId);
- }
-
- private synchronized void processAllocation(List<Container> allocatedContainers,
- List<ContainerStatus> completedContainers) {
- System.err.println(allocatedContainers);
- for (Container c : allocatedContainers) {
- System.err.println("Got container: " + c.getContainerStatus());
- NodeId nodeId = c.getNodeId();
- Resource resource = c.getResource();
-
- Set<AskRecord> arSet = resource2AskMap.get(resource);
- boolean found = false;
- if (arSet != null) {
- AskRecord wildcardMatch = null;
- AskRecord nameMatch = null;
- for (AskRecord ar : arSet) {
- ResourceRequest req = ar.req;
- if (ar.allocation == null) {
- if ("*".equals(req.getHostName()) && wildcardMatch == null) {
- wildcardMatch = ar;
- }
- if (req.getHostName().equals(nodeId.getHost()) && nameMatch == null) {
- nameMatch = ar;
- break;
- }
- }
- }
- if (nameMatch != null) {
- found = true;
- nameMatch.allocation = c;
- } else if (wildcardMatch != null) {
- found = true;
- wildcardMatch.allocation = c;
- }
- }
- if (!found) {
- System.err.println("Unknown request satisfied: " + resource);
- }
- }
- }
-
- private void performRegistration() throws YarnRemoteException {
- RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
- appMasterRequest.setApplicationAttemptId(amrmc.getApplicationAttemptId());
-
- registration = amrmc.getAMRMProtocol().registerApplicationMaster(appMasterRequest);
- }
-
- public static void main(String[] args) throws Exception {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- try {
- parser.parseArgument(args);
- } catch (Exception e) {
- parser.printUsage(System.err);
- return;
- }
- new HyracksYarnApplicationMaster(options).run();
- }
-
- private static class Options {
- }
-
- private static class AskRecord {
- ResourceRequest req;
- AbstractProcess proc;
- Container allocation;
- }
-}
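
For reviewers tracing the removal: the allocation bookkeeping above keys resource2AskMap on the granted Resource record (relying on Resource equality) and then prefers an exact hostname match over the first unsatisfied wildcard ask. A standalone restatement of that rule, using the same deleted types ("pending" is an illustrative parameter name, not a field of the class):

    // Restatement of the matching rule in processAllocation above.
    static AskRecord match(Set<AskRecord> pending, NodeId nodeId) {
        AskRecord wildcard = null;
        for (AskRecord ar : pending) {
            if (ar.allocation != null) {
                continue; // ask already satisfied by an earlier container
            }
            if (ar.req.getHostName().equals(nodeId.getHost())) {
                return ar; // an exact hostname match always wins
            }
            if ("*".equals(ar.req.getHostName()) && wildcard == null) {
                wildcard = ar; // first unsatisfied wildcard ask is the fallback
            }
        }
        return wildcard; // null corresponds to "Unknown request satisfied"
    }
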
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/AbstractProcess.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/AbstractProcess.java
deleted file mode 100644
index 5c877ad..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/AbstractProcess.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-public abstract class AbstractProcess {
- protected ContainerSpecification cSpec;
-
- protected String cmdLineArgs;
-
- public void setContainerSpecification(ContainerSpecification cSpec) {
- this.cSpec = cSpec;
- }
-
- public ContainerSpecification getContainerSpecification() {
- return cSpec;
- }
-
- public void setCommandLineArguments(String cmdLineArgs) {
- this.cmdLineArgs = cmdLineArgs;
- }
-
- public String getCommandLineArguments() {
- return cmdLineArgs;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ClusterController.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ClusterController.java
deleted file mode 100644
index 666b05e..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ClusterController.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-public class ClusterController extends AbstractProcess {
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ContainerSpecification.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ContainerSpecification.java
deleted file mode 100644
index 919a8bf..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ContainerSpecification.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-public class ContainerSpecification {
- private String hostname;
-
- private int memory;
-
- public ContainerSpecification() {
- hostname = "*";
- }
-
- public void setHostname(String hostname) {
- this.hostname = hostname;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public void setMemory(int memory) {
- this.memory = memory;
- }
-
- public int getMemory() {
- return memory;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/HyracksCluster.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/HyracksCluster.java
deleted file mode 100644
index 5536e0f..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/HyracksCluster.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class HyracksCluster {
- private String name;
-
- private ClusterController cc;
-
- private List<NodeController> ncs;
-
- public HyracksCluster() {
- ncs = new ArrayList<NodeController>();
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- public void setClusterController(ClusterController cc) {
- this.cc = cc;
- }
-
- public ClusterController getClusterController() {
- return cc;
- }
-
- public void addNodeController(NodeController nc) {
- ncs.add(nc);
- }
-
- public List<NodeController> getNodeControllers() {
- return ncs;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ManifestParser.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ManifestParser.java
deleted file mode 100644
index fd85353..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ManifestParser.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-import java.io.StringReader;
-
-import org.apache.commons.digester.Digester;
-
-public class ManifestParser {
- public static HyracksCluster parse(String mXML) throws Exception {
- Digester d = createDigester();
- return (HyracksCluster) d.parse(new StringReader(mXML));
- }
-
- private static Digester createDigester() {
- Digester d = new Digester();
- d.setValidating(false);
-
- d.addObjectCreate("hyracks-cluster", HyracksCluster.class);
- d.addSetProperties("hyracks-cluster");
-
- d.addObjectCreate("hyracks-cluster/cluster-controller", ClusterController.class);
- d.addSetProperties("hyracks-cluster/cluster-controller");
- d.addSetNext("hyracks-cluster/cluster-controller", "setClusterController");
-
- d.addObjectCreate("hyracks-cluster/node-controllers/node-controller", NodeController.class);
- d.addSetProperties("hyracks-cluster/node-controllers/node-controller");
- d.addSetNext("hyracks-cluster/node-controllers/node-controller", "addNodeController");
-
- d.addObjectCreate("*/container-specification", ContainerSpecification.class);
- d.addSetProperties("*/container-specification");
- d.addSetNext("*/container-specification", "setContainerSpecification");
- return d;
- }
-}
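
The Digester rules above imply a manifest of the shape sketched below. Element names are taken directly from the rules; the attribute names are inferred from the bean setters (addSetProperties maps same-named XML attributes to bean properties), so treat this as a best-effort reconstruction rather than a verified schema:

    // Hypothetical manifest accepted by the deleted ManifestParser; attribute
    // names (name, id, hostname, memory) are inferred from the bean setters.
    String manifestXML =
            "<hyracks-cluster name='example'>"
          + "  <cluster-controller>"
          + "    <container-specification memory='512'/>"
          + "  </cluster-controller>"
          + "  <node-controllers>"
          + "    <node-controller id='nc1'>"
          + "      <container-specification hostname='nc1.example.com' memory='1024'/>"
          + "    </node-controller>"
          + "  </node-controllers>"
          + "</hyracks-cluster>";
    HyracksCluster cluster = ManifestParser.parse(manifestXML);
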
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/NodeController.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/NodeController.java
deleted file mode 100644
index 2356bb0..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/NodeController.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.am.manifest;
-
-public class NodeController extends AbstractProcess {
- private String id;
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getId() {
- return id;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/pom.xml b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/pom.xml
deleted file mode 100644
index 6ebadc9..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/pom.xml
+++ /dev/null
@@ -1,99 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>hyracks-yarn-client</artifactId>
- <parent>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-yarn</artifactId>
- <version>0.2.1-SNAPSHOT</version>
- </parent>
-
- <properties>
- <root.dir>${basedir}/../../..</root.dir>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>appassembler-maven-plugin</artifactId>
- <executions>
- <execution>
- <configuration>
- <programs>
- <program>
- <mainClass>org.apache.hyracks.yarn.client.LaunchHyracksApplication</mainClass>
- <name>launch-hyracks-application</name>
- </program>
- <program>
- <mainClass>org.apache.hyracks.yarn.client.KillHyracksApplication</mainClass>
- <name>kill-hyracks-application</name>
- </program>
- </programs>
- <repositoryLayout>flat</repositoryLayout>
- <repositoryName>lib</repositoryName>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>assemble</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.2-beta-5</version>
- <executions>
- <execution>
- <configuration>
- <descriptors>
- <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
- </descriptors>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>attached</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>args4j</groupId>
- <artifactId>args4j</artifactId>
- <version>2.0.16</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-yarn-common</artifactId>
- <version>0.2.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-yarn-am</artifactId>
- <version>0.2.1-SNAPSHOT</version>
- <type>zip</type>
- <classifier>binary-assembly</classifier>
- </dependency>
- </dependencies>
-</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/assembly/binary-assembly.xml b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/assembly/binary-assembly.xml
deleted file mode 100644
index 04567a3..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/assembly/binary-assembly.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<assembly>
- <id>binary-assembly</id>
- <formats>
- <format>zip</format>
- <format>dir</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>target/appassembler/bin</directory>
- <outputDirectory>bin</outputDirectory>
- <fileMode>0755</fileMode>
- </fileSet>
- <fileSet>
- <directory>target/appassembler/lib</directory>
- <outputDirectory>lib</outputDirectory>
- </fileSet>
- </fileSets>
- <dependencySets>
- <dependencySet>
- <outputDirectory>hyracks-yarn-am</outputDirectory>
- <includes>
- <include>hyracks-yarn-am*</include>
- </includes>
- <unpack>false</unpack>
- <outputFileNameMapping>${artifact.artifactId}.${artifact.extension}</outputFileNameMapping>
- <useTransitiveDependencies>false</useTransitiveDependencies>
- </dependencySet>
- </dependencySets>
-</assembly>
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/KillHyracksApplication.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/KillHyracksApplication.java
deleted file mode 100644
index 8f3b68d..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/KillHyracksApplication.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.client;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-import org.apache.hyracks.yarn.common.protocols.clientrm.YarnClientRMConnection;
-
-public class KillHyracksApplication {
- private final Options options;
-
- private KillHyracksApplication(Options options) {
- this.options = options;
- }
-
- private void run() throws Exception {
- Configuration conf = new Configuration();
- YarnConfiguration yconf = new YarnConfiguration(conf);
- YarnClientRMConnection crmc = new YarnClientRMConnection(yconf);
- crmc.killApplication(options.appId);
- }
-
- public static void main(String[] args) throws Exception {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- try {
- parser.parseArgument(args);
- } catch (Exception e) {
- parser.printUsage(System.err);
- return;
- }
- new KillHyracksApplication(options).run();
- }
-
- private static class Options {
- @Option(name = "-application-id", required = true, usage = "Application Id")
- String appId;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/LaunchHyracksApplication.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/LaunchHyracksApplication.java
deleted file mode 100644
index e5d727f..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/LaunchHyracksApplication.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.client;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-import org.apache.hyracks.yarn.common.protocols.clientrm.YarnApplication;
-import org.apache.hyracks.yarn.common.protocols.clientrm.YarnClientRMConnection;
-import org.apache.hyracks.yarn.common.resources.LocalResourceHelper;
-import org.apache.hyracks.yarn.common.resources.ResourceHelper;
-
-public class LaunchHyracksApplication {
- private final Options options;
-
- private LaunchHyracksApplication(Options options) {
- this.options = options;
- }
-
- private void run() throws Exception {
- Configuration conf = new Configuration();
- YarnConfiguration yconf = new YarnConfiguration(conf);
- YarnClientRMConnection crmc = new YarnClientRMConnection(yconf);
-
- YarnApplication app = crmc.createApplication(options.appName);
-
- ContainerLaunchContext clCtx = app.getContainerLaunchContext();
-
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-
- File amZipFile = new File(System.getProperty("basedir") + "/hyracks-yarn-am/hyracks-yarn-am.zip");
- localResources.put("archive", LocalResourceHelper.createArchiveResource(conf, amZipFile));
- clCtx.setLocalResources(localResources);
-
- String command = "./archive/bin/hyracks-yarn-am 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
- + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
-
- List<String> commands = new ArrayList<String>();
- commands.add(command);
- clCtx.setCommands(commands);
-
- clCtx.setResource(ResourceHelper.createMemoryCapability(options.amMemory));
-
- app.submit();
- }
-
- public static void main(String[] args) throws Exception {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- try {
- parser.parseArgument(args);
- } catch (Exception e) {
- parser.printUsage(System.err);
- return;
- }
- new LaunchHyracksApplication(options).run();
- }
-
- private static class Options {
- @Option(name = "-application-name", required = true, usage = "Application Name")
- String appName;
-
- @Option(name = "-am-host", required = false, usage = "Application master host name (default: *). Currently has NO effect")
- String amHostName = "*";
-
- @Option(name = "-am-memory", required = false, usage = "Application Master memory requirements")
- int amMemory = 128;
-
- @Option(name = "-workers", required = true, usage = "Number of worker containers")
- int nWorkers;
-
- @Option(name = "-worker-memory", required = true, usage = "Amount of memory to provide to each worker")
- int workerMemory;
-
- @Option(name = "-extra-jars", required = false, usage = "Other jars that need to be added to the classpath")
- String extraJars = "";
- }
-}
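
Worth noting for the removal: the launcher parses -workers, -worker-memory, -extra-jars, and -am-host but run() never reads them; the cluster shape instead came from the manifest.xml read by the application master. The AM command it builds expands, assuming YARN's conventional "<LOG_DIR>" value for ApplicationConstants.LOG_DIR_EXPANSION_VAR, to:

    // Assuming LOG_DIR_EXPANSION_VAR == "<LOG_DIR>":
    String command = "./archive/bin/hyracks-yarn-am 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr";
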
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/pom.xml
deleted file mode 100644
index 43b4cc0..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/pom.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>hyracks-yarn-common</artifactId>
- <parent>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-yarn</artifactId>
- <version>0.2.1-SNAPSHOT</version>
- </parent>
-
- <properties>
- <root.dir>${basedir}/../../..</root.dir>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <version>2.0.0-alpha</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- <version>2.0.0-alpha</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>2.0.0-alpha</version>
- </dependency>
- </dependencies>
-</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/amrm/AMRMConnection.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/amrm/AMRMConnection.java
deleted file mode 100644
index 931f39d..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/amrm/AMRMConnection.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.common.protocols.amrm;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-
-public class AMRMConnection {
- private final YarnConfiguration config;
-
- private final ApplicationAttemptId appAttemptId;
-
- private final AMRMProtocol amrmp;
-
- public AMRMConnection(YarnConfiguration config) {
- this.config = config;
- Map<String, String> envs = System.getenv();
- String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
- if (containerIdString == null) {
- throw new IllegalArgumentException("ContainerId not set in the environment");
- }
- ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
- appAttemptId = containerId.getApplicationAttemptId();
- InetSocketAddress rmAddress = NetUtils.createSocketAddr(config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
- YarnRPC rpc = YarnRPC.create(config);
-
- amrmp = (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, config);
- }
-
- public ApplicationAttemptId getApplicationAttemptId() {
- return appAttemptId;
- }
-
- public AMRMProtocol getAMRMProtocol() {
- return amrmp;
- }
-}
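
This connection backed the registration and heartbeat calls of the application master deleted earlier in this change; a condensed restatement of the registration path (the constructor requires the AM container id in the environment, as checked above):

    AMRMConnection amrmc = new AMRMConnection(new YarnConfiguration(new Configuration()));
    RegisterApplicationMasterRequest request =
            Records.newRecord(RegisterApplicationMasterRequest.class);
    request.setApplicationAttemptId(amrmc.getApplicationAttemptId());
    amrmc.getAMRMProtocol().registerApplicationMaster(request);
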
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnApplication.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnApplication.java
deleted file mode 100644
index 8ea8f21..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnApplication.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.common.protocols.clientrm;
-
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.Records;
-
-public class YarnApplication {
- private final YarnClientRMConnection crmc;
-
- private ApplicationSubmissionContext appCtx;
-
- private ContainerLaunchContext clCtx;
-
- YarnApplication(YarnClientRMConnection crmc, String appName) throws YarnRemoteException {
- this.crmc = crmc;
- appCtx = Records.newRecord(ApplicationSubmissionContext.class);
- appCtx.setApplicationId(getNewApplicationId(crmc));
- appCtx.setApplicationName(appName);
- clCtx = Records.newRecord(ContainerLaunchContext.class);
- }
-
- public ContainerLaunchContext getContainerLaunchContext() {
- return clCtx;
- }
-
- public void submit() throws YarnRemoteException {
- appCtx.setAMContainerSpec(clCtx);
- SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appCtx);
- crmc.getClientRMProtocol().submitApplication(appRequest);
- }
-
- private static ApplicationId getNewApplicationId(YarnClientRMConnection crmc) throws YarnRemoteException {
- GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
- GetNewApplicationResponse response = crmc.getClientRMProtocol().getNewApplication(request);
-
- return response.getApplicationId();
- }
-}
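
Submission was a three-step flow through this class and YarnClientRMConnection (next file); a condensed restatement of the calls made by the deleted LaunchHyracksApplication above:

    YarnClientRMConnection crmc =
            new YarnClientRMConnection(new YarnConfiguration(new Configuration()));
    YarnApplication app = crmc.createApplication("hyracks-example");
    ContainerLaunchContext clCtx = app.getContainerLaunchContext();
    clCtx.setCommands(Collections.singletonList("./archive/bin/hyracks-yarn-am"));
    clCtx.setResource(ResourceHelper.createMemoryCapability(128)); // helper appears below
    app.submit();
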
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnClientRMConnection.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnClientRMConnection.java
deleted file mode 100644
index 8ed9a98..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnClientRMConnection.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.common.protocols.clientrm;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
-import org.apache.hadoop.yarn.util.Records;
-
-public class YarnClientRMConnection {
- private final YarnConfiguration config;
-
- private final ClientRMProtocol crmp;
-
- public YarnClientRMConnection(YarnConfiguration config) {
- this.config = config;
- InetSocketAddress remoteAddress = NetUtils.createSocketAddr(config.get(YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS));
- Configuration appsManagerServerConf = new Configuration(config);
- appsManagerServerConf.setClass(YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CLIENT_RESOURCEMANAGER,
- ClientRMSecurityInfo.class, SecurityInfo.class);
- YarnRPC rpc = YarnRPC.create(appsManagerServerConf);
- crmp = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, remoteAddress, appsManagerServerConf));
- }
-
- public YarnApplication createApplication(String appName) throws YarnRemoteException {
- return new YarnApplication(this, appName);
- }
-
- public ClientRMProtocol getClientRMProtocol() {
- return crmp;
- }
-
- public void killApplication(String appId) throws Exception {
- KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class);
- ApplicationId aid = Records.newRecord(ApplicationId.class);
- long ts = Long.parseLong(appId.substring(appId.indexOf('_') + 1, appId.lastIndexOf('_')));
- aid.setClusterTimestamp(ts);
- int id = Integer.parseInt(appId.substring(appId.lastIndexOf('_') + 1));
- aid.setId(id);
- killRequest.setApplicationId(aid);
- crmp.forceKillApplication(killRequest);
- }
-}
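
killApplication reconstructs an ApplicationId from its string form by splitting on the first and last underscore. A worked example, assuming YARN's conventional "application_<clusterTimestamp>_<sequence>" format:

    String appId = "application_1326821518301_0005";
    long ts = Long.parseLong(appId.substring(appId.indexOf('_') + 1, appId.lastIndexOf('_')));
    int id = Integer.parseInt(appId.substring(appId.lastIndexOf('_') + 1));
    // ts == 1326821518301L, id == 5
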
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/LocalResourceHelper.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/LocalResourceHelper.java
deleted file mode 100644
index c185ef2..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/LocalResourceHelper.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.common.resources;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-public class LocalResourceHelper {
- private static LocalResource createLocalResourceFromPath(Configuration config, File path) throws IOException {
- LocalResource lr = Records.newRecord(LocalResource.class);
- URL url = ConverterUtils.getYarnUrlFromPath(FileContext.getFileContext().makeQualified(new Path(path.toURI())));
- lr.setResource(url);
- lr.setVisibility(LocalResourceVisibility.APPLICATION);
- lr.setTimestamp(path.lastModified());
- lr.setSize(path.length());
- return lr;
- }
-
- public static LocalResource createFileResource(Configuration config, File path) throws IOException {
- LocalResource lr = createLocalResourceFromPath(config, path);
- lr.setType(LocalResourceType.FILE);
- return lr;
- }
-
- public static LocalResource createArchiveResource(Configuration config, File path) throws IOException {
- LocalResource lr = createLocalResourceFromPath(config, path);
- lr.setType(LocalResourceType.ARCHIVE);
- return lr;
- }
-}
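
The deleted launcher shipped the AM bundle through createArchiveResource; condensed, the usage was:

    Configuration conf = new Configuration();
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
    File amZip = new File("hyracks-yarn-am.zip"); // path shortened for illustration
    localResources.put("archive", LocalResourceHelper.createArchiveResource(conf, amZip));
    // "archive" is the link name the container command refers to: ./archive/bin/...
    clCtx.setLocalResources(localResources); // clCtx as in the submission sketch above
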
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/ResourceHelper.java b/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/ResourceHelper.java
deleted file mode 100644
index a878dc2..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/ResourceHelper.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.yarn.common.resources;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
-
-public class ResourceHelper {
- public static Resource createMemoryCapability(int memory) {
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(memory);
- return capability;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-yarn/pom.xml b/hyracks-fullstack/hyracks/hyracks-yarn/pom.xml
deleted file mode 100644
index a86befb..0000000
--- a/hyracks-fullstack/hyracks/hyracks-yarn/pom.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>hyracks-yarn</artifactId>
- <packaging>pom</packaging>
- <name>hyracks-yarn</name>
-
- <parent>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks</artifactId>
- <version>0.2.1-SNAPSHOT</version>
- </parent>
-
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- <comments>A business-friendly OSS license</comments>
- </license>
- </licenses>
-
- <properties>
- <root.dir>${basedir}/../..</root.dir>
- </properties>
-
- <modules>
- <module>hyracks-yarn-common</module>
- <module>hyracks-yarn-client</module>
- <module>hyracks-yarn-am</module>
- </modules>
-</project>