add Pregelix codebase
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1960 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
new file mode 100644
index 0000000..0187924
--- /dev/null
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -0,0 +1,183 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>pregelix-runtime</artifactId>
+ <packaging>jar</packaging>
+ <name>pregelix-runtime</name>
+
+ <parent>
+ <groupId>edu.uci.ics.pregelix</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.7.2</version>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <argLine>-enableassertions -Xmx2047m -Dfile.encoding=UTF-8
+ -Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>.</directory>
+ <includes>
+ <include>teststore*</include>
+ <include>edu*</include>
+ <include>actual*</include>
+ <include>build*</include>
+ <include>expect*</include>
+ <include>ClusterController*</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.pregelix</groupId>
+ <artifactId>pregelix-api</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.pregelix</groupId>
+ <artifactId>pregelix-dataflow-std</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.pregelix</groupId>
+ <artifactId>pregelix-dataflow</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-hadoop</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-data-std</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-btree</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
+ <artifactId>btreehelper</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hadoop-compat</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-ipc</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunction.java
new file mode 100644
index 0000000..4cd8f52
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.base;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * An incremental aggregate function that is driven through an
+ * init/step/finish cycle.
+ */
+public interface IAggregateFunction {
+    /** Should be called each time a new aggregate value is computed. */
+    void init() throws HyracksDataException;
+
+    /**
+     * Feeds one input tuple to the aggregate.
+     *
+     * @param tuple the tuple to consume
+     */
+    void step(IFrameTupleReference tuple) throws HyracksDataException;
+
+    /** Called once all tuples of the current aggregate value have been stepped. */
+    void finish() throws HyracksDataException;
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunctionFactory.java
new file mode 100644
index 0000000..b8f70b6
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunctionFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+/** Serializable factory for {@link IAggregateFunction} instances. */
+public interface IAggregateFunctionFactory extends Serializable {
+    /**
+     * Creates an aggregate function for the given output provider.
+     *
+     * @param provider the data-output provider handed to the new function
+     * @return the newly created aggregate function
+     * @throws HyracksException if the function cannot be created
+     */
+    IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException;
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
new file mode 100644
index 0000000..e7caeaf
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+/**
+ * Node-controller bootstrap: creates the {@link RuntimeContext} when the
+ * application starts and closes it when the application stops.
+ */
+public class NCBootstrapImpl implements INCBootstrap {
+    private static final Logger LOGGER = Logger.getLogger(NCBootstrapImpl.class.getName());
+    private INCApplicationContext appCtx;
+
+    /** Creates a {@link RuntimeContext} and registers it as the application object. */
+    @Override
+    public void start() throws Exception {
+        LOGGER.info("Starting NC Bootstrap");
+        RuntimeContext rCtx = new RuntimeContext(appCtx);
+        appCtx.setApplicationObject(rCtx);
+        LOGGER.info("Initialized RuntimeContext: " + rCtx);
+    }
+
+    /**
+     * Closes the {@link RuntimeContext} registered by {@link #start()}.
+     * Does nothing beyond logging when no application object is registered,
+     * e.g. because {@code start()} failed or was never called.
+     */
+    @Override
+    public void stop() throws Exception {
+        LOGGER.info("Stopping NC Bootstrap");
+        RuntimeContext rCtx = (RuntimeContext) appCtx.getApplicationObject();
+        if (rCtx != null) {
+            rCtx.close();
+        }
+    }
+
+    /** Stores the application context used by {@link #start()} and {@link #stop()}. */
+    @Override
+    public void setApplicationContext(INCApplicationContext appCtx) {
+        this.appCtx = appCtx;
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
new file mode 100644
index 0000000..57bbfbe
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+/**
+ * Storage manager that resolves the buffer cache and the file-map provider
+ * from the {@link RuntimeContext} looked up for a task context.
+ */
+public class StorageManagerInterface implements IStorageManagerInterface {
+    private static final long serialVersionUID = 1L;
+
+    /** Shared instance; the constructor is private. */
+    public static final StorageManagerInterface INSTANCE = new StorageManagerInterface();
+
+    private StorageManagerInterface() {
+        // instances are obtained through INSTANCE
+    }
+
+    /** Returns the file-map manager of the runtime context looked up for {@code ctx}. */
+    @Override
+    public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
+        return RuntimeContext.get(ctx).getFileMapManager();
+    }
+
+    /** Returns the buffer cache of the runtime context looked up for {@code ctx}. */
+    @Override
+    public IBufferCache getBufferCache(IHyracksTaskContext ctx) {
+        return RuntimeContext.get(ctx).getBufferCache();
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
new file mode 100644
index 0000000..7d66422
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+/**
+ * Index-registry provider that returns the tree index registry of the
+ * {@link RuntimeContext} looked up for a task context.
+ */
+public class TreeIndexRegistryProvider implements IIndexRegistryProvider<IIndex> {
+    /** Shared instance; the constructor is private. */
+    public static final TreeIndexRegistryProvider INSTANCE = new TreeIndexRegistryProvider();
+
+    private static final long serialVersionUID = 1L;
+
+    private TreeIndexRegistryProvider() {
+        // instances are obtained through INSTANCE
+    }
+
+    /** Returns the tree index registry of the runtime context looked up for {@code ctx}. */
+    @Override
+    public IndexRegistry<IIndex> getRegistry(IHyracksTaskContext ctx) {
+        return RuntimeContext.get(ctx).getTreeIndexRegistry();
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
new file mode 100644
index 0000000..ed0ecb6
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
+import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
+
+/**
+ * Factory for the update function that calls {@code compute} on each vertex
+ * with the message list that arrives in the same tuple.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class ComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+    /** Shared factory instance. */
+    public static final IUpdateFunctionFactory INSTANCE = new ComputeUpdateFunctionFactory();
+
+    /** Creates an update function with its own tuple builders, frame appenders and state. */
+    @Override
+    public IUpdateFunction createFunction() {
+        return new IUpdateFunction() {
+            // for writing intermediate data
+            private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+
+            // for writing out to message channel
+            private IFrameWriter writerMsg;
+            private FrameTupleAppender appenderMsg;
+            private ByteBuffer bufferMsg;
+
+            // for writing out to alive message channel
+            private IFrameWriter writerAlive;
+            private FrameTupleAppender appenderAlive;
+            private ByteBuffer bufferAlive;
+            private boolean pushAlive;
+
+            // for writing out termination detection control channel
+            private IFrameWriter writerTerminate;
+            private FrameTupleAppender appenderTerminate;
+            private ByteBuffer bufferTerminate;
+            // cleared by process() once a vertex is not halted or reports hasMessage()
+            private boolean terminate = true;
+
+            private Vertex vertex;
+            private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+            private DataOutput output = new DataOutputStream(bbos);
+
+            private ArrayIterator msgIterator = new ArrayIterator();
+            // lists handed to every vertex in process()
+            private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+            private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+            private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+
+            /**
+             * Binds the output channels: {@code writers[0]} receives messages,
+             * {@code writers[1]} the termination state and the optional
+             * {@code writers[2]} the alive tuples.
+             */
+            @Override
+            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+                    throws HyracksDataException {
+                // the lists are shared with the vertices, so refill them instead of
+                // appending: a repeated open() must not leave duplicate entries
+                this.writers.clear();
+                this.appenders.clear();
+                this.tbs.clear();
+
+                this.writerMsg = writers[0];
+                this.bufferMsg = ctx.allocateFrame();
+                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderMsg.reset(bufferMsg, true);
+                this.writers.add(writerMsg);
+                this.appenders.add(appenderMsg);
+
+                this.writerTerminate = writers[1];
+                this.bufferTerminate = ctx.allocateFrame();
+                this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderTerminate.reset(bufferTerminate, true);
+
+                this.pushAlive = writers.length > 2;
+                if (pushAlive) {
+                    this.writerAlive = writers[2];
+                    this.bufferAlive = ctx.allocateFrame();
+                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                    this.appenderAlive.reset(bufferAlive, true);
+                    this.writers.add(writerAlive);
+                    this.appenders.add(appenderAlive);
+                }
+
+                tbs.add(tbMsg);
+                tbs.add(tbAlive);
+            }
+
+            /**
+             * Runs {@code compute} on the vertex carried by the tuple, feeding it the
+             * messages carried by the same tuple.
+             *
+             * @param tuple vertex Id, msg content List, vertex Id, vertex
+             */
+            @Override
+            public void process(Object[] tuple) throws HyracksDataException {
+                tbMsg.reset();
+                tbAlive.reset();
+
+                vertex = (Vertex) tuple[3];
+                vertex.setOutputWriters(writers);
+                vertex.setOutputAppenders(appenders);
+                vertex.setOutputTupleBuilders(tbs);
+
+                ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+                msgContentList.reset(msgIterator);
+
+                // a halted vertex without incoming messages is skipped
+                if (!msgIterator.hasNext() && vertex.isHalted()) {
+                    return;
+                }
+
+                try {
+                    vertex.compute(msgIterator);
+                    vertex.finishCompute();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+
+                // this partition should not terminate
+                if (terminate && (!vertex.isHalted() || vertex.hasMessage())) {
+                    terminate = false;
+                }
+            }
+
+            /**
+             * Flushes the message appender, the alive appender when an alive channel
+             * was bound, and writes the termination tuple when {@code terminate} was cleared.
+             */
+            @Override
+            public void close() throws HyracksDataException {
+                FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+                if (pushAlive) {
+                    FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+                }
+                if (!terminate) {
+                    writeOutTerminationState();
+                }
+            }
+
+            /** Appends a one-field tuple holding the long 0 to the termination channel and flushes it. */
+            private void writeOutTerminationState() throws HyracksDataException {
+                try {
+                    // the builder has a single field; reset it so a repeated call starts empty
+                    tbTerminate.reset();
+                    tbTerminate.getDataOutput().writeLong(0);
+                    tbTerminate.addFieldEndOffset();
+                    appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
+                            tbTerminate.getSize());
+                    FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            /**
+             * If a vertex has been processed and reports {@code hasUpdate()}, writes it
+             * once per field of {@code tupleRef} after field 0, with the stream pointed at
+             * that field's bytes (presumably an in-place overwrite -- TODO confirm).
+             */
+            @Override
+            public void update(ITupleReference tupleRef) throws HyracksDataException {
+                try {
+                    if (vertex != null && vertex.hasUpdate()) {
+                        int fieldCount = tupleRef.getFieldCount();
+                        for (int i = 1; i < fieldCount; i++) {
+                            byte[] data = tupleRef.getFieldData(i);
+                            int offset = tupleRef.getFieldStart(i);
+                            bbos.setByteArray(data, offset);
+                            vertex.write(output);
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
new file mode 100644
index 0000000..9ea8215
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
+
+/**
+ * Factory for the update function that calls {@code compute} on each vertex
+ * with an iterator over a dummy, empty message list.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class StartComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+    /** Shared factory instance. */
+    public static final IUpdateFunctionFactory INSTANCE = new StartComputeUpdateFunctionFactory();
+
+    /** Creates an update function with its own tuple builders, frame appenders and state. */
+    @Override
+    public IUpdateFunction createFunction() {
+        return new IUpdateFunction() {
+            // for writing intermediate data
+            private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+
+            // for writing out to message channel
+            private IFrameWriter writerMsg;
+            private FrameTupleAppender appenderMsg;
+            private ByteBuffer bufferMsg;
+
+            // for writing out to alive message channel
+            private IFrameWriter writerAlive;
+            private FrameTupleAppender appenderAlive;
+            private ByteBuffer bufferAlive;
+            private boolean pushAlive;
+
+            // for writing out termination detection control channel
+            private IFrameWriter writerTerminate;
+            private FrameTupleAppender appenderTerminate;
+            private ByteBuffer bufferTerminate;
+            // cleared by process() once a vertex is not halted or reports hasMessage()
+            private boolean terminate = true;
+
+            // dummy empty msgList
+            private MsgList msgList = new MsgList();
+            private ArrayIterator msgIterator = new ArrayIterator();
+
+            private Vertex vertex;
+            private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+            private DataOutput output = new DataOutputStream(bbos);
+
+            // lists handed to every vertex in process()
+            private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+            private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+            private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+
+            /**
+             * Binds the output channels: {@code writers[0]} receives messages,
+             * {@code writers[1]} the termination state and the optional
+             * {@code writers[2]} the alive tuples.
+             */
+            @Override
+            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+                    throws HyracksDataException {
+                // the lists are shared with the vertices, so refill them instead of
+                // appending: a repeated open() must not leave duplicate entries
+                this.writers.clear();
+                this.appenders.clear();
+                this.tbs.clear();
+
+                this.writerMsg = writers[0];
+                this.bufferMsg = ctx.allocateFrame();
+                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderMsg.reset(bufferMsg, true);
+                this.writers.add(writerMsg);
+                this.appenders.add(appenderMsg);
+
+                this.writerTerminate = writers[1];
+                this.bufferTerminate = ctx.allocateFrame();
+                this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderTerminate.reset(bufferTerminate, true);
+
+                this.pushAlive = writers.length > 2;
+                if (pushAlive) {
+                    this.writerAlive = writers[2];
+                    this.bufferAlive = ctx.allocateFrame();
+                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                    this.appenderAlive.reset(bufferAlive, true);
+                    this.writers.add(writerAlive);
+                    this.appenders.add(appenderAlive);
+                }
+                msgList.reset(msgIterator);
+
+                tbs.add(tbMsg);
+                tbs.add(tbAlive);
+            }
+
+            /**
+             * Runs {@code compute} on the vertex carried by the tuple, passing the
+             * iterator that open() obtained from the dummy message list.
+             *
+             * @param tuple vertex Id, vertex
+             */
+            @Override
+            public void process(Object[] tuple) throws HyracksDataException {
+                tbMsg.reset();
+                tbAlive.reset();
+
+                vertex = (Vertex) tuple[1];
+                vertex.setOutputWriters(writers);
+                vertex.setOutputAppenders(appenders);
+                vertex.setOutputTupleBuilders(tbs);
+
+                // a halted vertex is skipped when the iterator has no message
+                if (!msgIterator.hasNext() && vertex.isHalted()) {
+                    return;
+                }
+
+                try {
+                    vertex.compute(msgIterator);
+                    vertex.finishCompute();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+
+                // this partition should not terminate
+                if (terminate && (!vertex.isHalted() || vertex.hasMessage())) {
+                    terminate = false;
+                }
+            }
+
+            /**
+             * Flushes the message appender, the alive appender when an alive channel
+             * was bound, and writes the termination tuple when {@code terminate} was cleared.
+             */
+            @Override
+            public void close() throws HyracksDataException {
+                FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+                if (pushAlive) {
+                    FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+                }
+                if (!terminate) {
+                    writeOutTerminationState();
+                }
+            }
+
+            /** Appends a one-field tuple holding the long 0 to the termination channel and flushes it. */
+            private void writeOutTerminationState() throws HyracksDataException {
+                try {
+                    // the builder has a single field; reset it so a repeated call starts empty
+                    tbTerminate.reset();
+                    tbTerminate.getDataOutput().writeLong(0);
+                    tbTerminate.addFieldEndOffset();
+                    appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
+                            tbTerminate.getSize());
+                    FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            /**
+             * If a vertex has been processed and reports {@code hasUpdate()}, writes it
+             * once per field of {@code tupleRef} after field 0, with the stream pointed at
+             * that field's bytes (presumably an in-place overwrite -- TODO confirm).
+             */
+            @Override
+            public void update(ITupleReference tupleRef) throws HyracksDataException {
+                try {
+                    if (vertex != null && vertex.hasUpdate()) {
+                        int fieldCount = tupleRef.getFieldCount();
+                        for (int i = 1; i < fieldCount; i++) {
+                            byte[] data = tupleRef.getFieldData(i);
+                            int offset = tupleRef.getFieldStart(i);
+                            bbos.setByteArray(data, offset);
+                            vertex.write(output);
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
new file mode 100644
index 0000000..659100a
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.simpleagg;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunction;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunctionFactory;
+
+/**
+ * Aggregator-descriptor factory that drives an array of
+ * {@link IAggregateFunction}s, one per configured factory, and emits one
+ * output field per function.
+ */
+public class AccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final IAggregateFunctionFactory[] aggFactories;
+
+    /**
+     * @param aggFactories factories of the aggregate functions to run
+     * @param keys accepted but not used by this class
+     * @param fdColumns accepted but not used by this class
+     */
+    public AccumulatingAggregatorFactory(IAggregateFunctionFactory[] aggFactories, int[] keys, int[] fdColumns) {
+        this.aggFactories = aggFactories;
+    }
+
+    /**
+     * Creates an aggregator whose state is a pair of (output storages,
+     * aggregate functions), as built by {@code createAggregateStates()}.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+
+        return new IAggregatorDescriptor() {
+
+            private FrameTupleReference ftr = new FrameTupleReference();
+
+            /** Resets every output storage and function of the state, then feeds the given tuple. */
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = stateOf(state);
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+
+                // initialize aggregate functions
+                for (int i = 0; i < agg.length; i++) {
+                    aggOutput[i].reset();
+                    try {
+                        agg[i].init();
+                    } catch (Exception e) {
+                        throw asDataException(e);
+                    }
+                }
+                stepAll(agg, accessor, tIndex);
+            }
+
+            /** Feeds one more tuple to every aggregate function of the state. */
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                stepAll(stateOf(state).getRight(), accessor, tIndex);
+            }
+
+            /** Finishes every function and appends the content of its output storage as one field. */
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = stateOf(state);
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].finish();
+                        tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+                                aggOutput[i].getLength());
+                    } catch (Exception e) {
+                        throw asDataException(e);
+                    }
+                }
+            }
+
+            /** Builds one output storage and one aggregate function per configured factory. */
+            @Override
+            public AggregateState createAggregateStates() {
+                IAggregateFunction[] agg = new IAggregateFunction[aggFactories.length];
+                ArrayBackedValueStorage[] aggOutput = new ArrayBackedValueStorage[aggFactories.length];
+                for (int i = 0; i < agg.length; i++) {
+                    aggOutput[i] = new ArrayBackedValueStorage();
+                    try {
+                        agg[i] = aggFactories[i].createAggregateFunction(aggOutput[i]);
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+                return new AggregateState(Pair.of(aggOutput, agg));
+            }
+
+            /** No-op. */
+            @Override
+            public void reset() {
+            }
+
+            /** Not supported: always throws {@link IllegalStateException}. */
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("this method should not be called");
+            }
+
+            /** No-op. */
+            @Override
+            public void close() {
+            }
+
+            /** Casts the opaque state object back to the pair built by createAggregateStates(). */
+            private Pair<ArrayBackedValueStorage[], IAggregateFunction[]> stateOf(AggregateState state) {
+                return (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+            }
+
+            /** Points the shared tuple reference at the given tuple and steps every function with it. */
+            private void stepAll(IAggregateFunction[] agg, IFrameTupleAccessor accessor, int tIndex)
+                    throws HyracksDataException {
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].step(ftr);
+                    } catch (Exception e) {
+                        throw asDataException(e);
+                    }
+                }
+            }
+
+            /** Returns {@code e} itself when it already is a HyracksDataException, otherwise wraps it. */
+            private HyracksDataException asDataException(Exception e) {
+                if (e instanceof HyracksDataException) {
+                    return (HyracksDataException) e;
+                }
+                return new HyracksDataException(e);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
new file mode 100644
index 0000000..db40af8
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -0,0 +1,102 @@
+package edu.uci.ics.pregelix.runtime.simpleagg;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.VertexCombiner;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunction;
+
+/**
+ * Aggregate function that combines the values of one group of tuples with the job's {@link VertexCombiner};
+ * field 0 of each tuple is read as the key and field 1 as the value.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class AggregationFunction implements IAggregateFunction {
+    private final Configuration conf;
+    private final boolean isFinalStage;
+    private final DataOutput output;
+    private final VertexCombiner combiner;
+    private final ByteBufferInputStream keyInputStream = new ByteBufferInputStream();
+    private final ByteBufferInputStream valueInputStream = new ByteBufferInputStream();
+    private final DataInput keyInput = new DataInputStream(keyInputStream);
+    private final DataInput valueInput = new DataInputStream(valueInputStream);
+    private final WritableComparable key;
+    private final Writable value;
+    private Writable combinedResult;
+    private final MsgList msgList = new MsgList();
+    private boolean keyRead = false; // true once the current group's key has been deserialized
+
+    public AggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage)
+            throws HyracksDataException {
+        this.conf = confFactory.createConfiguration();
+        this.output = output;
+        this.isFinalStage = isFinalStage;
+        msgList.setConf(this.conf);
+        combiner = BspUtils.createVertexCombiner(conf);
+        key = BspUtils.createVertexIndex(conf);
+        value = BspUtils.createMessageValue(conf);
+        combinedResult = BspUtils.createMessageValue(conf);
+    }
+
+    /** Starts a new group: clears the message list, forgets the cached key and re-initializes the combiner. */
+    @Override
+    public void init() throws HyracksDataException {
+        msgList.clear();
+        keyRead = false;
+        combiner.init();
+    }
+
+    /** Feeds one tuple into the combiner. The key is deserialized only for the first tuple after {@link #init()}
+     *  and reused for the rest of the group, so init() must be called at the start of every group. */
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        FrameTupleReference ftr = (FrameTupleReference) tuple;
+        IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
+        ByteBuffer buffer = fta.getBuffer();
+        int tIndex = ftr.getTupleIndex();
+        int fieldBase = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex);
+        keyInputStream.setByteBuffer(buffer, fieldBase + fta.getFieldStartOffset(tIndex, 0));
+        valueInputStream.setByteBuffer(buffer, fieldBase + fta.getFieldStartOffset(tIndex, 1));
+        try {
+            if (!keyRead) {
+                key.readFields(keyInput);
+                keyRead = true; // was "false", which made this guard a no-op and re-read the key on every tuple
+            }
+            value.readFields(valueInput);
+            combiner.step(key, value);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Writes the combined value to the output: as is in non-final stages, wrapped in the message list otherwise. */
+    @Override
+    public void finish() throws HyracksDataException {
+        try {
+            combinedResult = combiner.finish();
+            if (!isFinalStage) {
+                combinedResult.write(output);
+            } else {
+                msgList.add(combinedResult);
+                msgList.write(output);
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
new file mode 100644
index 0000000..9feb56c
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.pregelix.runtime.simpleagg;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunction;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunctionFactory;
+
+/** Factory that creates {@link AggregationFunction} instances writing to the DataOutput of the given provider. */
+public class AggregationFunctionFactory implements IAggregateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+    private final boolean isFinalStage;
+
+    public AggregationFunctionFactory(IConfigurationFactory confFactory, boolean isFinalStage) {
+        this.isFinalStage = isFinalStage;
+        this.confFactory = confFactory;
+    }
+
+    @Override
+    public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException {
+        return new AggregationFunction(confFactory, provider.getDataOutput(), isFinalStage);
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MergePartitionComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MergePartitionComputerFactory.java
new file mode 100644
index 0000000..5a38e9f
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MergePartitionComputerFactory.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Partitioner factory whose partitioners send every tuple to partition 0. */
+public class MergePartitionComputerFactory implements ITuplePartitionComputerFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        final ITuplePartitionComputer toFirstPartition = new ITuplePartitionComputer() {
+            // Ignores the tuple and the partition count: the answer is always 0.
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                return 0;
+            }
+        };
+        return toFirstPartition;
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
new file mode 100644
index 0000000..97eac11
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/MsgListNullWriterFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Null-writer factory for message lists: a null is written as the int 0 (four bytes via DataOutput.writeInt). */
+public class MsgListNullWriterFactory implements INullWriterFactory {
+    private static final long serialVersionUID = 1L;
+    // NOTE(review): not final, so it can be reassigned; consider making it final.
+    public static INullWriterFactory INSTANCE = new MsgListNullWriterFactory();
+
+    @Override
+    public INullWriter createNullWriter() {
+        final INullWriter writer = new INullWriter() {
+            @Override
+            public void writeNull(DataOutput out) throws HyracksDataException {
+                try {
+                    out.writeInt(0);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+        return writer;
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PostSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PostSuperStepRuntimeHookFactory.java
new file mode 100644
index 0000000..2150f2e
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PostSuperStepRuntimeHookFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+/** Factory for hooks that call {@code IterationUtils.endSuperStep} for the given job when configured. */
+public class PostSuperStepRuntimeHookFactory implements IRuntimeHookFactory {
+    private static final long serialVersionUID = 1L;
+    private final String giraphJobId;
+
+    public PostSuperStepRuntimeHookFactory(String giraphJobId) {
+        this.giraphJobId = giraphJobId;
+    }
+
+    @Override
+    public IRuntimeHook createRuntimeHook() {
+        final IRuntimeHook endOfSuperStep = new IRuntimeHook() {
+            // Delegates to IterationUtils with the job id captured by the factory.
+            @Override
+            public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
+                IterationUtils.endSuperStep(giraphJobId, ctx);
+            }
+        };
+        return endOfSuperStep;
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
new file mode 100644
index 0000000..5f0ed9e
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+/** Factory for hooks that call {@code IterationUtils.setProperties} with a freshly created job configuration. */
+public class PreSuperStepRuntimeHookFactory implements IRuntimeHookFactory {
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+    private final String giraphJobId;
+
+    public PreSuperStepRuntimeHookFactory(String giraphJobId, IConfigurationFactory confFactory) {
+        this.giraphJobId = giraphJobId;
+        this.confFactory = confFactory;
+    }
+
+    @Override
+    public IRuntimeHook createRuntimeHook() {
+        final IRuntimeHook startOfSuperStep = new IRuntimeHook() {
+            // A new Configuration is created on every configure() call and handed to IterationUtils.
+            @Override
+            public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
+                Configuration jobConf = confFactory.createConfiguration();
+                IterationUtils.setProperties(giraphJobId, ctx, jobConf);
+            }
+        };
+        return startOfSuperStep;
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
new file mode 100644
index 0000000..f7d0018
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+
+/** Factory for hooks that set a Hadoop mapper Context on Vertex and pass the job Configuration to BspUtils. */
+public class RuntimeHookFactory implements IRuntimeHookFactory {
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+
+    public RuntimeHookFactory(IConfigurationFactory confFactory) {
+        this.confFactory = confFactory;
+    }
+
+    @Override
+    public IRuntimeHook createRuntimeHook() {
+        final IRuntimeHook hook = new IRuntimeHook() {
+            @SuppressWarnings({ "rawtypes", "unchecked" })
+            @Override
+            public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
+                Configuration jobConf = confFactory.createConfiguration();
+                try {
+                    // Context is an inner class of Mapper, so a Mapper instance is needed to create one;
+                    // apart from the configuration and a no-arg TaskAttemptID every argument is null.
+                    Mapper mapper = new Mapper();
+                    Context context = mapper.new Context(jobConf, new TaskAttemptID(), null, null, null, null, null);
+                    Vertex.setContext(context);
+                    BspUtils.setDefaultConfiguration(jobConf);
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+        return hook;
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongAscNormalizedKeyComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongAscNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..9181691
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongAscNormalizedKeyComputerFactory.java
@@ -0,0 +1,56 @@
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+
+/**
+ * Creates normalized-key computers for VLong keys in ascending order. The two top bits of a key tag the
+ * value range (00: negative, 10: [0, 2^32), 11: >= 2^32), so keys only order correctly as unsigned ints.
+ */
+public class VLongAscNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            private static final int POSITIVE_LONG_MASK = (3 << 30);
+            private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+            private static final int NEGATIVE_LONG_MASK = (0 << 30);
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                long value = SerDeUtils.readVLong(bytes, start, length);
+                int highValue = (int) (value >> 32);
+                if (highValue > 0) {
+                    // value >= 2^32: rank by the high 32 bits
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= POSITIVE_LONG_MASK;
+                    return highNmk;
+                } else if (highValue == 0) {
+                    // 0 <= value < 2^32: rank by the low 32 bits. They are negative as an int once
+                    // value >= 2^31, so the shift must be unsigned; a signed shift would copy the sign bit
+                    // into the tag bits, giving keys in the 11 range that can exceed those of values >= 2^32.
+                    int lowNmk = (int) value;
+                    lowNmk >>>= 2;
+                    lowNmk |= NON_NEGATIVE_INT_MASK;
+                    return lowNmk;
+                } else {
+                    // value < 0; TODO: have not optimized for that
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= NEGATIVE_LONG_MASK;
+                    return highNmk;
+                }
+            }
+
+            /** Shifts a signed int into the unsigned range, preserving order: {@code value - Integer.MIN_VALUE}. */
+            private int getKey(int value) {
+                long unsignedFirstValue = (long) value;
+                int nmk = (int) ((unsignedFirstValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
+                return nmk;
+            }
+        };
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongDescNormalizedKeyComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongDescNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..6b2738b
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongDescNormalizedKeyComputerFactory.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+/** Descending counterpart of {@link VLongAscNormalizedKeyComputerFactory}: bitwise-inverts each ascending key. */
+public class VLongDescNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+    private static final long serialVersionUID = 1L;
+    private final INormalizedKeyComputerFactory ascNormalizedKeyComputerFactory = new VLongAscNormalizedKeyComputerFactory();
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            private INormalizedKeyComputer ascending = ascNormalizedKeyComputerFactory.createNormalizedKeyComputer();
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                int ascKey = ascending.normalize(bytes, start, length);
+                // Same int as (int) (MAX_VALUE - (long) (ascKey - MIN_VALUE)): modulo 2^32 that is -1 - ascKey.
+                return ~ascKey;
+            }
+        };
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdNullWriterFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdNullWriterFactory.java
new file mode 100644
index 0000000..f0114dd
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdNullWriterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Null-writer factory for vertex ids: a null id is written as the single byte 0. */
+public class VertexIdNullWriterFactory implements INullWriterFactory {
+    private static final long serialVersionUID = 1L;
+    // NOTE(review): not final, so it can be reassigned; consider making it final.
+    public static INullWriterFactory INSTANCE = new VertexIdNullWriterFactory();
+
+    @Override
+    public INullWriter createNullWriter() {
+        final INullWriter writer = new INullWriter() {
+            @Override
+            public void writeNull(DataOutput out) throws HyracksDataException {
+                try {
+                    // TODO: for now it only works for VLongWritable vertexId.
+                    // DataOutput.write(int) emits exactly one byte, so this writes the single byte 0
+                    // (presumably the VLong encoding of 0 - confirm against the vertex id serializer).
+                    out.write(0);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+        return writer;
+    }
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
new file mode 100644
index 0000000..5eff497
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
+/** Creates partitioners that hash the deserialized vertex id (field 0) into one of {@code nParts} partitions. */
+public class VertexIdPartitionComputerFactory<K extends Writable, V extends Writable> implements ITuplePartitionComputerFactory {
+    private static final long serialVersionUID = 1L;
+    private final ISerializerDeserializer<K> keyIO;
+    public VertexIdPartitionComputerFactory(ISerializerDeserializer<K> keyIO) {
+        this.keyIO = keyIO;
+    }
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        return new ITuplePartitionComputer() {
+            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+            private final DataInputStream dis = new DataInputStream(bbis);
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, 0);
+                bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+                K key = keyIO.deserialize(dis);
+                return Math.abs(key.hashCode() % nParts); // |remainder| < |nParts|, so abs cannot overflow
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableDescComparingBinaryComparatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableDescComparingBinaryComparatorFactory.java
new file mode 100644
index 0000000..2c7f4b2
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableDescComparingBinaryComparatorFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import org.apache.hadoop.io.RawComparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+
+/** Creates comparators that order serialized values in the reverse of the wrapped Hadoop {@link RawComparator}. */
+public class WritableDescComparingBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+    private final Class<? extends RawComparator<T>> cmpClass;
+    public WritableDescComparingBinaryComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
+        this.cmpClass = cmpClass;
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
+        return new IBinaryComparator() {
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                // Swap operands instead of negating: -Integer.MIN_VALUE overflows back to Integer.MIN_VALUE.
+                return instance.compare(b2, s2, l2, b1, s1, l1);
+            }
+        };
+    }
+}
\ No newline at end of file