add support for CDH-4.1 and CDH-4.2
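
CDH-4.1 and CDH-4.2 ship a Hadoop 0.23/2.0-based MapReduce, where
org.apache.hadoop.mapreduce.TaskAttemptContext and JobContext changed from
concrete classes into interfaces. Code that did
"new TaskAttemptContext(conf, tid)" or "new JobContext(conf, jobId)" therefore
no longer compiles against CDH-4.x. All such construction now goes through the
hyracks-hdfs ContextFactory, which resolves whichever context implementation
the running cluster's classpath provides. The factory itself is outside this
patch; below is a minimal reflection-based sketch of the idea (the Hadoop class
names are real, but the bodies are illustrative, not the actual hyracks
implementation):

    import java.lang.reflect.Constructor;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    public class ContextFactory {

        /** Create a TaskAttemptContext for the given attempt id. */
        public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid) {
            return (TaskAttemptContext) instantiate(
                    "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl", // Hadoop 0.23+/2.x
                    "org.apache.hadoop.mapreduce.TaskAttemptContext",          // Hadoop 0.20.x/1.x
                    new Class<?>[] { Configuration.class, TaskAttemptID.class }, conf, tid);
        }

        /** Derive a deterministic attempt id from a partition number. */
        public TaskAttemptContext createContext(Configuration conf, int partition) {
            return createContext(conf, new TaskAttemptID("", 0, true, partition, 0));
        }

        /** Create a JobContext with a fixed job id. */
        public JobContext createJobContext(Configuration conf) {
            return (JobContext) instantiate(
                    "org.apache.hadoop.mapreduce.task.JobContextImpl",         // Hadoop 0.23+/2.x
                    "org.apache.hadoop.mapreduce.JobContext",                  // Hadoop 0.20.x/1.x
                    new Class<?>[] { Configuration.class, JobID.class }, conf, new JobID("0", 0));
        }

        /** Try the new-API implementation class first, then fall back to the old concrete class. */
        private Object instantiate(String newClass, String oldClass, Class<?>[] sig, Object... args) {
            try {
                Class<?> clazz;
                try {
                    clazz = Class.forName(newClass);
                } catch (ClassNotFoundException e) {
                    clazz = Class.forName(oldClass);
                }
                Constructor<?> ctor = clazz.getConstructor(sig);
                return ctor.newInstance(args);
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }
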
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_release_cleanup@3051 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 0133d76..2402748 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.pregelix.dataflow;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -23,9 +25,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -37,6 +37,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -69,6 +70,7 @@
private TaskAttemptContext context;
private String TEMP_DIR = "_temporary";
private ClassLoader ctxCL;
+ private ContextFactory ctxFactory = new ContextFactory();
@Override
public void open() throws HyracksDataException {
@@ -80,8 +82,7 @@
conf = confFactory.createConfiguration();
VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
- TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
- context = new TaskAttemptContext(conf, tid);
+ context = ctxFactory.createContext(conf, partition);
try {
vertexWriter = outputFormat.createVertexWriter(context);
} catch (InterruptedException e) {
@@ -127,31 +128,26 @@
private void moveFilesToFinalPath() throws HyracksDataException {
try {
- JobContext job = new JobContext(conf, new JobID("0", 0));
+ JobContext job = ctxFactory.createJobContext(conf);
Path outputPath = FileOutputFormat.getOutputPath(job);
FileSystem dfs = FileSystem.get(conf);
Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
- FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
- @Override
- public boolean accept(Path dir) {
- return dir.getName().endsWith(TEMP_DIR);
- }
- });
- Path tempDir = tempPaths[0].getPath();
- FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
- @Override
- public boolean accept(Path dir) {
- return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
- }
- });
- Path srcDir = results[0].getPath();
- if (!dfs.exists(srcDir))
- throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
-
- FileStatus[] srcFiles = dfs.listStatus(srcDir);
- Path srcFile = srcFiles[0].getPath();
- dfs.delete(filePath, true);
- dfs.rename(srcFile, filePath);
+ FileStatus[] results = findPartitionPaths(outputPath, dfs);
+ if (results.length >= 1) {
+ /**
+  * Hadoop-0.20.x layout: the task-attempt directories sit directly
+  * under <output>/_temporary
+  */
+ renameFile(dfs, filePath, results);
+ } else {
+ /**
+  * Hadoop-0.23.x/CDH-4.x layout: the task-attempt directories are
+  * nested one level deeper, under <output>/_temporary/<jobId>
+  */
+ int jobId = job.getJobID().getId();
+ outputPath = new Path(new Path(outputPath, TEMP_DIR), String.valueOf(jobId));
+ results = findPartitionPaths(outputPath, dfs);
+ renameFile(dfs, filePath, results);
+ }
} catch (IOException e) {
throw new HyracksDataException(e);
} finally {
@@ -159,6 +155,36 @@
}
}
+ private FileStatus[] findPartitionPaths(Path outputPath, FileSystem dfs) throws FileNotFoundException,
+ IOException {
+ FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
+ @Override
+ public boolean accept(Path dir) {
+ return dir.getName().endsWith(TEMP_DIR);
+ }
+ });
+ Path tempDir = tempPaths[0].getPath();
+ FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
+ @Override
+ public boolean accept(Path dir) {
+ return dir.getName().contains(context.getTaskAttemptID().toString());
+ }
+ });
+ return results;
+ }
+
+ private void renameFile(FileSystem dfs, Path filePath, FileStatus[] results) throws IOException,
+ HyracksDataException, FileNotFoundException {
+ if (results.length == 0)
+ throw new HyracksDataException("no task-attempt directory matched under the output path");
+ Path srcDir = results[0].getPath();
+ if (!dfs.exists(srcDir))
+ throw new HyracksDataException("file " + srcDir + " does not exist!");
+
+ FileStatus[] srcFiles = dfs.listStatus(srcDir);
+ Path srcFile = srcFiles[0].getPath();
+ dfs.delete(filePath, true);
+ dfs.rename(srcFile, filePath);
+ }
+
};
}
}
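
Note on the fallback above: moveFilesToFinalPath() now probes two
FileOutputCommitter directory layouts instead of assuming one. Roughly (the
attempt-directory names are illustrative; the exact naming comes from the
committer):

    Hadoop 0.20.x:           <output>/_temporary/_attempt_*/<task output>
    Hadoop 0.23.x / CDH-4.x: <output>/_temporary/<jobId>/_temporary/attempt_*/<task output>

findPartitionPaths() searches directly under <output> first; if that yields
nothing, the search is re-rooted at <output>/_temporary/<jobId> to handle the
nested layout.
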
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index a38b19e..0da7baf 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -134,11 +133,11 @@
appender.reset(frame, true);
VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
- TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
InputSplit split = splits.get(splitId);
- VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
- vertexReader.initialize(split, context);
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splitId);
+ VertexReader vertexReader = vertexInputFormat.createVertexReader(split, mapperContext);
+ vertexReader.initialize(split, mapperContext);
Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldSize);
DataOutput dos = tb.getDataOutput();
@@ -146,7 +145,6 @@
/**
* set context
*/
- TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splits.get(splitId));
Vertex.setContext(mapperContext);
/**
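
Note: the scan operator now obtains one TaskAttemptContext per split from the
ContextFactory and reuses it for reader construction, reader initialization,
and Vertex.setContext, replacing the bare
"new TaskAttemptContext(conf, new TaskAttemptID())" that no longer compiles
against CDH-4.x. The resulting call pattern, condensed from the hunks above:

    TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splitId);
    VertexReader vertexReader = vertexInputFormat.createVertexReader(split, mapperContext);
    vertexReader.initialize(split, mapperContext);
    // ... later, before the vertices are read:
    Vertex.setContext(mapperContext);
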
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index d968262..c025f85 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -16,6 +16,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -45,7 +46,7 @@
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
Configuration conf = confFactory.createConfiguration();
try {
- TaskAttemptContext mapperContext = ctxFactory.createContext(conf, null);
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
Vertex.setContext(mapperContext);
BspUtils.setDefaultConfiguration(conf);
} catch (Exception e) {
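
Note: RuntimeHookFactory's configure() runs before any split or partition
exists, so it now hands the factory an explicit empty TaskAttemptID instead of
null, sparing the factory a null check on that overload. Condensed from the
hunk above:

    // no real task attempt exists yet at configure() time, so pass a blank id
    TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
    Vertex.setContext(mapperContext);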