add Pregelix codebase
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1960 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
new file mode 100644
index 0000000..0d9b059
--- /dev/null
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -0,0 +1,169 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>pregelix-dataflow</artifactId>
+ <packaging>jar</packaging>
+ <name>pregelix-dataflow</name>
+
+ <!-- groupId and version are inherited from the pregelix aggregator POM. -->
+ <parent>
+ <groupId>edu.uci.ics.pregelix</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <build>
+ <plugins>
+ <!-- Compile for Java 6; toolchain baseline for the whole fullstack branch. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <!-- Fork a fresh JVM per test; assertions on, 2 GB heap for test clusters. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.7.2</version>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <argLine>-enableassertions -Xmx2047m -Dfile.encoding=UTF-8
+ -Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <!-- Sweep test debris (stores, logs, cluster state) from the module root on clean. -->
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>.</directory>
+ <includes>
+ <include>teststore*</include>
+ <include>edu*</include>
+ <include>actual*</include>
+ <include>build*</include>
+ <include>expect*</include>
+ <include>ClusterController*</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <!-- Pregelix API/base modules plus the Hyracks runtime stack (0.2.2-SNAPSHOT)
+ and Hadoop 0.20.2 for the HDFS-facing operators. -->
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.pregelix</groupId>
+ <artifactId>pregelix-api</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.pregelix</groupId>
+ <artifactId>pregelix-dataflow-std-base</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-hadoop</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-data-std</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-btree</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
+ <artifactId>btreehelper</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hadoop-compat</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-ipc</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..0b06dcf
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.pregelix.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+/**
+ * Connector policy assignment for Pregelix jobs: m-to-n partitioning-merging
+ * connectors are materialized on the send side, everything else is pipelined.
+ */
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+    private static final long serialVersionUID = 1L;
+    /** Policy for merging connectors: buffer frames at the sender before shipping. */
+    private final IConnectorPolicy sendSideMaterialized = new SendSideMaterializedPipeliningConnectorPolicy();
+    /** Default policy: fully pipelined frame transfer. */
+    private final IConnectorPolicy pipelined = new PipeliningConnectorPolicy();
+
+    @Override
+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+            int[] fanouts) {
+        return c instanceof MToNPartitioningMergingConnectorDescriptor ? sendSideMaterialized : pipelined;
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/CountTupleOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/CountTupleOperatorDescriptor.java
new file mode 100644
index 0000000..7ad6aa5
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/CountTupleOperatorDescriptor.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+/**
+ * A diagnostic pass-through operator: forwards every frame unchanged while
+ * counting the tuples that flow through, and logs the total when the stream
+ * closes. Intended for debugging dataflow pipelines.
+ */
+public class CountTupleOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(CountTupleOperatorDescriptor.class.getName());
+
+    /**
+     * @param spec the job specification this operator belongs to
+     * @param rDesc record descriptor of both the input and the (identical) output
+     */
+    public CountTupleOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc) {
+        super(spec, 1, 1);
+        this.recordDescriptors[0] = rDesc;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            private final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            private final FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+            private int tupleCount = 0;
+
+            @Override
+            public void close() throws HyracksDataException {
+                // Use the logging framework (not System.out) so the count respects
+                // the configured log level, matching the rest of the package.
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info(this.toString() + " tuple count " + tupleCount);
+                }
+                writer.close();
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                // Count tuples in the frame, then forward it untouched.
+                fta.reset(frame);
+                tupleCount += fta.getTupleCount();
+                writer.nextFrame(frame);
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                writer.open();
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptySinkOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptySinkOperatorDescriptor.java
new file mode 100644
index 0000000..fab7198
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptySinkOperatorDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+/**
+ * A sink that silently discards every frame it receives. Useful for
+ * terminating a dataflow branch whose output is not needed.
+ */
+public class EmptySinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public EmptySinkOperatorDescriptor(JobSpecification spec) {
+        // One input, zero outputs: pure sink.
+        super(spec, 1, 0);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        // Every lifecycle callback is a deliberate no-op: frames are dropped on arrival.
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            @Override
+            public void open() throws HyracksDataException {
+                // nothing to initialize
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                // discard the frame
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                // nothing to release
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                // nothing to roll back
+            }
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptyTupleSourceOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptyTupleSourceOperatorDescriptor.java
new file mode 100644
index 0000000..5276c82
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/EmptyTupleSourceOperatorDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class EmptyTupleSourceOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public EmptyTupleSourceOperatorDescriptor(JobSpecification spec) {
+ super(spec, 0, 1);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private ByteBuffer frame = ctx.allocateFrame();
+ private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
+ private FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ try {
+ writer.open();
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ FrameUtils.flushFrame(frame, writer);
+ writer.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
new file mode 100644
index 0000000..c25e4c6
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+
+/**
+ * A sink operator that writes incoming vertices to HDFS through the
+ * job-configured {@link VertexOutputFormat}, then promotes this task's
+ * attempt output from Hadoop's temporary directory to the final
+ * "part-&lt;partition&gt;" file.
+ */
+public class HDFSFileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+
+    public HDFSFileWriteOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
+            IRecordDescriptorFactory inputRdFactory) {
+        super(spec, 1, 0);
+        this.confFactory = confFactory;
+        this.inputRdFactory = inputRdFactory;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            private RecordDescriptor rd0;
+            private FrameDeserializer frameDeserializer;
+            private Configuration conf = confFactory.createConfiguration();
+            private VertexWriter vertexWriter;
+            private TaskAttemptContext context;
+            // Name suffix of Hadoop's per-job temporary output directory.
+            private final String TEMP_DIR = "_temporary";
+
+            @Override
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor();
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                // Ensure user output-format classes resolve against this operator's loader.
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+                VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
+                // Synthesize a map-task attempt id keyed by the Hyracks partition.
+                TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
+                context = new TaskAttemptContext(conf, tid);
+                try {
+                    vertexWriter = outputFormat.createVertexWriter(context);
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                frameDeserializer.reset(frame);
+                try {
+                    while (!frameDeserializer.done()) {
+                        Object[] tuple = frameDeserializer.deserializeRecord();
+                        // Field 1 carries the vertex; field 0 is its key.
+                        Vertex value = (Vertex) tuple[1];
+                        vertexWriter.writeVertex(value);
+                    }
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    vertexWriter.close(context);
+                    moveFilesToFinalPath();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            /**
+             * Locates this attempt's output under the _temporary directory and
+             * renames its first file to the final part-&lt;partition&gt; path.
+             */
+            private void moveFilesToFinalPath() throws HyracksDataException {
+                try {
+                    JobContext job = new JobContext(conf, new JobID("0", 0));
+                    Path outputPath = FileOutputFormat.getOutputPath(job);
+                    FileSystem dfs = FileSystem.get(conf);
+                    Path filePath = new Path(outputPath, "part-" + Integer.toString(partition));
+                    FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
+                        @Override
+                        public boolean accept(Path dir) {
+                            return dir.getName().endsWith(TEMP_DIR);
+                        }
+                    });
+                    // Fail with a clear message rather than an index-out-of-bounds.
+                    if (tempPaths == null || tempPaths.length == 0) {
+                        throw new HyracksDataException("no " + TEMP_DIR + " directory found under " + outputPath);
+                    }
+                    Path tempDir = tempPaths[0].getPath();
+                    FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
+                        @Override
+                        public boolean accept(Path dir) {
+                            return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
+                        }
+                    });
+                    if (results == null || results.length == 0) {
+                        throw new HyracksDataException("no output directory found for attempt "
+                                + context.getTaskAttemptID() + " under " + tempDir);
+                    }
+                    Path srcDir = results[0].getPath();
+                    if (!dfs.exists(srcDir))
+                        throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
+
+                    FileStatus[] srcFiles = dfs.listStatus(srcDir);
+                    if (srcFiles == null || srcFiles.length == 0) {
+                        throw new HyracksDataException("no output file found in " + srcDir);
+                    }
+                    Path srcFile = srcFiles[0].getPath();
+                    // Replace any stale part file from a previous attempt, then promote.
+                    dfs.delete(filePath, true);
+                    dfs.rename(srcFile, filePath);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
new file mode 100644
index 0000000..b1bb555
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.pregelix.dataflow.state.MaterializerTaskState;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+/**
+ * Replays the run file materialized by a previous iteration's
+ * MaterializingWriteOperatorDescriptor for the same partition. The incoming
+ * frames only serve as a trigger: their content is ignored, and on the first
+ * frame the entire materialized run is streamed to the output writer.
+ */
+public class MaterializingReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            // Scratch frame used to copy the run file to the output.
+            private ByteBuffer frame = ctx.allocateFrame();
+            // Set after the first trigger frame so subsequent frames are no-ops.
+            private boolean complete = false;
+
+            @Override
+            public void open() throws HyracksDataException {
+                // Intentionally empty: the writer is opened lazily in nextFrame,
+                // once the iteration state is known to be available.
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                if (!complete) {
+                    // Fetch the run file stashed by the previous iteration's writer.
+                    MaterializerTaskState state = (MaterializerTaskState) IterationUtils.getIterationState(ctx,
+                            partition);
+                    RunFileReader in = state.getRunFileWriter().createReader();
+                    writer.open();
+                    try {
+                        in.open();
+                        while (in.nextFrame(frame)) {
+                            // flip/clear bracket each frame so the reader refills from position 0.
+                            frame.flip();
+                            writer.nextFrame(frame);
+                            frame.clear();
+                        }
+                        in.close();
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    } finally {
+                        /**
+                         * remove last iteration's state
+                         */
+                        IterationUtils.removeIterationState(ctx, partition);
+                        writer.close();
+                    }
+                    complete = true;
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                // Empty: the output writer is failed inside nextFrame's catch block.
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                // Empty: the writer was already closed in nextFrame's finally block.
+                // NOTE(review): if no trigger frame ever arrives, the writer is never
+                // opened or closed here — presumably upstream always sends at least
+                // one frame; confirm against the job plan.
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
new file mode 100644
index 0000000..efe5f1b
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+import edu.uci.ics.pregelix.dataflow.state.MaterializerTaskState;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+/**
+ * Materializes the incoming frames into a run file and registers it as this
+ * partition's iteration state, so that the next iteration's
+ * MaterializingReadOperatorDescriptor can replay it. Frames are NOT forwarded
+ * downstream; only open/close are propagated to the output writer.
+ */
+public class MaterializingWriteOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final static int MATERIALIZER_ACTIVITY_ID = 0;
+
+    public MaterializingWriteOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
+
+        builder.addActivity(this, ma);
+        builder.addSourceEdge(0, ma, 0);
+        builder.addTargetEdge(0, ma, 0);
+    }
+
+    private final class MaterializerActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public MaterializerActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private MaterializerTaskState state;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+                    INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+                    RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+                    // The run file lives in the NC's managed workspace so its lifetime
+                    // spans iterations.
+                    FileReference file = context.createManagedWorkspaceFile(MaterializingWriteOperatorDescriptor.class
+                            .getSimpleName());
+                    state.setRunFileWriter(new RunFileWriter(file, ctx.getIOManager()));
+                    state.getRunFileWriter().open();
+                    writer.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    // Append to the run file only; nothing is forwarded downstream.
+                    state.getRunFileWriter().nextFrame(buffer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    state.getRunFileWriter().close();
+                    /**
+                     * set iteration state
+                     */
+                    IterationUtils.setIterationState(ctx, partition, state);
+                    writer.close();
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    // Release the half-written run file and propagate the failure;
+                    // a silent no-op here would leave the downstream writer waiting.
+                    if (state != null && state.getRunFileWriter() != null) {
+                        state.getRunFileWriter().close();
+                    }
+                    writer.fail();
+                }
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/NonCombinerConnectorPolicyAssignmentPolicy.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/NonCombinerConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..8023fe5
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/NonCombinerConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.pregelix.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+/**
+ * Connector policy assignment for plans without a combiner: plain m-to-n
+ * partitioning connectors are materialized on the send side, while all other
+ * connectors are pipelined.
+ */
+public class NonCombinerConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+    private static final long serialVersionUID = 1L;
+    /** Policy for partitioning connectors: buffer frames at the sender before shipping. */
+    private final IConnectorPolicy sendSideMaterialized = new SendSideMaterializedPipeliningConnectorPolicy();
+    /** Default policy: fully pipelined frame transfer. */
+    private final IConnectorPolicy pipelined = new PipeliningConnectorPolicy();
+
+    @Override
+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+            int[] fanouts) {
+        return c instanceof MToNPartitioningConnectorDescriptor ? sendSideMaterialized : pipelined;
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
new file mode 100644
index 0000000..22251fd
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
@@ -0,0 +1,64 @@
+package edu.uci.ics.pregelix.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+/**
+ * Sink operator (1 input, 0 outputs) that decides whether the iterative job
+ * should terminate and persists that decision. The flag starts out as
+ * "terminate"; receiving any data frame flips it to "continue". On close()
+ * the final flag is written out keyed by {@code jobId} via
+ * {@link IterationUtils#writeTerminationState}.
+ */
+public class TerminationStateWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(TerminationStateWriterOperatorDescriptor.class.getName());
+
+    private final IConfigurationFactory confFactory;
+    private final String jobId;
+
+    public TerminationStateWriterOperatorDescriptor(JobSpecification spec, IConfigurationFactory confFactory,
+            String jobId) {
+        super(spec, 1, 0);
+        this.confFactory = confFactory;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            private Configuration conf = confFactory.createConfiguration();
+            // True until any frame arrives; incoming data means more work remains.
+            private boolean terminate = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                terminate = true;
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                // Frame contents are irrelevant; its mere existence vetoes termination.
+                terminate = false;
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                // Nothing to clean up; the termination state is only written in close().
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                // Persist the flag first, then log what was decided.
+                IterationUtils.writeTerminationState(conf, jobId, terminate);
+                LOGGER.info("close termination state");
+                if (terminate)
+                    LOGGER.info("write termination to HDFS");
+            }
+
+        };
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
new file mode 100644
index 0000000..4795718
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+
+/**
+ * Source operator that reads this partition's vertex input split via the
+ * user-provided {@link VertexInputFormat} and emits one tuple per vertex:
+ * field 0 is the serialized vertex id, field 1 is the serialized vertex.
+ */
+@SuppressWarnings("rawtypes")
+public class VertexFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final Logger LOGGER = Logger.getLogger(VertexFileScanOperatorDescriptor.class.getName());
+    private static final long serialVersionUID = 1L;
+    private final List<InputSplit> splits;
+    private final IConfigurationFactory confFactory;
+    /** Output tuples carry exactly two fields: vertex id and vertex. */
+    private final int fieldSize = 2;
+
+    /**
+     * @param spec        the enclosing job specification
+     * @param rd          record descriptor of the emitted (id, vertex) tuples
+     * @param splits      one Hadoop input split per partition
+     * @param confFactory factory producing the Hadoop configuration on the node
+     */
+    public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
+            IConfigurationFactory confFactory) throws HyracksException {
+        super(spec, 0, 1);
+        this.splits = splits;
+        this.confFactory = confFactory;
+        this.recordDescriptors[0] = rd;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            private Configuration conf = confFactory.createConfiguration();
+
+            @Override
+            public void initialize() throws HyracksDataException {
+                try {
+                    // Input formats and vertex classes are loaded reflectively;
+                    // make sure they resolve against this operator's class loader.
+                    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                    writer.open();
+                    loadVertices(ctx, partition);
+                    writer.close();
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            /**
+             * Load the vertices of the given partition's split and push them
+             * downstream as (id, vertex) tuples.
+             *
+             * @param ctx         the task context used for frame allocation
+             * @param partitionId index of the split to read
+             * @throws IOException
+             * @throws IllegalAccessException
+             * @throws InstantiationException
+             * @throws ClassNotFoundException
+             * @throws InterruptedException
+             */
+            @SuppressWarnings("unchecked")
+            private void loadVertices(final IHyracksTaskContext ctx, int partitionId) throws IOException,
+                    ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+                ByteBuffer frame = ctx.allocateFrame();
+                FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+                appender.reset(frame, true);
+
+                VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
+                TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
+                // Fix: honor the partitionId parameter instead of silently
+                // capturing the enclosing "partition" variable.
+                InputSplit split = splits.get(partitionId);
+
+                if (split instanceof FileSplit) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
+                            + " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
+                            + partitionId);
+                }
+                VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
+                vertexReader.initialize(split, context);
+                Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
+                ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldSize);
+                DataOutput dos = tb.getDataOutput();
+
+                /**
+                 * set a (mostly stubbed) mapper context so user vertex code can
+                 * reach the Hadoop configuration
+                 */
+                Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
+                        splits.get(partitionId));
+                Vertex.setContext(mapperContext);
+
+                /**
+                 * default value assigned to vertices the reader leaves empty
+                 */
+                Writable emptyVertexValue = (Writable) BspUtils.createVertexValue(conf);
+
+                while (vertexReader.nextVertex()) {
+                    readerVertex = vertexReader.getCurrentVertex();
+                    tb.reset();
+                    if (readerVertex.getVertexId() == null) {
+                        throw new IllegalArgumentException("loadVertices: Vertex reader returned a vertex "
+                                + "without an id! - " + readerVertex);
+                    }
+                    if (readerVertex.getVertexValue() == null) {
+                        readerVertex.setVertexValue(emptyVertexValue);
+                    }
+                    WritableComparable vertexId = readerVertex.getVertexId();
+                    vertexId.write(dos);
+                    tb.addFieldEndOffset();
+
+                    readerVertex.write(dos);
+                    tb.addFieldEndOffset();
+
+                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        // Frame full: flush and retry; a single tuple larger
+                        // than one frame is fatal.
+                        if (appender.getTupleCount() <= 0)
+                            throw new IllegalStateException("zero tuples in a frame!");
+                        FrameUtils.flushFrame(frame, writer);
+                        appender.reset(frame, true);
+                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                }
+
+                vertexReader.close();
+                if (appender.getTupleCount() > 0) {
+                    FrameUtils.flushFrame(frame, writer);
+                }
+                // NOTE(review): explicit GC after a bulk load is a heavy hammer;
+                // kept for behavioral parity, consider removing.
+                System.gc();
+            }
+        };
+    }
+
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
new file mode 100644
index 0000000..b31f376
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.base;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory that materializes a Hadoop {@link Configuration} on the node where
+ * a runtime executes. Serializable so it can be shipped inside a Hyracks job
+ * specification to the node controllers.
+ */
+public interface IConfigurationFactory extends Serializable {
+
+    /**
+     * @return a Hadoop configuration for the local task
+     * @throws HyracksDataException if the configuration cannot be created
+     */
+    public Configuration createConfiguration() throws HyracksDataException;
+
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
new file mode 100644
index 0000000..f18be9e
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.context;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.storage.common.smi.TransientFileMapManager;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+/**
+ * Node-wide runtime state shared by Pregelix jobs on one node controller:
+ * the buffer cache, a tree-index registry, per-(job, partition) iteration
+ * state objects, the current superstep per Giraph job id, and managed
+ * workspace files grouped by the superstep that created them.
+ */
+public class RuntimeContext implements IWorkspaceFileFactory {
+    private static final Logger LOGGER = Logger.getLogger(RuntimeContext.class.getName());
+
+    private IndexRegistry<IIndex> treeIndexRegistry;
+    private IBufferCache bufferCache;
+    private IFileMapManager fileMapManager;
+    // Iteration state handed from one iteration job to the next, keyed by (job, partition).
+    private Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
+    // Current superstep number per Giraph job id.
+    private Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
+    // Whether the next setVertexProperties call should advance to a new superstep.
+    private Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
+    private IOManager ioManager;
+    // Managed workspace files per superstep; deleted once two supersteps behind.
+    private Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+
+    public RuntimeContext(INCApplicationContext appCtx) {
+        fileMapManager = new TransientFileMapManager();
+        ICacheMemoryAllocator allocator = new HeapBufferAllocator();
+        IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
+        int pageSize = 64 * 1024;
+        long memSize = Runtime.getRuntime().maxMemory();
+        // The buffer cache is sized to a quarter of the maximum heap.
+        long bufferSize = memSize / 4;
+        int numPages = (int) (bufferSize / pageSize);
+        bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs, fileMapManager, pageSize,
+                numPages, 1000000);
+        treeIndexRegistry = new IndexRegistry<IIndex>();
+        ioManager = (IOManager) appCtx.getRootContext().getIOManager();
+    }
+
+    /** Release all tracked workspace files, the buffer cache, and cached state. */
+    public void close() {
+        for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
+            for (FileReference fileRef : entry.getValue())
+                fileRef.delete();
+
+        iterationToFiles.clear();
+        bufferCache.close();
+        appStateMap.clear();
+
+        System.gc();
+    }
+
+    public IBufferCache getBufferCache() {
+        return bufferCache;
+    }
+
+    public IFileMapProvider getFileMapManager() {
+        return fileMapManager;
+    }
+
+    public IndexRegistry<IIndex> getTreeIndexRegistry() {
+        return treeIndexRegistry;
+    }
+
+    public Map<StateKey, IStateObject> getAppStateStore() {
+        return appStateMap;
+    }
+
+    /** The RuntimeContext registered as the NC application object for this task. */
+    public static RuntimeContext get(IHyracksTaskContext ctx) {
+        return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+    }
+
+    /**
+     * Advance the job to its next superstep (only the first caller after
+     * endSuperStep advances it) and publish vertex/edge counts to the static
+     * Vertex fields. Workspace files from the superstep before last are
+     * deleted here, since only the previous superstep's output is still used.
+     */
+    public synchronized void setVertexProperties(String giraphJobId, long numVertices, long numEdges) {
+        Boolean toMove = giraphJobIdToMove.get(giraphJobId);
+        if (toMove == null || toMove == true) {
+            if (giraphJobIdToSuperStep.get(giraphJobId) == null) {
+                giraphJobIdToSuperStep.put(giraphJobId, 0L);
+            }
+
+            long superStep = giraphJobIdToSuperStep.get(giraphJobId);
+            List<FileReference> files = iterationToFiles.remove(superStep - 1);
+            if (files != null) {
+                for (FileReference fileRef : files)
+                    fileRef.delete();
+            }
+
+            Vertex.setSuperstep(++superStep);
+            Vertex.setNumVertices(numVertices);
+            Vertex.setNumEdges(numEdges);
+            giraphJobIdToSuperStep.put(giraphJobId, superStep);
+            giraphJobIdToMove.put(giraphJobId, false);
+            LOGGER.info("start iteration " + Vertex.getCurrentSuperstep());
+        }
+        System.gc();
+    }
+
+    /** Mark the superstep finished so the next setVertexProperties call advances it. */
+    public synchronized void endSuperStep(String giraphJobId) {
+        giraphJobIdToMove.put(giraphJobId, true);
+        LOGGER.info("end iteration " + Vertex.getCurrentSuperstep());
+    }
+
+    @Override
+    public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
+        final FileReference fRef = ioManager.createWorkspaceFile(prefix);
+        // Track the file under the current superstep so it can be reclaimed
+        // two supersteps later (or on close()).
+        List<FileReference> files = iterationToFiles.get(Vertex.getCurrentSuperstep());
+        if (files == null) {
+            files = new ArrayList<FileReference>();
+            iterationToFiles.put(Vertex.getCurrentSuperstep(), files);
+        }
+        files.add(fRef);
+        return fRef;
+    }
+
+    @Override
+    public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
+        // Unmanaged files are the caller's responsibility to delete.
+        return ioManager.createWorkspaceFile(prefix);
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java
new file mode 100644
index 0000000..ae58802
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.context;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+
+/**
+ * Key identifying per-partition iteration state: the (Hyracks job, partition)
+ * pair under which a state object is stored in the runtime context.
+ */
+public class StateKey {
+    private final JobId jobId;
+    private final int partition;
+
+    public StateKey(JobId jobId, int partition) {
+        this.jobId = jobId;
+        this.partition = partition;
+    }
+
+    @Override
+    public int hashCode() {
+        // 31 * h + p distributes far better than the previous h * p, which
+        // collapsed every key with partition 0 onto hash 0.
+        return jobId.hashCode() * 31 + partition;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof StateKey))
+            return false;
+        StateKey key = (StateKey) o;
+        return key.jobId.equals(jobId) && key.partition == partition;
+    }
+
+    @Override
+    public String toString() {
+        return jobId.toString() + ":" + partition;
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/state/MaterializerTaskState.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/state/MaterializerTaskState.java
new file mode 100644
index 0000000..058a1bc
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/state/MaterializerTaskState.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.pregelix.dataflow.state;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+
+/**
+ * Task-local state holding the run file into which one iteration's frames are
+ * materialized so a later activity of the same task can re-read them.
+ */
+public class MaterializerTaskState extends AbstractStateObject {
+    private RunFileWriter out;
+
+    public MaterializerTaskState() {
+    }
+
+    public MaterializerTaskState(JobId jobId, TaskId taskId) {
+        super(jobId, taskId);
+    }
+
+    @Override
+    public void toBytes(DataOutput out) throws IOException {
+        // In-memory-only state: serialization is intentionally unsupported.
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void fromBytes(DataInput in) throws IOException {
+        // In-memory-only state: deserialization is intentionally unsupported.
+        throw new UnsupportedOperationException();
+    }
+
+    public RunFileWriter getRunFileWriter() {
+        return out;
+    }
+
+    public void setRunFileWriter(RunFileWriter out) {
+        this.out = out;
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
new file mode 100644
index 0000000..f57b3fa
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+import edu.uci.ics.pregelix.dataflow.context.StateKey;
+
+/**
+ * Static helpers for cross-iteration bookkeeping: per-partition state handoff
+ * between consecutive iteration jobs, superstep control, and the HDFS-backed
+ * termination flag.
+ */
+public class IterationUtils {
+    /** Directory under which per-job termination flags are written. */
+    public static final String TMP_DIR = "/tmp/";
+
+    private IterationUtils() {
+        // utility class; no instances
+    }
+
+    /** The node-wide runtime context attached to the NC application. */
+    private static RuntimeContext getRuntimeContext(IHyracksTaskContext ctx) {
+        INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+        return (RuntimeContext) appContext.getApplicationObject();
+    }
+
+    /** Id of the previous iteration's job (iteration job ids are consecutive). */
+    private static JobId previousJobId(IHyracksTaskContext ctx) {
+        return new JobId(ctx.getJobletContext().getJobId().getId() - 1);
+    }
+
+    /** Store this partition's state under the current job for the next iteration. */
+    public static void setIterationState(IHyracksTaskContext ctx, int partition, IStateObject state) {
+        Map<StateKey, IStateObject> map = getRuntimeContext(ctx).getAppStateStore();
+        map.put(new StateKey(ctx.getJobletContext().getJobId(), partition), state);
+    }
+
+    /** Fetch the state this partition stored during the previous iteration job. */
+    public static IStateObject getIterationState(IHyracksTaskContext ctx, int partition) {
+        Map<StateKey, IStateObject> map = getRuntimeContext(ctx).getAppStateStore();
+        return map.get(new StateKey(previousJobId(ctx), partition));
+    }
+
+    /** Drop the previous iteration's state for this partition. */
+    public static void removeIterationState(IHyracksTaskContext ctx, int partition) {
+        Map<StateKey, IStateObject> map = getRuntimeContext(ctx).getAppStateStore();
+        map.remove(new StateKey(previousJobId(ctx), partition));
+    }
+
+    public static void endSuperStep(String giraphJobId, IHyracksTaskContext ctx) {
+        getRuntimeContext(ctx).endSuperStep(giraphJobId);
+    }
+
+    /** Publish vertex/edge counts from the job configuration (-1 when absent). */
+    public static void setProperties(String giraphJobId, IHyracksTaskContext ctx, Configuration conf) {
+        getRuntimeContext(ctx).setVertexProperties(giraphJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+                conf.getLong(PregelixJob.NUM_EDGES, -1));
+    }
+
+    /**
+     * Persist the termination flag for {@code jobId} to HDFS, overwriting any
+     * previous flag. The stream is closed even if the write fails.
+     */
+    public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
+            throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            Path path = new Path(TMP_DIR + jobId);
+            FSDataOutputStream output = dfs.create(path, true);
+            try {
+                output.writeBoolean(terminate);
+                output.flush();
+            } finally {
+                output.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Read back the termination flag previously written for {@code jobId}. */
+    public static boolean readTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            Path path = new Path(TMP_DIR + jobId);
+            FSDataInputStream input = dfs.open(path);
+            try {
+                return input.readBoolean();
+            } finally {
+                input.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}