add Pregelix codebase

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1960 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
new file mode 100644
index 0000000..0187924
--- /dev/null
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -0,0 +1,183 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>pregelix-runtime</artifactId>
+	<packaging>jar</packaging>
+	<name>pregelix-runtime</name>
+
+	<parent>
+    		<groupId>edu.uci.ics.pregelix</groupId>
+    		<artifactId>pregelix</artifactId>
+    		<version>0.0.1-SNAPSHOT</version>
+  	</parent>
+
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.7.2</version>
+				<configuration>
+					<forkMode>pertest</forkMode>
+					<argLine>-enableassertions -Xmx2047m -Dfile.encoding=UTF-8
+						-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+					<includes>
+						<include>**/*TestSuite.java</include>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<configuration>
+					<filesets>
+						<fileset>
+							<directory>.</directory>
+							<includes>
+								<include>teststore*</include>
+								<include>edu*</include>
+								<include>actual*</include>
+								<include>build*</include>
+								<include>expect*</include>
+								<include>ClusterController*</include>
+							</includes>
+						</fileset>
+					</filesets>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.pregelix</groupId>
+			<artifactId>pregelix-api</artifactId>
+			<version>0.0.1-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.pregelix</groupId>
+			<artifactId>pregelix-dataflow-std</artifactId>
+			<version>0.0.1-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.pregelix</groupId>
+			<artifactId>pregelix-dataflow</artifactId>
+			<version>0.0.1-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-std</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-api</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-common</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-hadoop</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-data-std</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-core</artifactId>
+			<version>0.20.2</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-am-common</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-storage-am-btree</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks.examples.btree</groupId>
+			<artifactId>btreehelper</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-cc</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-nc</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hadoop-compat</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-ipc</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunction.java
new file mode 100644
index 0000000..4cd8f52
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.base;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface IAggregateFunction {
+    /** should be called each time a new aggregate value is computed */
+    public void init() throws HyracksDataException;
+
+    public void step(IFrameTupleReference tuple) throws HyracksDataException;
+
+    public void finish() throws HyracksDataException;
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunctionFactory.java
new file mode 100644
index 0000000..b8f70b6
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunctionFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public interface IAggregateFunctionFactory extends Serializable {
+    public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException;
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
new file mode 100644
index 0000000..e7caeaf
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+public class NCBootstrapImpl implements INCBootstrap {
+    private static final Logger LOGGER = Logger.getLogger(NCBootstrapImpl.class.getName());
+    private INCApplicationContext appCtx;
+
+    @Override
+    public void start() throws Exception {
+        LOGGER.info("Starting NC Bootstrap");
+        RuntimeContext rCtx = new RuntimeContext(appCtx);
+        appCtx.setApplicationObject(rCtx);
+        LOGGER.info("Initialized RuntimeContext: " + rCtx);
+    }
+
+    @Override
+    public void stop() throws Exception {
+        LOGGER.info("Stopping Giraph NC Bootstrap");
+        RuntimeContext rCtx = (RuntimeContext) appCtx.getApplicationObject();
+        rCtx.close();
+    }
+
+    @Override
+    public void setApplicationContext(INCApplicationContext appCtx) {
+        this.appCtx = appCtx;
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
new file mode 100644
index 0000000..57bbfbe
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+public class StorageManagerInterface implements IStorageManagerInterface {
+    private static final long serialVersionUID = 1L;
+
+    public static final StorageManagerInterface INSTANCE = new StorageManagerInterface();
+
+    private StorageManagerInterface() {
+    }
+
+    @Override
+    public IBufferCache getBufferCache(IHyracksTaskContext ctx) {
+        return RuntimeContext.get(ctx).getBufferCache();
+    }
+
+    @Override
+    public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
+        return RuntimeContext.get(ctx).getFileMapManager();
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
new file mode 100644
index 0000000..7d66422
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+public class TreeIndexRegistryProvider implements IIndexRegistryProvider<IIndex> {
+    private static final long serialVersionUID = 1L;
+
+    public static final TreeIndexRegistryProvider INSTANCE = new TreeIndexRegistryProvider();
+
+    private TreeIndexRegistryProvider() {
+    }
+
+    @Override
+    public IndexRegistry<IIndex> getRegistry(IHyracksTaskContext ctx) {
+        return RuntimeContext.get(ctx).getTreeIndexRegistry();
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
new file mode 100644
index 0000000..ed0ecb6
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
+import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class ComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+    public static IUpdateFunctionFactory INSTANCE = new ComputeUpdateFunctionFactory();
+
+    @Override
+    public IUpdateFunction createFunction() {
+        return new IUpdateFunction() {
+            // for writing intermediate data
+            private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+
+            // for writing out to message channel
+            private IFrameWriter writerMsg;
+            private FrameTupleAppender appenderMsg;
+            private ByteBuffer bufferMsg;
+
+            // for writing out to alive message channel
+            private IFrameWriter writerAlive;
+            private FrameTupleAppender appenderAlive;
+            private ByteBuffer bufferAlive;
+            private boolean pushAlive;
+
+            // for writing out termination detection control channel
+            private IFrameWriter writerTerminate;
+            private FrameTupleAppender appenderTerminate;
+            private ByteBuffer bufferTerminate;
+            private boolean terminate = true;
+
+            private Vertex vertex;
+            private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+            private DataOutput output = new DataOutputStream(bbos);
+
+            private ArrayIterator msgIterator = new ArrayIterator();
+            private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+            private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+            private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+
+            @Override
+            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+                    throws HyracksDataException {
+                this.writerMsg = writers[0];
+                this.bufferMsg = ctx.allocateFrame();
+                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderMsg.reset(bufferMsg, true);
+                this.writers.add(writerMsg);
+                this.appenders.add(appenderMsg);
+
+                this.writerTerminate = writers[1];
+                this.bufferTerminate = ctx.allocateFrame();
+                this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderTerminate.reset(bufferTerminate, true);
+
+                if (writers.length > 2) {
+                    this.writerAlive = writers[2];
+                    this.bufferAlive = ctx.allocateFrame();
+                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                    this.appenderAlive.reset(bufferAlive, true);
+                    this.pushAlive = true;
+                    this.writers.add(writerAlive);
+                    this.appenders.add(appenderAlive);
+                }
+
+                tbs.add(tbMsg);
+                tbs.add(tbAlive);
+            }
+
+            @Override
+            public void process(Object[] tuple) throws HyracksDataException {
+                // vertex Id, msg content List, vertex Id, vertex
+                tbMsg.reset();
+                tbAlive.reset();
+
+                vertex = (Vertex) tuple[3];
+                vertex.setOutputWriters(writers);
+                vertex.setOutputAppenders(appenders);
+                vertex.setOutputTupleBuilders(tbs);
+
+                ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+                msgContentList.reset(msgIterator);
+
+                if (!msgIterator.hasNext() && vertex.isHalted())
+                    return;
+
+                try {
+                    vertex.compute(msgIterator);
+                    vertex.finishCompute();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+
+                /**
+                 * this partition should not terminate
+                 */
+                if (terminate && (!vertex.isHalted() || vertex.hasMessage()))
+                    terminate = false;
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+                if (pushAlive)
+                    FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+                if (!terminate) {
+                    writeOutTerminationState();
+                }
+            }
+
+            private void writeOutTerminationState() throws HyracksDataException {
+                try {
+                    tbTerminate.getDataOutput().writeLong(0);
+                    tbTerminate.addFieldEndOffset();
+                    appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
+                            tbTerminate.getSize());
+                    FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void update(ITupleReference tupleRef) throws HyracksDataException {
+                try {
+                    if (vertex != null && vertex.hasUpdate()) {
+                        int fieldCount = tupleRef.getFieldCount();
+                        for (int i = 1; i < fieldCount; i++) {
+                            byte[] data = tupleRef.getFieldData(i);
+                            int offset = tupleRef.getFieldStart(i);
+                            bbos.setByteArray(data, offset);
+                            vertex.write(output);
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
new file mode 100644
index 0000000..9ea8215
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class StartComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+    public static IUpdateFunctionFactory INSTANCE = new StartComputeUpdateFunctionFactory();
+
+    @Override
+    public IUpdateFunction createFunction() {
+        return new IUpdateFunction() {
+            // for writing intermediate data
+            private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+
+            // for writing out to message channel
+            private IFrameWriter writerMsg;
+            private FrameTupleAppender appenderMsg;
+            private ByteBuffer bufferMsg;
+
+            // for writing out to alive message channel
+            private IFrameWriter writerAlive;
+            private FrameTupleAppender appenderAlive;
+            private ByteBuffer bufferAlive;
+            private boolean pushAlive;
+
+            // for writing out termination detection control channel
+            private IFrameWriter writerTerminate;
+            private FrameTupleAppender appenderTerminate;
+            private ByteBuffer bufferTerminate;
+            private boolean terminate = true;
+
+            // dummy empty msgList
+            private MsgList msgList = new MsgList();
+            private ArrayIterator msgIterator = new ArrayIterator();
+
+            private Vertex vertex;
+            private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+            private DataOutput output = new DataOutputStream(bbos);
+
+            private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+            private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+            private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+
+            @Override
+            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+                    throws HyracksDataException {
+                this.writerMsg = writers[0];
+                this.bufferMsg = ctx.allocateFrame();
+                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderMsg.reset(bufferMsg, true);
+                this.writers.add(writerMsg);
+                this.appenders.add(appenderMsg);
+
+                this.writerTerminate = writers[1];
+                this.bufferTerminate = ctx.allocateFrame();
+                this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderTerminate.reset(bufferTerminate, true);
+
+                if (writers.length > 2) {
+                    this.writerAlive = writers[2];
+                    this.bufferAlive = ctx.allocateFrame();
+                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                    this.appenderAlive.reset(bufferAlive, true);
+                    this.pushAlive = true;
+                    this.writers.add(writerAlive);
+                    this.appenders.add(appenderAlive);
+                }
+                msgList.reset(msgIterator);
+
+                tbs.add(tbMsg);
+                tbs.add(tbAlive);
+            }
+
+            @Override
+            public void process(Object[] tuple) throws HyracksDataException {
+                // vertex Id, vertex
+                tbMsg.reset();
+                tbAlive.reset();
+
+                vertex = (Vertex) tuple[1];
+                vertex.setOutputWriters(writers);
+                vertex.setOutputAppenders(appenders);
+                vertex.setOutputTupleBuilders(tbs);
+
+                if (!msgIterator.hasNext() && vertex.isHalted())
+                    return;
+
+                try {
+                    vertex.compute(msgIterator);
+                    vertex.finishCompute();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+
+                /**
+                 * this partition should not terminate
+                 */
+                if (terminate && (!vertex.isHalted() || vertex.hasMessage()))
+                    terminate = false;
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+                if (pushAlive)
+                    FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+                if (!terminate) {
+                    writeOutTerminationState();
+                }
+            }
+
+            private void writeOutTerminationState() throws HyracksDataException {
+                try {
+                    tbTerminate.getDataOutput().writeLong(0);
+                    tbTerminate.addFieldEndOffset();
+                    appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
+                            tbTerminate.getSize());
+                    FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void update(ITupleReference tupleRef) throws HyracksDataException {
+                try {
+                    if (vertex != null && vertex.hasUpdate()) {
+                        int fieldCount = tupleRef.getFieldCount();
+                        for (int i = 1; i < fieldCount; i++) {
+                            byte[] data = tupleRef.getFieldData(i);
+                            int offset = tupleRef.getFieldStart(i);
+                            bbos.setByteArray(data, offset);
+                            vertex.write(output);
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
new file mode 100644
index 0000000..659100a
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.simpleagg;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunction;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunctionFactory;
+
+public class AccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private IAggregateFunctionFactory[] aggFactories;
+
+    public AccumulatingAggregatorFactory(IAggregateFunctionFactory[] aggFactories, int[] keys, int[] fdColumns) {
+        this.aggFactories = aggFactories;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+
+        return new IAggregatorDescriptor() {
+
+            private FrameTupleReference ftr = new FrameTupleReference();
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+
+                // initialize aggregate functions
+                for (int i = 0; i < agg.length; i++) {
+                    aggOutput[i].reset();
+                    try {
+                        agg[i].init();
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].step(ftr);
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                IAggregateFunction[] agg = aggState.getRight();
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].step(ftr);
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].finish();
+                        tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+                                aggOutput[i].getLength());
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                IAggregateFunction[] agg = new IAggregateFunction[aggFactories.length];
+                ArrayBackedValueStorage[] aggOutput = new ArrayBackedValueStorage[aggFactories.length];
+                for (int i = 0; i < agg.length; i++) {
+                    aggOutput[i] = new ArrayBackedValueStorage();
+                    try {
+                        agg[i] = aggFactories[i].createAggregateFunction(aggOutput[i]);
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+                return new AggregateState(Pair.of(aggOutput, agg));
+            }
+
+            @Override
+            public void reset() {
+
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("this method should not be called");
+            }
+
+            @Override
+            public void close() {
+
+            }
+
+        };
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
new file mode 100644
index 0000000..db40af8
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -0,0 +1,102 @@
+package edu.uci.ics.pregelix.runtime.simpleagg;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.VertexCombiner;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunction;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class AggregationFunction implements IAggregateFunction {
+    private final Configuration conf;
+    private final boolean isFinalStage;
+    private final DataOutput output;
+    private VertexCombiner combiner;
+    private ByteBufferInputStream keyInputStream = new ByteBufferInputStream();
+    private ByteBufferInputStream valueInputStream = new ByteBufferInputStream();
+    private DataInput keyInput = new DataInputStream(keyInputStream);
+    private DataInput valueInput = new DataInputStream(valueInputStream);
+    private WritableComparable key;
+    private Writable value;
+    private Writable combinedResult;
+    private MsgList msgList = new MsgList();
+    private boolean keyRead = false;
+
+    public AggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage)
+            throws HyracksDataException {
+        this.conf = confFactory.createConfiguration();
+        this.output = output;
+        this.isFinalStage = isFinalStage;
+        msgList.setConf(this.conf);
+
+        combiner = BspUtils.createVertexCombiner(conf);
+        key = BspUtils.createVertexIndex(conf);
+        value = BspUtils.createMessageValue(conf);
+        combinedResult = BspUtils.createMessageValue(conf);
+    }
+
+    @Override
+    public void init() throws HyracksDataException {
+        msgList.clear();
+        keyRead = false;
+        combiner.init();
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        FrameTupleReference ftr = (FrameTupleReference) tuple;
+        IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
+        ByteBuffer buffer = fta.getBuffer();
+        int tIndex = ftr.getTupleIndex();
+
+        int keyStart = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex) + fta.getFieldStartOffset(tIndex, 0);
+        int valueStart = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex)
+                + fta.getFieldStartOffset(tIndex, 1);
+
+        keyInputStream.setByteBuffer(buffer, keyStart);
+        valueInputStream.setByteBuffer(buffer, valueStart);
+
+        try {
+            if (!keyRead) {
+                key.readFields(keyInput);
+                keyRead = false;
+            }
+            value.readFields(valueInput);
+            combiner.step(key, value);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+
+    }
+
+    @Override
+    public void finish() throws HyracksDataException {
+        try {
+            combinedResult = combiner.finish();
+            if (!isFinalStage) {
+                combinedResult.write(output);
+            } else {
+                msgList.add(combinedResult);
+                msgList.write(output);
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
new file mode 100644
index 0000000..9feb56c
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.pregelix.runtime.simpleagg;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunction;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunctionFactory;
+
+public class AggregationFunctionFactory implements IAggregateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+    private final boolean isFinalStage;
+
+    public AggregationFunctionFactory(IConfigurationFactory confFactory, boolean isFinalStage) {
+        this.confFactory = confFactory;
+        this.isFinalStage = isFinalStage;
+    }
+
+    @Override
+    public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException {
+        DataOutput output = provider.getDataOutput();
+        return new AggregationFunction(confFactory, output, isFinalStage);
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MergePartitionComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MergePartitionComputerFactory.java
new file mode 100644
index 0000000..5a38e9f
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MergePartitionComputerFactory.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class MergePartitionComputerFactory implements ITuplePartitionComputerFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        return new ITuplePartitionComputer() {
+
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                return 0;
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
new file mode 100644
index 0000000..97eac11
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class MsgListNullWriterFactory implements INullWriterFactory {
+    private static final long serialVersionUID = 1L;
+    public static INullWriterFactory INSTANCE = new MsgListNullWriterFactory();
+
+    @Override
+    public INullWriter createNullWriter() {
+        return new INullWriter() {
+
+            @Override
+            public void writeNull(DataOutput out) throws HyracksDataException {
+                try {
+                    out.writeInt(0);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PostSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PostSuperStepRuntimeHookFactory.java
new file mode 100644
index 0000000..2150f2e
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PostSuperStepRuntimeHookFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+public class PostSuperStepRuntimeHookFactory implements IRuntimeHookFactory {
+    private static final long serialVersionUID = 1L;
+    private final String giraphJobId;
+
+    public PostSuperStepRuntimeHookFactory(String giraphJobId) {
+        this.giraphJobId = giraphJobId;
+    }
+
+    @Override
+    public IRuntimeHook createRuntimeHook() {
+        return new IRuntimeHook() {
+
+            @Override
+            public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
+                IterationUtils.endSuperStep(giraphJobId, ctx);
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
new file mode 100644
index 0000000..5f0ed9e
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+public class PreSuperStepRuntimeHookFactory implements IRuntimeHookFactory {
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+    private final String giraphJobId;
+
+    public PreSuperStepRuntimeHookFactory(String giraphJobId, IConfigurationFactory confFactory) {
+        this.confFactory = confFactory;
+        this.giraphJobId = giraphJobId;
+    }
+
+    @Override
+    public IRuntimeHook createRuntimeHook() {
+        return new IRuntimeHook() {
+
+            @Override
+            public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
+                Configuration conf = confFactory.createConfiguration();
+                IterationUtils.setProperties(giraphJobId, ctx, conf);
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
new file mode 100644
index 0000000..f7d0018
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+
+public class RuntimeHookFactory implements IRuntimeHookFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+
+    public RuntimeHookFactory(IConfigurationFactory confFactory) {
+        this.confFactory = confFactory;
+    }
+
+    @Override
+    public IRuntimeHook createRuntimeHook() {
+
+        return new IRuntimeHook() {
+
+            @SuppressWarnings({ "rawtypes", "unchecked" })
+            @Override
+            public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
+                Configuration conf = confFactory.createConfiguration();
+                try {
+                    Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
+                            null);
+                    Vertex.setContext(mapperContext);
+                    BspUtils.setDefaultConfiguration(conf);
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongAscNormalizedKeyComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongAscNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..9181691
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongAscNormalizedKeyComputerFactory.java
@@ -0,0 +1,56 @@
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+
+public class VLongAscNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            private static final int POSTIVE_LONG_MASK = (3 << 30);
+            private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+            private static final int NEGATIVE_LONG_MASK = (0 << 30);
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                long value = SerDeUtils.readVLong(bytes, start, length);
+                int highValue = (int) (value >> 32);
+                if (highValue > 0) {
+                    /**
+                     * larger than Integer.MAX
+                     */
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= POSTIVE_LONG_MASK;
+                    return highNmk;
+                } else if (highValue == 0) {
+                    /**
+                     * smaller than Integer.MAX but >=0
+                     */
+                    int lowNmk = (int) value;
+                    lowNmk >>= 2;
+                    lowNmk |= NON_NEGATIVE_INT_MASK;
+                    return lowNmk;
+                } else {
+                    /**
+                     * less than 0; TODO: have not optimized for that
+                     */
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= NEGATIVE_LONG_MASK;
+                    return highNmk;
+                }
+            }
+
+            private int getKey(int value) {
+                long unsignedFirstValue = (long) value;
+                int nmk = (int) ((unsignedFirstValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
+                return nmk;
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongDescNormalizedKeyComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongDescNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..6b2738b
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongDescNormalizedKeyComputerFactory.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class VLongDescNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+    private static final long serialVersionUID = 1L;
+    private final INormalizedKeyComputerFactory ascNormalizedKeyComputerFactory = new VLongAscNormalizedKeyComputerFactory();
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            private INormalizedKeyComputer nmkComputer = ascNormalizedKeyComputerFactory.createNormalizedKeyComputer();
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                int nk = nmkComputer.normalize(bytes, start, length);
+                return (int) ((long) Integer.MAX_VALUE - (long) (nk - Integer.MIN_VALUE));
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdNullWriterFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdNullWriterFactory.java
new file mode 100644
index 0000000..f0114dd
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdNullWriterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class VertexIdNullWriterFactory implements INullWriterFactory {
+    private static final long serialVersionUID = 1L;
+    public static INullWriterFactory INSTANCE = new VertexIdNullWriterFactory();
+
+    @Override
+    public INullWriter createNullWriter() {
+        return new INullWriter() {
+
+            @Override
+            public void writeNull(DataOutput out) throws HyracksDataException {
+                try {
+                    /***
+                     * TODO: for now it only works for VLongWritable vertexId
+                     */
+                    out.write(0);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
new file mode 100644
index 0000000..5eff497
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
+public class VertexIdPartitionComputerFactory<K extends Writable, V extends Writable> implements
+        ITuplePartitionComputerFactory {
+    private static final long serialVersionUID = 1L;
+    private final ISerializerDeserializer<K> keyIO;
+
+    public VertexIdPartitionComputerFactory(ISerializerDeserializer<K> keyIO) {
+        this.keyIO = keyIO;
+    }
+
+    public ITuplePartitionComputer createPartitioner() {
+        return new ITuplePartitionComputer() {
+            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+            private final DataInputStream dis = new DataInputStream(bbis);
+
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, 0);
+                bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+                K key = keyIO.deserialize(dis);
+                return Math.abs(key.hashCode() % nParts);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableDescComparingBinaryComparatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableDescComparingBinaryComparatorFactory.java
new file mode 100644
index 0000000..2c7f4b2
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableDescComparingBinaryComparatorFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import org.apache.hadoop.io.RawComparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+
+public class WritableDescComparingBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private Class<? extends RawComparator<T>> cmpClass;
+
+    public WritableDescComparingBinaryComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
+        this.cmpClass = cmpClass;
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
+        return new IBinaryComparator() {
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                return -instance.compare(b1, s1, l1, b2, s2, l2);
+            }
+        };
+    }
+}
\ No newline at end of file