Merged fullstack_asterix_stabilization -r 2933:3157

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@3164 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 77a76aa..2d0859b 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -23,8 +23,9 @@
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.7</source>
+					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
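
The <fork>true</fork> added above makes maven-compiler-plugin 2.0.2 run javac in a separate process; with a plugin this old, forking is the usual way to make a 1.7 source/target take effect reliably when Maven itself may be running under a different JDK (an inference; the patch does not state the motivation).
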
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
index d29afca..ae47ed8 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -15,25 +15,44 @@
 
 package edu.uci.ics.pregelix.dataflow;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedBlockingConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 
 public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
     private static final long serialVersionUID = 1L;
-    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy senderSideMatPipPolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy senderSideMatBlkPolicy = new SendSideMaterializedBlockingConnectorPolicy();
     private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+    private JobSpecification spec;
+
+    public ConnectorPolicyAssignmentPolicy(JobSpecification spec) {
+        this.spec = spec;
+    }
 
     @Override
     public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
             int[] fanouts) {
         if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
-            return senderSideMaterializePolicy;
+            return senderSideMatPipPolicy;
         } else {
-            return pipeliningPolicy;
+            Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> endPoints = spec
+                    .getConnectorOperatorMap().get(c.getConnectorId());
+            IOperatorDescriptor consumer = endPoints.getRight().getLeft();
+            if (consumer instanceof TreeIndexInsertUpdateDeleteOperatorDescriptor) {
+                return senderSideMatBlkPolicy;
+            } else {
+                return pipeliningPolicy;
+            }
         }
     }
 }
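
The policy now routes connectors feeding a TreeIndexInsertUpdateDeleteOperatorDescriptor through sender-side blocking materialization, so a B-tree insert/update/delete consumer only starts once its input is fully staged, while all other non-merging connectors keep pipelining. A minimal sketch of wiring the policy into a job, assuming the JobSpecification setter that Pregelix's job generators use elsewhere:

    JobSpecification spec = new JobSpecification();
    // ... declare operators and connectors ...
    spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
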
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 0133d761..2402748 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.pregelix.dataflow;
 
+import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -23,9 +25,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -37,6 +37,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
 import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -69,6 +70,7 @@
             private TaskAttemptContext context;
             private String TEMP_DIR = "_temporary";
             private ClassLoader ctxCL;
+            private ContextFactory ctxFactory = new ContextFactory();
 
             @Override
             public void open() throws HyracksDataException {
@@ -80,8 +82,7 @@
                 conf = confFactory.createConfiguration();
 
                 VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
-                TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
-                context = new TaskAttemptContext(conf, tid);
+                context = ctxFactory.createContext(conf, partition);
                 try {
                     vertexWriter = outputFormat.createVertexWriter(context);
                 } catch (InterruptedException e) {
@@ -127,31 +128,26 @@
 
             private void moveFilesToFinalPath() throws HyracksDataException {
                 try {
-                    JobContext job = new JobContext(conf, new JobID("0", 0));
+                    JobContext job = ctxFactory.createJobContext(conf);
                     Path outputPath = FileOutputFormat.getOutputPath(job);
                     FileSystem dfs = FileSystem.get(conf);
                     Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
-                    FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
-                        @Override
-                        public boolean accept(Path dir) {
-                            return dir.getName().endsWith(TEMP_DIR);
-                        }
-                    });
-                    Path tempDir = tempPaths[0].getPath();
-                    FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
-                        @Override
-                        public boolean accept(Path dir) {
-                            return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
-                        }
-                    });
-                    Path srcDir = results[0].getPath();
-                    if (!dfs.exists(srcDir))
-                        throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
-
-                    FileStatus[] srcFiles = dfs.listStatus(srcDir);
-                    Path srcFile = srcFiles[0].getPath();
-                    dfs.delete(filePath, true);
-                    dfs.rename(srcFile, filePath);
+                    FileStatus[] results = findPartitionPaths(outputPath, dfs);
+                    if (results.length >= 1) {
+                        /**
+                         * for Hadoop-0.20.2: attempt dirs sit directly under <output>/_temporary
+                         */
+                        renameFile(dfs, filePath, results);
+                    } else {
+                        /**
+                         * for Hadoop-0.23.1: attempt dirs sit under <output>/_temporary/<jobId>/_temporary
+                         */
+                        int jobId = job.getJobID().getId();
+                        outputPath = new Path(outputPath.toString() + Path.SEPARATOR + TEMP_DIR + Path.SEPARATOR
+                                + jobId); // HDFS paths use '/', never the platform File.separator
+                        results = findPartitionPaths(outputPath, dfs);
+                        renameFile(dfs, filePath, results);
+                    }
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 } finally {
@@ -159,6 +155,36 @@
                 }
             }
 
+            private FileStatus[] findPartitionPaths(Path outputPath, FileSystem dfs) throws FileNotFoundException,
+                    IOException {
+                FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
+                    @Override
+                    public boolean accept(Path dir) {
+                        return dir.getName().endsWith(TEMP_DIR);
+                    }
+                });
+                Path tempDir = tempPaths[0].getPath();
+                FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
+                    @Override
+                    public boolean accept(Path dir) {
+                        return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
+                    }
+                });
+                return results;
+            }
+
+            private void renameFile(FileSystem dfs, Path filePath, FileStatus[] results) throws IOException,
+                    HyracksDataException, FileNotFoundException {
+                Path srcDir = results[0].getPath();
+                if (!dfs.exists(srcDir))
+                    throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
+
+                FileStatus[] srcFiles = dfs.listStatus(srcDir);
+                Path srcFile = srcFiles[0].getPath();
+                dfs.delete(filePath, true);
+                dfs.rename(srcFile, filePath);
+            }
+
         };
     }
 }
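
The rewritten moveFilesToFinalPath() first probes the Hadoop-0.20.2 committer layout and, if no attempt directory matches, retries against the per-job layout of Hadoop-0.23.1. The directory shapes it expects, inferred from findPartitionPaths() rather than stated in the patch, are roughly:

    <output>/_temporary/<attempt-id>/part-*                        (Hadoop-0.20.2)
    <output>/_temporary/<jobId>/_temporary/<attempt-id>/part-*     (Hadoop-0.23.1)
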
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index a38b19e..0da7baf 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -26,7 +26,6 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -134,11 +133,11 @@
                 appender.reset(frame, true);
 
                 VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
-                TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
                 InputSplit split = splits.get(splitId);
+                TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splitId);
 
-                VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
-                vertexReader.initialize(split, context);
+                VertexReader vertexReader = vertexInputFormat.createVertexReader(split, mapperContext);
+                vertexReader.initialize(split, mapperContext);
                 Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
                 ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldSize);
                 DataOutput dos = tb.getDataOutput();
@@ -146,7 +145,6 @@
                 /**
                  * set context
                  */
-                TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splits.get(splitId));
                 Vertex.setContext(mapperContext);
 
                 /**
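
Both operators in this patch now obtain Hadoop contexts through edu.uci.ics.hyracks.hdfs.ContextFactory instead of calling constructors directly; TaskAttemptContext and JobContext are concrete classes in Hadoop-0.20.2 but became interfaces in 0.23.x, so direct construction cannot compile against both. A hypothetical single-version sketch of such a factory, mirroring the constructor calls the patch removes and compiling only against the 0.20.2 mapreduce API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    // Hypothetical stand-in for the real ContextFactory, which hides the
    // version difference; this sketch covers the Hadoop-0.20.2 side only.
    public class ContextFactorySketch {

        // Mirrors the removed inline code: map-task attempt 0 of the given partition.
        public TaskAttemptContext createContext(Configuration conf, int partition) {
            TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
            return new TaskAttemptContext(conf, tid);
        }

        // Mirrors the removed "new JobContext(conf, new JobID("0", 0))".
        public JobContext createJobContext(Configuration conf) {
            return new JobContext(conf, new JobID("0", 0));
        }
    }
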