indirect hdfs dependency to hyracks-hdfs-xxx
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2920 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
index 4b35535..26450fc 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
@@ -3,10 +3,8 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-hdfs-0.20.2</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <name>hyracks-hdfs-0.20.2</name>
+ <artifactId>hyracks-hdfs-0.20.2</artifactId>
+ <name>hyracks-hdfs-0.20.2</name>
<parent>
<artifactId>hyracks-hdfs</artifactId>
<groupId>edu.uci.ics.hyracks</groupId>
@@ -38,4 +36,28 @@
</plugin>
</plugins>
</build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
</project>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
new file mode 100644
index 0000000..a2b16c6
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * The wrapper to generate TaskTattemptContext
+ */
+public class ContextFactory {
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+ try {
+ return new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null, split);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
index 62ef73a..c218702 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
@@ -3,10 +3,8 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-hdfs-0.23.1</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <name>hyracks-hdfs-0.20.2</name>
+ <artifactId>hyracks-hdfs-0.23.1</artifactId>
+ <name>hyracks-hdfs-0.20.2</name>
<parent>
<artifactId>hyracks-hdfs</artifactId>
<groupId>edu.uci.ics.hyracks</groupId>
@@ -37,4 +35,42 @@
</plugin>
</plugins>
</build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>0.22.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapred</artifactId>
+ <version>0.22.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>0.22.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-test</artifactId>
+ <version>0.22.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
</project>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
new file mode 100644
index 0000000..60ae5d3
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.hyracks.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * The wrapper to generate TaskTattemptContext
+ */
+public class ContextFactory {
+
+ public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+ try {
+ return new TaskAttemptContextImpl(conf, new TaskAttemptID());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index c9aa1b9..fa7c7bb 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -4,7 +4,7 @@
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<artifactId>hyracks-hdfs-core</artifactId>
- <name>hyracks-hdfs-core</name>
+ <name>hyracks-hdfs-core</name>
<parent>
<artifactId>hyracks-hdfs</artifactId>
<groupId>edu.uci.ics.hyracks</groupId>
@@ -63,16 +63,9 @@
</activation>
<dependencies>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- <version>0.20.2</version>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-0.20.2</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -85,30 +78,9 @@
<id>hadoop-0.22.0</id>
<dependencies>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>0.22.0</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapred</artifactId>
- <version>0.22.0</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>0.22.0</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs-test</artifactId>
- <version>0.22.0</version>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-0.23.1</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 19f795d..90f5603 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -22,10 +22,8 @@
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
@@ -38,12 +36,13 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
/**
- * The HDFS file read operator using the Hadoop new API.
- * To use this operator, a user need to provide an IKeyValueParserFactory implementation which convert
+ * The HDFS file read operator using the Hadoop new API. To use this operator, a
+ * user need to provide an IKeyValueParserFactory implementation which convert
* key-value pairs into tuples.
*/
@SuppressWarnings("rawtypes")
@@ -64,12 +63,16 @@
* @param rd
* the output record descriptor
* @param conf
- * the Hadoop JobConf object, which contains the input format and the input paths
+ * the Hadoop JobConf object, which contains the input format and
+ * the input paths
* @param splits
* the array of FileSplits (HDFS chunks).
* @param scheduledLocations
- * the node controller names to scan the FileSplits, which is an one-to-one mapping. The String array
- * is obtained from the edu.cui.ics.hyracks.hdfs.scheduler.Scheduler.getLocationConstraints(InputSplits[]).
+ * the node controller names to scan the FileSplits, which is an
+ * one-to-one mapping. The String array is obtained from the
+ * edu.cui
+ * .ics.hyracks.hdfs.scheduler.Scheduler.getLocationConstraints
+ * (InputSplits[]).
* @param tupleParserFactory
* the ITupleParserFactory implementation instance.
* @throws HyracksException
@@ -102,6 +105,7 @@
return new AbstractUnaryOutputSourceOperatorNodePushable() {
private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ private ContextFactory ctxFactory = new ContextFactory();
@SuppressWarnings("unchecked")
@Override
@@ -121,8 +125,8 @@
*/
if (scheduledLocations[i].equals(nodeName)) {
/**
- * pick an unread split to read
- * synchronize among simultaneous partitions in the same machine
+ * pick an unread split to read synchronize among
+ * simultaneous partitions in the same machine
*/
synchronized (executed) {
if (executed[i] == false) {
@@ -135,8 +139,8 @@
/**
* read the split
*/
- Context context = new Mapper().new Context(job.getConfiguration(), new TaskAttemptID(),
- null, null, null, null, inputSplits.get(i));
+ TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(),
+ inputSplits.get(i));
RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
reader.initialize(inputSplits.get(i), context);
while (reader.nextKeyValue() == true) {
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
index 508ba07..2e0b655 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -32,8 +32,6 @@
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -141,7 +139,7 @@
Scheduler scheduler = new Scheduler(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(), conf.getConfiguration());
- List<InputSplit> splits = inputFormat.getSplits(new JobContext(conf.getConfiguration(), new JobID()));
+ List<InputSplit> splits = inputFormat.getSplits(conf);
String[] readSchedule = scheduler.getLocationConstraints(splits);
JobSpecification jobSpec = new JobSpecification();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index b7f9e3d..a8cd3db 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -26,7 +26,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -66,7 +66,7 @@
/** List of incoming messages from the previous superstep */
private final List<M> msgList = new ArrayList<M>();
/** map context */
- private static Mapper.Context context = null;
+ private static TaskAttemptContext context = null;
/** a delegate for hyracks stuff */
private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
/** this vertex is updated or not */
@@ -444,8 +444,10 @@
/**
* Add a new vertex into the graph
*
- * @param vertexId the vertex id
- * @param vertex the vertex
+ * @param vertexId
+ * the vertex id
+ * @param vertex
+ * the vertex
*/
public final void addVertex(I vertexId, V vertex) {
delegate.addVertex(vertexId, vertex);
@@ -454,7 +456,8 @@
/**
* Delete a vertex from id
*
- * @param vertexId the vertex id
+ * @param vertexId
+ * the vertex id
*/
public final void deleteVertex(I vertexId) {
delegate.deleteVertex(vertexId);
@@ -528,7 +531,7 @@
/**
* Pregelix internal use only
*/
- public static final Mapper<?, ?, ?, ?>.Context getContext() {
+ public static final TaskAttemptContext getContext() {
return context;
}
@@ -537,7 +540,7 @@
*
* @param context
*/
- public static final void setContext(Mapper<?, ?, ?, ?>.Context context) {
+ public static final void setContext(TaskAttemptContext context) {
Vertex.context = context;
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index ee16c08..a38b19e 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -25,8 +25,6 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -43,6 +41,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
@@ -84,6 +83,7 @@
return new AbstractUnaryOutputSourceOperatorNodePushable() {
private ClassLoader ctxCL;
+ private ContextFactory ctxFactory = new ContextFactory();
@Override
public void initialize() throws HyracksDataException {
@@ -146,8 +146,7 @@
/**
* set context
*/
- Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- splits.get(splitId));
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splits.get(splitId));
Vertex.setContext(mapperContext);
/**
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index f7d0018..d968262 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -15,12 +15,11 @@
package edu.uci.ics.pregelix.runtime.touchpoint;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -40,14 +39,13 @@
public IRuntimeHook createRuntimeHook() {
return new IRuntimeHook() {
+ private ContextFactory ctxFactory = new ContextFactory();
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
Configuration conf = confFactory.createConfiguration();
try {
- Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- null);
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, null);
Vertex.setContext(mapperContext);
BspUtils.setDefaultConfiguration(conf);
} catch (Exception e) {