Add support for CDH-4.1 and CDH-4.2
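
The new Maven profiles in hyracks-hdfs-0.23.1 and hyracks-hdfs-core
activate on the "hadoop" property and resolve the 2.0.0-cdh4.x
artifacts from the Cloudera repository added to the pom. A minimal
sketch of selecting a build, assuming the tree's usual Maven goals:

    # build against CDH 4.2 (2.0.0-cdh4.2.0 artifacts)
    mvn clean install -Dhadoop=cdh-4.2

    # build against CDH 4.1 (2.0.0-cdh4.1.0 artifacts)
    mvn clean install -Dhadoop=cdh-4.1

The profiles can also be selected by id, e.g. -Pcdh-4.2.

Along the way, ContextFactory drops its InputSplit-based method:
callers now pass an explicit TaskAttemptID or a partition number, and
a new createJobContext() hides the JobContext-vs-JobContextImpl
difference between Hadoop versions. A sketch of the call sites as they
appear in the patch:

    ContextFactory ctxFactory = new ContextFactory();
    // per-partition task attempt context, replacing the InputSplit overload
    TaskAttemptContext context = ctxFactory.createContext(conf, partition);
    // version-neutral job context, replacing new JobContext(conf, new JobID("0", 0))
    JobContext job = ctxFactory.createJobContext(conf);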
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_release_cleanup@3051 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
index 541f54e..9092655 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
@@ -64,6 +64,10 @@
<profile>
<activation>
<activeByDefault>false</activeByDefault>
+ <property>
+ <name>hadoop</name>
+ <value>1.0.4</value>
+ </property>
</activation>
<id>hadoop-1.0.4</id>
<dependencies>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
index a2b16c6..16ce76b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -1,7 +1,8 @@
package edu.uci.ics.hyracks.hdfs;

import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -14,12 +15,25 @@
public class ContextFactory {
@SuppressWarnings({ "unchecked", "rawtypes" })
- public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+ public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid) throws HyracksDataException {
try {
- return new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null, split);
+ return new Mapper().new Context(conf, tid, null, null, null, null, null);
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
+ public TaskAttemptContext createContext(Configuration conf, int partition) throws HyracksDataException {
+ try {
+ TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
+ return new TaskAttemptContext(conf, tid);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public JobContext createJobContext(Configuration conf) {
+ return new JobContext(conf, new JobID("0", 0));
+ }
+
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
index a848ef9..8b7ecf0 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
@@ -41,6 +41,10 @@
<profile>
<activation>
<activeByDefault>true</activeByDefault>
+ <property>
+ <name>hadoop</name>
+ <value>0.23.1</value>
+ </property>
</activation>
<id>hadoop-0.23.1</id>
<dependencies>
@@ -78,6 +82,10 @@
<id>hadoop-0.23.6</id>
<activation>
<activeByDefault>false</activeByDefault>
+ <property>
+ <name>hadoop</name>
+ <value>0.23.6</value>
+ </property>
</activation>
<dependencies>
<dependency>
@@ -110,6 +118,86 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ <property>
+ <name>hadoop</name>
+ <value>cdh-4.2</value>
+ </property>
+ </activation>
+ <id>cdh-4.2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.0.0-cdh4.2.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>2.0.0-cdh4.2.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>2.0.0-cdh4.2.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>2.0.0-cdh4.2.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ <property>
+ <name>hadoop</name>
+ <value>cdh-4.1</value>
+ </property>
+ </activation>
+ <id>cdh-4.1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.0.0-cdh4.1.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>2.0.0-cdh4.1.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>2.0.0-cdh4.1.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>2.0.0-cdh4.1.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
<dependencies>
@@ -121,4 +209,11 @@
<scope>compile</scope>
</dependency>
</dependencies>
+
+ <repositories>
+ <repository>
+ <id>cloudera</id>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+ </repository>
+ </repositories>
</project>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
index 60ae5d3..ddcce64 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ContextFactory.java
@@ -1,9 +1,12 @@
package edu.uci.ics.hyracks.hdfs;

import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -13,12 +16,25 @@
*/
public class ContextFactory {
- public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+ public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid) throws HyracksDataException {
try {
- return new TaskAttemptContextImpl(conf, new TaskAttemptID());
+ return new TaskAttemptContextImpl(conf, tid);
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
+ public TaskAttemptContext createContext(Configuration conf, int partition) throws HyracksDataException {
+ try {
+ TaskAttemptID tid = new TaskAttemptID("", 0, TaskType.REDUCE, partition, 0);
+ return new TaskAttemptContextImpl(conf, tid);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public JobContext createJobContext(Configuration conf) {
+ return new JobContextImpl(conf, new JobID("0", 0));
+ }
+
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index 5887601..a28c698a 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -76,6 +76,10 @@
<profile>
<activation>
<activeByDefault>false</activeByDefault>
+ <property>
+ <name>hadoop</name>
+ <value>1.0.4</value>
+ </property>
</activation>
<id>hadoop-1.0.4</id>
<dependencies>
@@ -91,6 +95,10 @@
<profile>
<activation>
<activeByDefault>false</activeByDefault>
+ <property>
+ <name>hadoop</name>
+ <value>0.23.1</value>
+ </property>
</activation>
<id>hadoop-0.23.1</id>
<dependencies>
@@ -106,6 +114,10 @@
<profile>
<activation>
<activeByDefault>false</activeByDefault>
+ <property>
+ <name>hadoop</name>
+ <value>0.23.6</value>
+ </property>
</activation>
<id>hadoop-0.23.6</id>
<dependencies>
@@ -118,6 +130,44 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ <property>
+ <name>hadoop</name>
+ <value>cdh-4.1</value>
+ </property>
+ </activation>
+ <id>cdh-4.1</id>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-0.23.1</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ <property>
+ <name>hadoop</name>
+ <value>cdh-4.2</value>
+ </property>
+ </activation>
+ <id>cdh-4.2</id>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-0.23.1</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
<dependencies>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 90f5603..eb7b6e1 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -139,8 +139,7 @@
/**
* read the split
*/
- TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(),
- inputSplits.get(i));
+ TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), i);
RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
reader.initialize(inputSplits.get(i), context);
while (reader.nextKeyValue() == true) {
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 0133d761..2402748 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.pregelix.dataflow;

+import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -23,9 +25,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -37,6 +37,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -69,6 +70,7 @@
private TaskAttemptContext context;
private String TEMP_DIR = "_temporary";
private ClassLoader ctxCL;
+ private ContextFactory ctxFactory = new ContextFactory();
@Override
public void open() throws HyracksDataException {
@@ -80,8 +82,7 @@
conf = confFactory.createConfiguration();
VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
- TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
- context = new TaskAttemptContext(conf, tid);
+ context = ctxFactory.createContext(conf, partition);
try {
vertexWriter = outputFormat.createVertexWriter(context);
} catch (InterruptedException e) {
@@ -127,31 +128,26 @@
private void moveFilesToFinalPath() throws HyracksDataException {
try {
- JobContext job = new JobContext(conf, new JobID("0", 0));
+ JobContext job = ctxFactory.createJobContext(conf);
Path outputPath = FileOutputFormat.getOutputPath(job);
FileSystem dfs = FileSystem.get(conf);
Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
- FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
- @Override
- public boolean accept(Path dir) {
- return dir.getName().endsWith(TEMP_DIR);
- }
- });
- Path tempDir = tempPaths[0].getPath();
- FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
- @Override
- public boolean accept(Path dir) {
- return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
- }
- });
- Path srcDir = results[0].getPath();
- if (!dfs.exists(srcDir))
- throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
-
- FileStatus[] srcFiles = dfs.listStatus(srcDir);
- Path srcFile = srcFiles[0].getPath();
- dfs.delete(filePath, true);
- dfs.rename(srcFile, filePath);
+ FileStatus[] results = findPartitionPaths(outputPath, dfs);
+ if (results.length >= 1) {
+ /**
+ * for Hadoop-0.20.2
+ */
+ renameFile(dfs, filePath, results);
+ } else {
+ /**
+ * for Hadoop-0.23.1
+ */
+ int jobId = job.getJobID().getId();
+ outputPath = new Path(outputPath.toString() + File.separator + TEMP_DIR + File.separator
+ + jobId);
+ results = findPartitionPaths(outputPath, dfs);
+ renameFile(dfs, filePath, results);
+ }
} catch (IOException e) {
throw new HyracksDataException(e);
} finally {
@@ -159,6 +155,36 @@
}
}
+ private FileStatus[] findPartitionPaths(Path outputPath, FileSystem dfs) throws FileNotFoundException,
+ IOException {
+ FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
+ @Override
+ public boolean accept(Path dir) {
+ return dir.getName().endsWith(TEMP_DIR);
+ }
+ });
+ Path tempDir = tempPaths[0].getPath();
+ FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
+ @Override
+ public boolean accept(Path dir) {
+ return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
+ }
+ });
+ return results;
+ }
+
+ private void renameFile(FileSystem dfs, Path filePath, FileStatus[] results) throws IOException,
+ HyracksDataException, FileNotFoundException {
+ Path srcDir = results[0].getPath();
+ if (!dfs.exists(srcDir))
+ throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
+
+ FileStatus[] srcFiles = dfs.listStatus(srcDir);
+ Path srcFile = srcFiles[0].getPath();
+ dfs.delete(filePath, true);
+ dfs.rename(srcFile, filePath);
+ }
+
};
}
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index a38b19e..0da7baf 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -134,11 +133,11 @@
appender.reset(frame, true);
VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
- TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
InputSplit split = splits.get(splitId);
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splitId);
- VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
- vertexReader.initialize(split, context);
+ VertexReader vertexReader = vertexInputFormat.createVertexReader(split, mapperContext);
+ vertexReader.initialize(split, mapperContext);
Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldSize);
DataOutput dos = tb.getDataOutput();
@@ -146,7 +145,6 @@
/**
* set context
*/
- TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splits.get(splitId));
Vertex.setContext(mapperContext);
/**
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index d968262..c025f85 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -16,6 +16,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -45,7 +46,7 @@
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
Configuration conf = confFactory.createConfiguration();
try {
- TaskAttemptContext mapperContext = ctxFactory.createContext(conf, null);
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
Vertex.setContext(mapperContext);
BspUtils.setDefaultConfiguration(conf);
} catch (Exception e) {