add Pregelix codebase

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1960 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
new file mode 100644
index 0000000..0d9b059
--- /dev/null
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -0,0 +1,169 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>pregelix-dataflow</artifactId>
+	<packaging>jar</packaging>
+	<name>pregelix-dataflow</name>
+
+	<parent>
+    		<groupId>edu.uci.ics.pregelix</groupId>
+    		<artifactId>pregelix</artifactId>
+    		<version>0.0.1-SNAPSHOT</version>
+  	</parent>
+
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.7.2</version>
+				<configuration>
+					<forkMode>pertest</forkMode>
+					<argLine>-enableassertions -Xmx2047m -Dfile.encoding=UTF-8
+						-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+					<includes>
+						<include>**/*TestSuite.java</include>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<configuration>
+					<filesets>
+						<fileset>
+							<directory>.</directory>
+							<includes>
+								<include>teststore*</include>
+								<include>edu*</include>
+								<include>actual*</include>
+								<include>build*</include>
+								<include>expect*</include>
+								<include>ClusterController*</include>
+							</includes>
+						</fileset>
+					</filesets>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.pregelix</groupId>
+			<artifactId>pregelix-api</artifactId>
+			<version>0.0.1-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.pregelix</groupId>
+			<artifactId>pregelix-dataflow-std-base</artifactId>
+			<version>0.0.1-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-api</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-common</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-hadoop</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-data-std</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-core</artifactId>
+			<version>0.20.2</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-am-common</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-am-btree</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks.examples.btree</groupId>
+			<artifactId>btreehelper</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-cc</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-nc</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hadoop-compat</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-ipc</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..0b06dcf
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.pregelix.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+    private static final long serialVersionUID = 1L;
+    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+
+    @Override
+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+            int[] fanouts) {
+        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+            return senderSideMaterializePolicy;
+        } else {
+            return pipeliningPolicy;
+        }
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/CountTupleOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/CountTupleOperatorDescriptor.java
new file mode 100644
index 0000000..7ad6aa5
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/CountTupleOperatorDescriptor.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class CountTupleOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public CountTupleOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc) {
+        super(spec, 1, 1);
+        this.recordDescriptors[0] = rDesc;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            private final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            private final FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+            private int tupleCount = 0;
+
+            @Override
+            public void close() throws HyracksDataException {
+                System.out.println(this.toString() + " tuple count " + tupleCount);
+                writer.close();
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                fta.reset(frame);
+                tupleCount += fta.getTupleCount();
+                writer.nextFrame(frame);
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                writer.open();
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptySinkOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptySinkOperatorDescriptor.java
new file mode 100644
index 0000000..fab7198
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptySinkOperatorDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class EmptySinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public EmptySinkOperatorDescriptor(JobSpecification spec) {
+        super(spec, 1, 0);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+            @Override
+            public void open() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptyTupleSourceOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptyTupleSourceOperatorDescriptor.java
new file mode 100644
index 0000000..5276c82
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptyTupleSourceOperatorDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class EmptyTupleSourceOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public EmptyTupleSourceOperatorDescriptor(JobSpecification spec) {
+        super(spec, 0, 1);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            private ByteBuffer frame = ctx.allocateFrame();
+            private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
+            private FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+
+            @Override
+            public void initialize() throws HyracksDataException {
+                try {
+                    writer.open();
+                    appender.reset(frame, true);
+                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                    FrameUtils.flushFrame(frame, writer);
+                    writer.close();
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
new file mode 100644
index 0000000..c25e4c6
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+
+public class HDFSFileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+
+    public HDFSFileWriteOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
+            IRecordDescriptorFactory inputRdFactory) {
+        super(spec, 1, 0);
+        this.confFactory = confFactory;
+        this.inputRdFactory = inputRdFactory;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            private RecordDescriptor rd0;
+            private FrameDeserializer frameDeserializer;
+            private Configuration conf = confFactory.createConfiguration();
+            private VertexWriter vertexWriter;
+            private TaskAttemptContext context;
+            private String TEMP_DIR = "_temporary";
+
+            @Override
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor();
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+                VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
+                TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
+                context = new TaskAttemptContext(conf, tid);
+                try {
+                    vertexWriter = outputFormat.createVertexWriter(context);
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                frameDeserializer.reset(frame);
+                try {
+                    while (!frameDeserializer.done()) {
+                        Object[] tuple = frameDeserializer.deserializeRecord();
+                        Vertex value = (Vertex) tuple[1];
+                        vertexWriter.writeVertex(value);
+                    }
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    vertexWriter.close(context);
+                    moveFilesToFinalPath();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            private void moveFilesToFinalPath() throws HyracksDataException {
+                try {
+                    JobContext job = new JobContext(conf, new JobID("0", 0));
+                    Path outputPath = FileOutputFormat.getOutputPath(job);
+                    FileSystem dfs = FileSystem.get(conf);
+                    Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
+                    FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
+                        @Override
+                        public boolean accept(Path dir) {
+                            return dir.getName().endsWith(TEMP_DIR);
+                        }
+                    });
+                    Path tempDir = tempPaths[0].getPath();
+                    FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
+                        @Override
+                        public boolean accept(Path dir) {
+                            return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
+                        }
+                    });
+                    Path srcDir = results[0].getPath();
+                    if (!dfs.exists(srcDir))
+                        throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
+
+                    FileStatus[] srcFiles = dfs.listStatus(srcDir);
+                    Path srcFile = srcFiles[0].getPath();
+                    dfs.delete(filePath, true);
+                    dfs.rename(srcFile, filePath);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
new file mode 100644
index 0000000..b1bb555
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.pregelix.dataflow.state.MaterializerTaskState;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+public class MaterializingReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            private ByteBuffer frame = ctx.allocateFrame();
+            private boolean complete = false;
+
+            @Override
+            public void open() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                if (!complete) {
+                    MaterializerTaskState state = (MaterializerTaskState) IterationUtils.getIterationState(ctx,
+                            partition);
+                    RunFileReader in = state.getRunFileWriter().createReader();
+                    writer.open();
+                    try {
+                        in.open();
+                        while (in.nextFrame(frame)) {
+                            frame.flip();
+                            writer.nextFrame(frame);
+                            frame.clear();
+                        }
+                        in.close();
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    } finally {
+                        /**
+                         * remove last iteration's state
+                         */
+                        IterationUtils.removeIterationState(ctx, partition);
+                        writer.close();
+                    }
+                    complete = true;
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
new file mode 100644
index 0000000..efe5f1b
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+import edu.uci.ics.pregelix.dataflow.state.MaterializerTaskState;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+public class MaterializingWriteOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final static int MATERIALIZER_ACTIVITY_ID = 0;
+
+    public MaterializingWriteOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
+
+        builder.addActivity(this, ma);
+        builder.addSourceEdge(0, ma, 0);
+        builder.addTargetEdge(0, ma, 0);
+    }
+
+    private final class MaterializerActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public MaterializerActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private MaterializerTaskState state;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+                    INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+                    RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+                    FileReference file = context.createManagedWorkspaceFile(MaterializingWriteOperatorDescriptor.class
+                            .getSimpleName());
+                    state.setRunFileWriter(new RunFileWriter(file, ctx.getIOManager()));
+                    state.getRunFileWriter().open();
+                    writer.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    state.getRunFileWriter().nextFrame(buffer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    state.getRunFileWriter().close();
+                    /**
+                     * set iteration state
+                     */
+                    IterationUtils.setIterationState(ctx, partition, state);
+                    writer.close();
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                }
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/NonCombinerConnectorPolicyAssignmentPolicy.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/NonCombinerConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..8023fe5
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/NonCombinerConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.pregelix.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+public class NonCombinerConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+    private static final long serialVersionUID = 1L;
+    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+
+    @Override
+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+            int[] fanouts) {
+        if (c instanceof MToNPartitioningConnectorDescriptor) {
+            return senderSideMaterializePolicy;
+        } else {
+            return pipeliningPolicy;
+        }
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
new file mode 100644
index 0000000..22251fd
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
@@ -0,0 +1,64 @@
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+public class TerminationStateWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(TerminationStateWriterOperatorDescriptor.class.getName());
+
+    private final IConfigurationFactory confFactory;
+    private final String jobId;
+
+    public TerminationStateWriterOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
+            String jobId) {
+        super(spec, 1, 0);
+        this.confFactory = confFactory;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            private Configuration conf = confFactory.createConfiguration();
+            private boolean terminate = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                terminate = true;
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                terminate = false;
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                IterationUtils.writeTerminationState(conf, jobId, terminate);
+                LOGGER.info("close termination state");
+                if (terminate)
+                    LOGGER.info("write termination to HDFS");
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
new file mode 100644
index 0000000..4795718
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+
+@SuppressWarnings("rawtypes")
+public class VertexFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final Logger LOGGER = Logger.getLogger(VertexFileScanOperatorDescriptor.class.getName());
+    private static final long serialVersionUID = 1L;
+    private final List<InputSplit> splits;
+    private final IConfigurationFactory confFactory;
+    private final int fieldSize = 2;
+
+    /**
+     * @param spec
+     */
+    public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
+            IConfigurationFactory confFactory) throws HyracksException {
+        super(spec, 0, 1);
+        this.splits = splits;
+        this.confFactory = confFactory;
+        this.recordDescriptors[0] = rd;
+    }
+
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            private Configuration conf = confFactory.createConfiguration();
+
+            @Override
+            public void initialize() throws HyracksDataException {
+                try {
+                    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                    writer.open();
+                    loadVertices(ctx, partition);
+                    writer.close();
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            /**
+             * Load the vertices
+             * 
+             * @parameter IHyracks ctx
+             * 
+             * @throws IOException
+             * @throws IllegalAccessException
+             * @throws InstantiationException
+             * @throws ClassNotFoundException
+             * @throws InterruptedException
+             */
+            @SuppressWarnings("unchecked")
+            private void loadVertices(final IHyracksTaskContext ctx, int partitionId) throws IOException,
+                    ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+                ByteBuffer frame = ctx.allocateFrame();
+                FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+                appender.reset(frame, true);
+
+                VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
+                TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
+                InputSplit split = splits.get(partition);
+
+                if (split instanceof FileSplit) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
+                            + " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
+                            + partition);
+                }
+                VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
+                vertexReader.initialize(split, context);
+                Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
+                ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldSize);
+                DataOutput dos = tb.getDataOutput();
+
+                /**
+                 * set context
+                 */
+                Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
+                        splits.get(partition));
+                Vertex.setContext(mapperContext);
+
+                /**
+                 * empty vertex value
+                 */
+                Writable emptyVertexValue = (Writable) BspUtils.createVertexValue(conf);
+
+                while (vertexReader.nextVertex()) {
+                    readerVertex = vertexReader.getCurrentVertex();
+                    tb.reset();
+                    if (readerVertex.getVertexId() == null) {
+                        throw new IllegalArgumentException("loadVertices: Vertex reader returned a vertex "
+                                + "without an id!  - " + readerVertex);
+                    }
+                    if (readerVertex.getVertexValue() == null) {
+                        readerVertex.setVertexValue(emptyVertexValue);
+                    }
+                    WritableComparable vertexId = readerVertex.getVertexId();
+                    vertexId.write(dos);
+                    tb.addFieldEndOffset();
+
+                    readerVertex.write(dos);
+                    tb.addFieldEndOffset();
+
+                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        if (appender.getTupleCount() <= 0)
+                            throw new IllegalStateException("zero tuples in a frame!");
+                        FrameUtils.flushFrame(frame, writer);
+                        appender.reset(frame, true);
+                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                }
+
+                vertexReader.close();
+                if (appender.getTupleCount() > 0) {
+                    FrameUtils.flushFrame(frame, writer);
+                }
+                System.gc();
+            }
+        };
+    }
+
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
new file mode 100644
index 0000000..b31f376
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.base;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IConfigurationFactory extends Serializable {
+
+    public Configuration createConfiguration() throws HyracksDataException;
+
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
new file mode 100644
index 0000000..f18be9e
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.context;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.storage.common.smi.TransientFileMapManager;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+public class RuntimeContext implements IWorkspaceFileFactory {
+    private static final Logger LOGGER = Logger.getLogger(RuntimeContext.class.getName());
+
+    private IndexRegistry<IIndex> treeIndexRegistry;
+    private IBufferCache bufferCache;
+    private IFileMapManager fileMapManager;
+    private Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
+    private Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
+    private Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
+    private IOManager ioManager;
+    private Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+
+    public RuntimeContext(INCApplicationContext appCtx) {
+        fileMapManager = new TransientFileMapManager();
+        ICacheMemoryAllocator allocator = new HeapBufferAllocator();
+        IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
+        int pageSize = 64 * 1024;
+        long memSize = Runtime.getRuntime().maxMemory();
+        long bufferSize = memSize / 4;
+        int numPages = (int) (bufferSize / pageSize);
+        bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs, fileMapManager, pageSize,
+                numPages, 1000000);
+        treeIndexRegistry = new IndexRegistry<IIndex>();
+        ioManager = (IOManager) appCtx.getRootContext().getIOManager();
+    }
+
+    public void close() {
+        for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
+            for (FileReference fileRef : entry.getValue())
+                fileRef.delete();
+
+        iterationToFiles.clear();
+        bufferCache.close();
+        appStateMap.clear();
+
+        System.gc();
+    }
+
+    public IBufferCache getBufferCache() {
+        return bufferCache;
+    }
+
+    public IFileMapProvider getFileMapManager() {
+        return fileMapManager;
+    }
+
+    public IndexRegistry<IIndex> getTreeIndexRegistry() {
+        return treeIndexRegistry;
+    }
+
+    public Map<StateKey, IStateObject> getAppStateStore() {
+        return appStateMap;
+    }
+
+    public static RuntimeContext get(IHyracksTaskContext ctx) {
+        return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+    }
+
+    public synchronized void setVertexProperties(String giraphJobId, long numVertices, long numEdges) {
+        Boolean toMove = giraphJobIdToMove.get(giraphJobId);
+        if (toMove == null || toMove == true) {
+            if (giraphJobIdToSuperStep.get(giraphJobId) == null) {
+                giraphJobIdToSuperStep.put(giraphJobId, 0L);
+            }
+
+            long superStep = giraphJobIdToSuperStep.get(giraphJobId);
+            List<FileReference> files = iterationToFiles.remove(superStep - 1);
+            if (files != null) {
+                for (FileReference fileRef : files)
+                    fileRef.delete();
+            }
+
+            Vertex.setSuperstep(++superStep);
+            Vertex.setNumVertices(numVertices);
+            Vertex.setNumEdges(numEdges);
+            giraphJobIdToSuperStep.put(giraphJobId, superStep);
+            giraphJobIdToMove.put(giraphJobId, false);
+            LOGGER.info("start iteration " + Vertex.getCurrentSuperstep());
+        }
+        System.gc();
+    }
+
+    public synchronized void endSuperStep(String giraphJobId) {
+        giraphJobIdToMove.put(giraphJobId, true);
+        LOGGER.info("end iteration " + Vertex.getCurrentSuperstep());
+    }
+
+    @Override
+    public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
+        final FileReference fRef = ioManager.createWorkspaceFile(prefix);
+        List<FileReference> files = iterationToFiles.get(Vertex.getCurrentSuperstep());
+        if (files == null) {
+            files = new ArrayList<FileReference>();
+            iterationToFiles.put(Vertex.getCurrentSuperstep(), files);
+        }
+        files.add(fRef);
+        return fRef;
+    }
+
+    @Override
+    public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
+        return ioManager.createWorkspaceFile(prefix);
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java
new file mode 100644
index 0000000..ae58802
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.context;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class StateKey {
+    private final JobId jobId;
+    private final int partition;
+
+    public StateKey(JobId jobId, int partition) {
+        this.jobId = jobId;
+        this.partition = partition;
+    }
+
+    @Override
+    public int hashCode() {
+        return jobId.hashCode() * partition;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof StateKey))
+            return false;
+        StateKey key = (StateKey) o;
+        return key.jobId.equals(jobId) && key.partition == partition;
+    }
+
+    @Override
+    public String toString() {
+        return jobId.toString() + ":" + partition;
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/state/MaterializerTaskState.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/state/MaterializerTaskState.java
new file mode 100644
index 0000000..058a1bc
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/state/MaterializerTaskState.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.pregelix.dataflow.state;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+
+public class MaterializerTaskState extends AbstractStateObject {
+    private RunFileWriter out;
+
+    public MaterializerTaskState() {
+    }
+
+    public MaterializerTaskState(JobId jobId, TaskId taskId) {
+        super(jobId, taskId);
+    }
+
+    @Override
+    public void toBytes(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void fromBytes(DataInput in) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    public RunFileWriter getRunFileWriter() {
+        return out;
+    }
+
+    public void setRunFileWriter(RunFileWriter out) {
+        this.out = out;
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
new file mode 100644
index 0000000..f57b3fa
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+import edu.uci.ics.pregelix.dataflow.context.StateKey;
+
+public class IterationUtils {
+    public static final String TMP_DIR = "/tmp/";
+
+    public static void setIterationState(IHyracksTaskContext ctx, int partition, IStateObject state) {
+        INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+        RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+        Map<StateKey, IStateObject> map = context.getAppStateStore();
+        map.put(new StateKey(ctx.getJobletContext().getJobId(), partition), state);
+    }
+
+    public static IStateObject getIterationState(IHyracksTaskContext ctx, int partition) {
+        JobId currentId = ctx.getJobletContext().getJobId();
+        JobId lastId = new JobId(currentId.getId() - 1);
+        INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+        RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+        Map<StateKey, IStateObject> map = context.getAppStateStore();
+        IStateObject state = map.get(new StateKey(lastId, partition));
+        return state;
+    }
+
+    public static void removeIterationState(IHyracksTaskContext ctx, int partition) {
+        JobId currentId = ctx.getJobletContext().getJobId();
+        JobId lastId = new JobId(currentId.getId() - 1);
+        INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+        RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+        Map<StateKey, IStateObject> map = context.getAppStateStore();
+        map.remove(new StateKey(lastId, partition));
+    }
+
+    public static void endSuperStep(String giraphJobId, IHyracksTaskContext ctx) {
+        INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+        RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+        context.endSuperStep(giraphJobId);
+    }
+
+    public static void setProperties(String giraphJobId, IHyracksTaskContext ctx, Configuration conf) {
+        INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+        RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+        context.setVertexProperties(giraphJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+                conf.getLong(PregelixJob.NUM_EDGES, -1));
+    }
+
+    public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
+            throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = IterationUtils.TMP_DIR + jobId;
+            Path path = new Path(pathStr);
+            FSDataOutputStream output = dfs.create(path, true);
+            output.writeBoolean(terminate);
+            output.flush();
+            output.close();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static boolean readTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = IterationUtils.TMP_DIR + jobId;
+            Path path = new Path(pathStr);
+            FSDataInputStream input = dfs.open(path);
+            boolean terminate = input.readBoolean();
+            input.close();
+            return terminate;
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}