Cross-merge fullstack_release_candidate into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-dataflow/pom.xml b/fullstack/pregelix/pregelix-dataflow/pom.xml
index aaa7186..d3f396e 100644
--- a/fullstack/pregelix/pregelix-dataflow/pom.xml
+++ b/fullstack/pregelix/pregelix-dataflow/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-dataflow</artifactId>
 	<packaging>jar</packaging>
 	<name>pregelix-dataflow</name>
 
 	<parent>
-    		<groupId>edu.uci.ics.hyracks</groupId>
-    		<artifactId>pregelix</artifactId>
-    		<version>0.2.3-SNAPSHOT</version>
-  	</parent>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<artifactId>pregelix</artifactId>
+		<version>0.2.3-SNAPSHOT</version>
+	</parent>
 
 
 	<properties>
@@ -22,8 +23,9 @@
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.7</source>
+					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -42,6 +44,7 @@
 			</plugin>
 			<plugin>
 				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
 				<configuration>
 					<filesets>
 						<fileset>
@@ -102,13 +105,6 @@
 			<version>0.2.3-SNAPSHOT</version>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
 			<version>0.2.3-SNAPSHOT</version>
diff --git a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
index d29afca..ae47ed8 100644
--- a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -15,25 +15,50 @@
 
 package edu.uci.ics.pregelix.dataflow;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedBlockingConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 
 public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
     private static final long serialVersionUID = 1L;
-    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy senderSideMatPipPolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy senderSideMatBlkPolicy = new SendSideMaterializedBlockingConnectorPolicy();
     private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+    private JobSpecification spec;
+
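+    /**
+     * @param spec the job specification, used to look up the operators at each
+     *            end of a connector
+     */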
+    public ConnectorPolicyAssignmentPolicy(JobSpecification spec) {
+        this.spec = spec;
+    }
 
     @Override
     public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
             int[] fanouts) {
         if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
-            return senderSideMaterializePolicy;
+            return senderSideMatPipPolicy;
         } else {
-            return pipeliningPolicy;
+            Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> endPoints = spec
+                    .getConnectorOperatorMap().get(c.getConnectorId());
+            IOperatorDescriptor consumer = endPoints.getRight().getLeft();
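+            // when the consumer is a tree-index insert/update/delete, materialize the
+            // whole send side first, so the index is not mutated while its input is still being produced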
+            if (consumer instanceof TreeIndexInsertUpdateDeleteOperatorDescriptor) {
+                return senderSideMatBlkPolicy;
+            } else {
+                return pipeliningPolicy;
+            }
         }
     }
 }
diff --git a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index c25e4c6..2402748 100644
--- a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.pregelix.dataflow;
 
+import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -23,9 +25,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -37,6 +37,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
 import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -64,21 +65,27 @@
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             private RecordDescriptor rd0;
             private FrameDeserializer frameDeserializer;
-            private Configuration conf = confFactory.createConfiguration();
+            private Configuration conf;
             private VertexWriter vertexWriter;
             private TaskAttemptContext context;
             private String TEMP_DIR = "_temporary";
+            private ClassLoader ctxCL;
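+            // ContextFactory hides the TaskAttemptContext/JobContext constructor
+            // differences between Hadoop 0.20.x and 0.23.x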
+            private ContextFactory ctxFactory = new ContextFactory();
 
             @Override
             public void open() throws HyracksDataException {
                 rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
                         : inputRdFactory.createRecordDescriptor();
                 frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
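+                // remember the caller's classloader so it can be restored when this operator finishes or fails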
+                ctxCL = Thread.currentThread().getContextClassLoader();
                 Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                conf = confFactory.createConfiguration();
 
                 VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
-                TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
-                context = new TaskAttemptContext(conf, tid);
+                context = ctxFactory.createContext(conf, partition);
                 try {
                     vertexWriter = outputFormat.createVertexWriter(context);
                 } catch (InterruptedException e) {
@@ -107,7 +111,7 @@
 
             @Override
             public void fail() throws HyracksDataException {
-
+                Thread.currentThread().setContextClassLoader(ctxCL);
             }
 
             @Override
@@ -124,36 +128,67 @@
 
             private void moveFilesToFinalPath() throws HyracksDataException {
                 try {
-                    JobContext job = new JobContext(conf, new JobID("0", 0));
+                    JobContext job = ctxFactory.createJobContext(conf);
                     Path outputPath = FileOutputFormat.getOutputPath(job);
                     FileSystem dfs = FileSystem.get(conf);
                     Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
-                    FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
-                        @Override
-                        public boolean accept(Path dir) {
-                            return dir.getName().endsWith(TEMP_DIR);
-                        }
-                    });
-                    Path tempDir = tempPaths[0].getPath();
-                    FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
-                        @Override
-                        public boolean accept(Path dir) {
-                            return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
-                        }
-                    });
-                    Path srcDir = results[0].getPath();
-                    if (!dfs.exists(srcDir))
-                        throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
-
-                    FileStatus[] srcFiles = dfs.listStatus(srcDir);
-                    Path srcFile = srcFiles[0].getPath();
-                    dfs.delete(filePath, true);
-                    dfs.rename(srcFile, filePath);
+                    FileStatus[] results = findPartitionPaths(outputPath, dfs);
+                    if (results.length >= 1) {
+                        /**
+                         * Hadoop-0.20.2: the task output sits directly under <output>/_temporary
+                         */
+                        renameFile(dfs, filePath, results);
+                    } else {
+                        /**
+                         * Hadoop-0.23.1: the task output is nested under <output>/_temporary/<jobId>
+                         */
+                        int jobId = job.getJobID().getId();
+                        outputPath = new Path(outputPath.toString() + File.separator + TEMP_DIR + File.separator
+                                + jobId);
+                        results = findPartitionPaths(outputPath, dfs);
+                        renameFile(dfs, filePath, results);
+                    }
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
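+            /**
+             * find, under <outputPath>/_temporary, the output directories whose
+             * names contain this task attempt's id
+             */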
+            private FileStatus[] findPartitionPaths(Path outputPath, FileSystem dfs) throws FileNotFoundException,
+                    IOException {
+                FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
+                    @Override
+                    public boolean accept(Path dir) {
+                        return dir.getName().endsWith(TEMP_DIR);
+                    }
+                });
+                Path tempDir = tempPaths[0].getPath();
+                FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
+                    @Override
+                    public boolean accept(Path dir) {
+                        return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
+                    }
+                });
+                return results;
+            }
+
+            private void renameFile(FileSystem dfs, Path filePath, FileStatus[] results) throws IOException,
+                    HyracksDataException, FileNotFoundException {
+                Path srcDir = results[0].getPath();
+                if (!dfs.exists(srcDir))
+                    throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
+
+                FileStatus[] srcFiles = dfs.listStatus(srcDir);
+                Path srcFile = srcFiles[0].getPath();
+                dfs.delete(filePath, true);
+                dfs.rename(srcFile, filePath);
+            }
+
         };
     }
 }
diff --git a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index f1b98f5..0da7baf 100644
--- a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -17,17 +17,15 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.logging.Logger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -42,6 +40,8 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexReader;
@@ -50,38 +50,69 @@
 
 @SuppressWarnings("rawtypes")
 public class VertexFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-    private static final Logger LOGGER = Logger.getLogger(VertexFileScanOperatorDescriptor.class.getName());
     private static final long serialVersionUID = 1L;
-    private final List<InputSplit> splits;
+    private final FileSplitsFactory splitsFactory;
     private final IConfigurationFactory confFactory;
     private final int fieldSize = 2;
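+    // scheduledLocations[i] names the node that should read split i; executed[i]
+    // flips to true once a task on that node has claimed the split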
+    private final String[] scheduledLocations;
+    private final boolean[] executed;
 
     /**
      * @param spec
      */
     public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
-            IConfigurationFactory confFactory) throws HyracksException {
+            String[] scheduledLocations, IConfigurationFactory confFactory) throws HyracksException {
         super(spec, 0, 1);
-        this.splits = splits;
+        List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+        for (int i = 0; i < splits.size(); i++) {
+            fileSplits.add((FileSplit) splits.get(i));
+        }
+        this.splitsFactory = new FileSplitsFactory(fileSplits);
         this.confFactory = confFactory;
+        this.scheduledLocations = scheduledLocations;
+        this.executed = new boolean[scheduledLocations.length];
+        Arrays.fill(executed, false);
         this.recordDescriptors[0] = rd;
     }
 
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
+        final List<FileSplit> splits = splitsFactory.getSplits();
+
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
-            private Configuration conf = confFactory.createConfiguration();
+            private ClassLoader ctxCL;
+            private ContextFactory ctxFactory = new ContextFactory();
 
             @Override
             public void initialize() throws HyracksDataException {
+                ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
                     Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                    Configuration conf = confFactory.createConfiguration();
                     writer.open();
-                    loadVertices(ctx, partition);
+                    for (int i = 0; i < scheduledLocations.length; i++) {
+                        if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
+                            /**
+                             * claim this split if no other task on this node has taken it yet
+                             */
+                            synchronized (executed) {
+                                if (!executed[i]) {
+                                    executed[i] = true;
+                                } else {
+                                    continue;
+                                }
+                            }
+                            loadVertices(ctx, conf, i);
+                        }
+                    }
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
@@ -96,24 +125,19 @@
              * @throws InterruptedException
              */
             @SuppressWarnings("unchecked")
-            private void loadVertices(final IHyracksTaskContext ctx, int partitionId) throws IOException,
-                    ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+            private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
+                    throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
+                    IllegalAccessException {
                 ByteBuffer frame = ctx.allocateFrame();
                 FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 appender.reset(frame, true);
 
                 VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
-                TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
-                InputSplit split = splits.get(partition);
+                InputSplit split = splits.get(splitId);
+                TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splitId);
 
-                if (split instanceof FileSplit) {
-                    FileSplit fileSplit = (FileSplit) split;
-                    LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
-                            + " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
-                            + partition);
-                }
-                VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
-                vertexReader.initialize(split, context);
+                VertexReader vertexReader = vertexInputFormat.createVertexReader(split, mapperContext);
+                vertexReader.initialize(split, mapperContext);
                 Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
                 ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldSize);
                 DataOutput dos = tb.getDataOutput();
@@ -121,8 +145,6 @@
                 /**
                  * set context
                  */
-                Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
-                        splits.get(partition));
                 Vertex.setContext(mapperContext);
 
                 /**
@@ -166,5 +188,4 @@
             }
         };
     }
-
 }
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
new file mode 100644
index 0000000..d7cbb3a
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+
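+/**
+ * Writes the final vertices to local files, one per partition: each incoming
+ * tuple's last field is printed as one line of text.
+ */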
+public class VertexWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final FileSplit[] splits;
+    private final IRuntimeHookFactory preHookFactory;
+    private final IRuntimeHookFactory postHookFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+
+    public VertexWriteOperatorDescriptor(JobSpecification spec, IRecordDescriptorFactory inputRdFactory,
+            IFileSplitProvider fileSplitProvider, IRuntimeHookFactory preHookFactory,
+            IRuntimeHookFactory postHookFactory) {
+        super(spec, 1, 0);
+        this.splits = fileSplitProvider.getFileSplits();
+        this.preHookFactory = preHookFactory;
+        this.postHookFactory = postHookFactory;
+        this.inputRdFactory = inputRdFactory;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+            private RecordDescriptor rd0;
+            private FrameDeserializer frameDeserializer;
+            private PrintWriter outputWriter;
+
+            @Override
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor();
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                try {
+                    outputWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(splits[partition]
+                            .getLocalFile().getFile())));
+                    if (preHookFactory != null)
+                        preHookFactory.createRuntimeHook().configure(ctx);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                frameDeserializer.reset(frame);
+                while (!frameDeserializer.done()) {
+                    Object[] tuple = frameDeserializer.deserializeRecord();
+                    // output the vertex
+                    outputWriter.print(StringSerializationUtils.toString(tuple[tuple.length - 1]));
+                    outputWriter.println();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                if (postHookFactory != null)
+                    postHookFactory.createRuntimeHook().configure(ctx);
+                outputWriter.close();
+            }
+
+        };
+        return op;
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 43b6d17..567e220 100644
--- a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -119,23 +119,23 @@
             Vertex.setNumEdges(numEdges);
             giraphJobIdToSuperStep.put(giraphJobId, superStep);
             giraphJobIdToMove.put(giraphJobId, false);
-            LOGGER.info("start iteration " + Vertex.getCurrentSuperstep());
+            LOGGER.info("start iteration " + Vertex.getSuperstep());
         }
         System.gc();
     }
 
     public synchronized void endSuperStep(String giraphJobId) {
         giraphJobIdToMove.put(giraphJobId, true);
-        LOGGER.info("end iteration " + Vertex.getCurrentSuperstep());
+        LOGGER.info("end iteration " + Vertex.getSuperstep());
     }
 
     @Override
     public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
         final FileReference fRef = ioManager.createWorkspaceFile(prefix);
-        List<FileReference> files = iterationToFiles.get(Vertex.getCurrentSuperstep());
+        List<FileReference> files = iterationToFiles.get(Vertex.getSuperstep());
         if (files == null) {
             files = new ArrayList<FileReference>();
-            iterationToFiles.put(Vertex.getCurrentSuperstep(), files);
+            iterationToFiles.put(Vertex.getSuperstep(), files);
         }
         files.add(fRef);
         return fRef;