Merged fullstack_asterix_stabilization -r 2933:3157
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@3164 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 77a76aa..2d0859b 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -23,8 +23,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
<plugin>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
index d29afca..ae47ed8 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -15,25 +15,44 @@
package edu.uci.ics.pregelix.dataflow;
+import org.apache.commons.lang3.tuple.Pair;
+
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedBlockingConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
private static final long serialVersionUID = 1L;
- private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private IConnectorPolicy senderSideMatPipPolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private IConnectorPolicy senderSideMatBlkPolicy = new SendSideMaterializedBlockingConnectorPolicy();
private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+ private JobSpecification spec;
+
+ public ConnectorPolicyAssignmentPolicy(JobSpecification spec) {
+ this.spec = spec;
+ }
@Override
public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
int[] fanouts) {
if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
- return senderSideMaterializePolicy;
+ return senderSideMatPipPolicy;
} else {
- return pipeliningPolicy;
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> endPoints = spec
+ .getConnectorOperatorMap().get(c.getConnectorId());
+ IOperatorDescriptor consumer = endPoints.getRight().getLeft();
+ if (consumer instanceof TreeIndexInsertUpdateDeleteOperatorDescriptor) {
+ return senderSideMatBlkPolicy;
+ } else {
+ return pipeliningPolicy;
+ }
}
}
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 0133d761..2402748 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.pregelix.dataflow;
+import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -23,9 +25,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -37,6 +37,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -69,6 +70,7 @@
private TaskAttemptContext context;
private String TEMP_DIR = "_temporary";
private ClassLoader ctxCL;
+ private ContextFactory ctxFactory = new ContextFactory();
@Override
public void open() throws HyracksDataException {
@@ -80,8 +82,7 @@
conf = confFactory.createConfiguration();
VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
- TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
- context = new TaskAttemptContext(conf, tid);
+ context = ctxFactory.createContext(conf, partition);
try {
vertexWriter = outputFormat.createVertexWriter(context);
} catch (InterruptedException e) {
@@ -127,31 +128,26 @@
private void moveFilesToFinalPath() throws HyracksDataException {
try {
- JobContext job = new JobContext(conf, new JobID("0", 0));
+ JobContext job = ctxFactory.createJobContext(conf);
Path outputPath = FileOutputFormat.getOutputPath(job);
FileSystem dfs = FileSystem.get(conf);
Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
- FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
- @Override
- public boolean accept(Path dir) {
- return dir.getName().endsWith(TEMP_DIR);
- }
- });
- Path tempDir = tempPaths[0].getPath();
- FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
- @Override
- public boolean accept(Path dir) {
- return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
- }
- });
- Path srcDir = results[0].getPath();
- if (!dfs.exists(srcDir))
- throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
-
- FileStatus[] srcFiles = dfs.listStatus(srcDir);
- Path srcFile = srcFiles[0].getPath();
- dfs.delete(filePath, true);
- dfs.rename(srcFile, filePath);
+ FileStatus[] results = findPartitionPaths(outputPath, dfs);
+ if (results.length >= 1) {
+ /**
+ * for Hadoop-0.20.2
+ */
+ renameFile(dfs, filePath, results);
+ } else {
+ /**
+ * for Hadoop-0.23.1
+ */
+ int jobId = job.getJobID().getId();
+ outputPath = new Path(outputPath.toString() + File.separator + TEMP_DIR + File.separator
+ + jobId);
+ results = findPartitionPaths(outputPath, dfs);
+ renameFile(dfs, filePath, results);
+ }
} catch (IOException e) {
throw new HyracksDataException(e);
} finally {
@@ -159,6 +155,36 @@
}
}
+ private FileStatus[] findPartitionPaths(Path outputPath, FileSystem dfs) throws FileNotFoundException,
+ IOException {
+ FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
+ @Override
+ public boolean accept(Path dir) {
+ return dir.getName().endsWith(TEMP_DIR);
+ }
+ });
+ Path tempDir = tempPaths[0].getPath();
+ FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
+ @Override
+ public boolean accept(Path dir) {
+ return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
+ }
+ });
+ return results;
+ }
+
+ private void renameFile(FileSystem dfs, Path filePath, FileStatus[] results) throws IOException,
+ HyracksDataException, FileNotFoundException {
+ Path srcDir = results[0].getPath();
+ if (!dfs.exists(srcDir))
+ throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
+
+ FileStatus[] srcFiles = dfs.listStatus(srcDir);
+ Path srcFile = srcFiles[0].getPath();
+ dfs.delete(filePath, true);
+ dfs.rename(srcFile, filePath);
+ }
+
};
}
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index a38b19e..0da7baf 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -134,11 +133,11 @@
appender.reset(frame, true);
VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
- TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
InputSplit split = splits.get(splitId);
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splitId);
- VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
- vertexReader.initialize(split, context);
+ VertexReader vertexReader = vertexInputFormat.createVertexReader(split, mapperContext);
+ vertexReader.initialize(split, mapperContext);
Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldSize);
DataOutput dos = tb.getDataOutput();
@@ -146,7 +145,6 @@
/**
* set context
*/
- TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splits.get(splitId));
Vertex.setContext(mapperContext);
/**