merge from zheilbron/hyracks_msr
diff --git a/pregelix/build.sh b/pregelix/build.sh
new file mode 100644
index 0000000..e213181
--- /dev/null
+++ b/pregelix/build.sh
@@ -0,0 +1,12 @@
+rm -rf dist
+mkdir dist
+
+hadoop_versions=(0.20.2 0.23.1 0.23.6 1.0.4 cdh-4.1 cdh-4.2)
+cd ../
+for v in ${hadoop_versions[@]}
+do
+ #echo mvn clean package -DskipTests=true -Dhadoop=${v}
+ mvn clean package -DskipTests=true -Dhadoop=${v}
+ #echo mv pregelix/pregelix-dist/target/pregelix-dist-*-binary-assembly.zip pregelix/dist/pregelix-dist-binary-assembly-hdfs-${v}.zip
+ mv pregelix/pregelix-dist/target/pregelix-dist-*-binary-assembly.zip pregelix/dist/pregelix-dist-binary-assembly-hdfs-${v}.zip
+done
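
Illustration (not part of the patch): the script assumes it is run with bash (it uses bash arrays) from the pregelix/ directory of the source tree, and it leaves one distribution zip per Hadoop version in pregelix/dist/:

    cd pregelix
    bash build.sh
    ls dist/   # pregelix-dist-binary-assembly-hdfs-<version>.zip, one per entry in hadoop_versions
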
diff --git a/pregelix/pom.xml b/pregelix/pom.xml
index c0c3822..de5ef3b 100644
--- a/pregelix/pom.xml
+++ b/pregelix/pom.xml
@@ -17,7 +17,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<packaging>pom</packaging>
<name>pregelix</name>
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 305b50c..0152a15 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</parent>
<properties>
@@ -82,7 +82,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -95,7 +95,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-hdfs-core</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 08c7151..5ea6413 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
/**
* This is the abstract class to implement for aggregating the state of all the vertices globally in the graph.
@@ -39,7 +40,7 @@
*/
@SuppressWarnings("rawtypes")
-public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> {
+public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> {
/**
* initialize aggregator
*/
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
index f5daf99..fa03c0c 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
/**
 * This is the abstract class to implement for combining messages that are sent to the same vertex.
@@ -36,7 +37,7 @@
* the type of the partially combined messages
*/
@SuppressWarnings("rawtypes")
-public abstract class MessageCombiner<I extends WritableComparable, M extends Writable, P extends Writable> {
+public abstract class MessageCombiner<I extends WritableComparable, M extends WritableSizable, P extends Writable> {
/**
* initialize combiner
@@ -82,4 +83,36 @@
* @return the final message List
*/
public abstract MsgList<M> finishFinal();
+
+ /**
+ * Initialize the combiner across all segmented bags for one key.
+ */
+ public void initAll(MsgList providedMsgList) {
+ init(providedMsgList);
+ }
+
+ /**
+ * Finish the final combining pass across all segmented bags for one key.
+ *
+ * @return the final message List
+ */
+ public MsgList<M> finishFinalAll() {
+ return finishFinal();
+ }
+
+ /**
+ * @return the estimated accumulated state size in bytes after combining msg
+ */
+ public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
+ return 0;
+ }
+
+ /**
+ * @return the estimated accumulated state size in bytes after combining partialAggregate
+ */
+ public int estimateAccumulatedStateByteSizeFinal(I vertexIndex, P partialAggregate) throws HyracksDataException {
+ return 0;
+ }
}
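
Illustration (not part of the patch): a minimal custom combiner written against the extended API, reducing all messages for a vertex to a single minimum, assuming the abstract method set mirrors DefaultMessageCombiner below (init/stepPartial/finishPartial/stepFinal/finishFinal). DoubleMsg is a hypothetical WritableSizable message type, sketched after WritableSizable.java further down; the inherited initAll/finishFinalAll defaults simply delegate to init/finishFinal.

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.WritableComparable;

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.pregelix.api.graph.MessageCombiner;
    import edu.uci.ics.pregelix.api.graph.MsgList;

    @SuppressWarnings({ "rawtypes", "unchecked" })
    public class MinCombiner<I extends WritableComparable> extends
            MessageCombiner<I, DoubleMsg, DoubleWritable> {
        private final DoubleWritable partial = new DoubleWritable();
        private final DoubleMsg combined = new DoubleMsg();
        private MsgList msgList;

        @Override
        public void init(MsgList providedMsgList) {
            msgList = providedMsgList;
            msgList.clearElements();
            partial.set(Double.MAX_VALUE);
        }

        @Override
        public void stepPartial(I vertexIndex, DoubleMsg msg) throws HyracksDataException {
            partial.set(Math.min(partial.get(), msg.get()));
        }

        @Override
        public DoubleWritable finishPartial() {
            return partial;
        }

        @Override
        public void stepFinal(I vertexIndex, DoubleWritable partialAggregate) throws HyracksDataException {
            partial.set(Math.min(partial.get(), partialAggregate.get()));
        }

        @Override
        public MsgList<DoubleMsg> finishFinal() {
            combined.set(partial.get());
            msgList.addElement(combined);
            return msgList;
        }

        @Override
        public int estimateAccumulatedStateByteSizePartial(I vertexIndex, DoubleMsg msg) {
            return 8; // the combined state is always a single double, whatever the message count
        }
    }
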
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index 104f396..51b62e4 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -15,8 +15,11 @@
package edu.uci.ics.pregelix.api.graph;
-import org.apache.hadoop.io.Writable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.ArrayListWritable;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -27,9 +30,11 @@
* @param <M>
* message type
*/
-public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+public class MsgList<M extends WritableSizable> extends ArrayListWritable<M> {
/** Defining a layout version for a serializable class. */
private static final long serialVersionUID = 1L;
+ private byte start = 1;
+ private byte end = 2;
/**
 * Default constructor.
@@ -43,4 +48,34 @@
public void setClass() {
setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
}
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeByte(start | end);
+ super.write(output);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ byte startEnd = input.readByte();
+ this.start = (byte) (startEnd & 1);
+ this.end = (byte) (startEnd & 2);
+ super.readFields(input);
+ }
+
+ public final void setSegmentStart(boolean segStart) {
+ this.start = (byte) (segStart ? 1 : 0);
+ }
+
+ public final void setSegmentEnd(boolean segEnd) {
+ this.end = (byte) (segEnd ? 2 : 0);
+ }
+
+ public boolean segmentStart() {
+ return start == 1;
+ }
+
+ public boolean segmentEnd() {
+ return end == 2;
+ }
}
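
Illustration (not part of the patch): the flag byte exchanged by write() and readFields() above. Bit 0 (value 1) marks the start of a segmented message bag and bit 1 (value 2) its end, so a bag split across frames serializes as flag bytes 1, 0, ..., 2:

    public class SegmentFlagDemo {
        public static void main(String[] args) {
            byte start = 1, end = 0;           // first segment of a bag that continues
            byte flags = (byte) (start | end); // serialized ahead of the element count
            System.out.println((flags & 1) == 1); // true  -> segmentStart()
            System.out.println((flags & 2) == 2); // false -> segmentEnd()
            // an unsegmented bag keeps the defaults start = 1, end = 2 (flags == 3)
        }
    }
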
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 4175078..c52130d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -31,7 +32,9 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.JobStateUtils;
import edu.uci.ics.pregelix.api.util.SerDeUtils;
/**
@@ -48,7 +51,7 @@
* Message value type
*/
@SuppressWarnings("rawtypes")
-public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
implements Writable {
private static long superstep = 0;
/** Class-wide number of vertices */
@@ -75,6 +78,8 @@
private boolean hasMessage = false;
/** created new vertex */
private boolean createdNewLiveVertex = false;
+ /** terminate the partition */
+ private boolean terminatePartition = false;
/**
* use object pool for re-using objects
@@ -87,12 +92,23 @@
private int usedValue = 0;
/**
- * The key method that users need to implement
+ * The key method that users need to implement to process
+ * incoming messages in each superstep.
+ * 1. In a superstep, this method can be called multiple times in a row for a
+ * single vertex, each call processing one batch of messages. (This only
+ * happens when the messages for a single vertex exceed one frame.)
+ * 2. In each superstep, open() is called before any invocation of this method
+ * for a vertex, and close() is called after the last invocation for it.
+ * 3. Within a partition, a single vertex Java object is reused for all the
+ * vertices processed in that partition. (The model is the same as the
+ * key-value objects in Hadoop map tasks.)
*
* @param msgIterator
* an iterator of incoming messages
*/
- public abstract void compute(Iterator<M> msgIterator);
+ public abstract void compute(Iterator<M> msgIterator) throws Exception;
/**
* Add an edge for the vertex.
@@ -569,4 +585,62 @@
Vertex.context = context;
}
+ @Override
+ public int hashCode() {
+ return vertexId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ Vertex vertex = (Vertex) object;
+ return vertexId.equals(vertex.getVertexId());
+ }
+
+ /**
+ * Called immediately before the invocations of compute() on a vertex.
+ * Users can override this method to initialize the state of a vertex
+ * before the compute() invocations.
+ */
+ public void open() {
+
+ }
+
+ /**
+ * Called immediately after all the invocations of compute() on a vertex.
+ * Users can override this method to finalize the state of a vertex
+ * after the compute() invocations.
+ */
+ public void close() {
+
+ }
+
+ /**
+ * Terminate the partition in which the current vertex resides.
+ * This takes effect immediately: the remaining vertices in the
+ * same partition will not be processed.
+ */
+ protected final void terminatePartition() {
+ voteToHalt();
+ terminatePartition = true;
+ }
+
+ /**
+ * Terminate the Pregelix job.
+ * This takes effect only after the current iteration completes.
+ *
+ * @throws Exception
+ */
+ protected void terminateJob() throws Exception {
+ Configuration conf = getContext().getConfiguration();
+ JobStateUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+ }
+
+ /**
+ * @return true if the partition is terminated; false otherwise
+ */
+ public boolean isPartitionTerminated() {
+ return terminatePartition;
+ }
+
}
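
Illustration (not part of the patch): a hypothetical vertex showing the documented lifecycle — open() once, then one compute() call per message batch, then close() once, on a reused vertex object. DoubleMsg is the hypothetical WritableSizable message type sketched after WritableSizable.java below:

    import java.util.Iterator;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.LongWritable;

    import edu.uci.ics.pregelix.api.graph.Vertex;

    public class MinSketchVertex extends Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleMsg> {
        private double min;

        @Override
        public void open() {
            min = getVertexValue().get(); // once per vertex per superstep, before compute()
        }

        @Override
        public void compute(Iterator<DoubleMsg> msgIterator) throws Exception {
            // may run several times in one superstep if this vertex's messages span frames
            while (msgIterator.hasNext()) {
                min = Math.min(min, msgIterator.next().get());
            }
        }

        @Override
        public void close() {
            // once per vertex per superstep, after the last compute() call
            if (min < getVertexValue().get()) {
                getVertexValue().set(min);
            }
            voteToHalt();
            // terminatePartition() could be called here instead to skip the
            // remaining vertices of this partition.
        }
    }
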
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java
new file mode 100644
index 0000000..568500b
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io;
+
+/**
+ * @author yingyib
+ */
+public interface Sizable {
+
+ public int sizeInBytes();
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
index c841b1a..73af190 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
@@ -40,7 +40,7 @@
* Message data
*/
@SuppressWarnings("rawtypes")
-public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> {
/**
* Logically split the vertices for a graph processing application.
* Each {@link InputSplit} is then assigned to a worker for processing.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
index e6c62ba..ba8b561 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
@@ -39,7 +39,7 @@
* Message data
*/
@SuppressWarnings("rawtypes")
-public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> {
/**
 * Use the input split and context to set up reading the vertices.
* Guaranteed to be called prior to any other function.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java
new file mode 100644
index 0000000..ee13f76
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * @author yingyib
+ */
+public interface WritableSizable extends Writable, Sizable {
+
+}
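
Illustration (not part of the patch): a minimal message type satisfying the new M bound. DoubleMsg is hypothetical; sizeInBytes() lets the runtime budget frame space without serializing the message first:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import edu.uci.ics.pregelix.api.io.WritableSizable;

    public class DoubleMsg implements WritableSizable {
        private double value;

        public void set(double value) {
            this.value = value;
        }

        public double get() {
            return value;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeDouble(value);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            value = in.readDouble();
        }

        @Override
        public int sizeInBytes() {
            return 8; // fixed-width payload: one double
        }
    }
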
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
index 985bcff..1d3c427 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
@@ -26,13 +26,14 @@
import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
/**
* This VertexInputFormat is meant for testing/debugging. It simply generates
* some vertex data that can be consumed by test applications.
*/
@SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
extends VertexInputFormat<I, V, E, M> {
@Override
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
index 92c8728..376d45d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
@@ -25,6 +25,7 @@
import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
/**
* Used by GeneratedVertexInputFormat to read some generated data
@@ -37,7 +38,7 @@
* Edge value
*/
@SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
implements VertexReader<I, V, E, M> {
/** Records read so far */
protected long recordsRead = 0;
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
index 2254ae4..0faf516 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
@@ -30,6 +30,7 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
/**
* Abstract class that users should subclass to use their own text based vertex
@@ -45,7 +46,7 @@
* Message value
*/
@SuppressWarnings("rawtypes")
-public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
extends VertexInputFormat<I, V, E, M> {
/** Uses the TextInputFormat to do everything */
protected TextInputFormat textInputFormat = new TextInputFormat();
@@ -62,7 +63,7 @@
* @param <E>
* Edge value
*/
- public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
private final RecordReader<LongWritable, Text> lineRecordReader;
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 4cddaf0..dae7818 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -198,7 +198,7 @@
*
 * @param variableSizedUpdateHeavyFlag
*/
- final public void setMutationOrVariableSizedUpdateHeavy(boolean variableSizedUpdateHeavyFlag) {
+ final public void setLSMStorage(boolean variableSizedUpdateHeavyFlag) {
getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
}
}
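
Illustration (not part of the patch): call sites migrate by renaming; the job name below is a placeholder:

    import edu.uci.ics.pregelix.api.job.PregelixJob;

    public class JobSetupSketch {
        public static void main(String[] args) throws Exception {
            PregelixJob job = new PregelixJob("example-job");
            // formerly: job.setMutationOrVariableSizedUpdateHeavy(true)
            job.setLSMStorage(true); // still toggles the UPDATE_INTENSIVE switch underneath
        }
    }
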
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
index 7a9e5d5..1683541 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -141,9 +141,6 @@
used = 0;
this.clear();
int numValues = in.readInt(); // read number of values
- if (numValues > 100) {
- System.out.println("num values: " + numValues);
- }
for (int i = 0; i < numValues; i++) {
M value = allocateValue();
value.readFields(in); // read a value
@@ -153,9 +150,6 @@
public void write(DataOutput out) throws IOException {
int numValues = size();
- if (numValues > 100) {
- System.out.println("write num values: " + numValues);
- }
out.writeInt(numValues); // write number of values
for (int i = 0; i < numValues; i++) {
get(i).write(out);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 03c37dc..ff4ee91 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -28,6 +28,7 @@
import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
/**
@@ -49,7 +50,7 @@
* @return User's vertex input format class
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
Configuration conf) {
return (Class<? extends VertexInputFormat<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_INPUT_FORMAT_CLASS,
null, VertexInputFormat.class);
@@ -63,7 +64,7 @@
* @return Instantiated user vertex input format class
*/
@SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
Configuration conf) {
Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = getVertexInputFormatClass(conf);
VertexInputFormat<I, V, E, M> inputFormat = ReflectionUtils.newInstance(vertexInputFormatClass, conf);
@@ -106,7 +107,7 @@
* @return User's vertex combiner class
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, M extends Writable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
+ public static <I extends WritableComparable, M extends WritableSizable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
Configuration conf) {
return (Class<? extends MessageCombiner<I, M, P>>) conf.getClass(PregelixJob.Message_COMBINER_CLASS,
DefaultMessageCombiner.class, MessageCombiner.class);
@@ -120,7 +121,7 @@
* @return User's vertex combiner class
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
Configuration conf) {
return (Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
GlobalCountAggregator.class, GlobalAggregator.class);
@@ -138,7 +139,7 @@
* @return Instantiated user vertex combiner class
*/
@SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, M extends Writable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
+ public static <I extends WritableComparable, M extends WritableSizable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
Configuration conf) {
Class<? extends MessageCombiner<I, M, P>> vertexCombinerClass = getMessageCombinerClass(conf);
return ReflectionUtils.newInstance(vertexCombinerClass, conf);
@@ -164,7 +165,7 @@
* @return Instantiated user vertex combiner class
*/
@SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
Configuration conf) {
Class<? extends GlobalAggregator<I, V, E, M, P, F>> globalAggregatorClass = getGlobalAggregatorClass(conf);
return ReflectionUtils.newInstance(globalAggregatorClass, conf);
@@ -178,7 +179,7 @@
* @return User's vertex class
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
Configuration conf) {
return (Class<? extends Vertex<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_CLASS, null, Vertex.class);
}
@@ -191,7 +192,7 @@
* @return Instantiated user vertex
*/
@SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Vertex<I, V, E, M> createVertex(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Vertex<I, V, E, M> createVertex(
Configuration conf) {
Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
Vertex<I, V, E, M> vertex = ReflectionUtils.newInstance(vertexClass, conf);
@@ -299,7 +300,7 @@
* @return User's vertex message value class
*/
@SuppressWarnings("unchecked")
- public static <M extends Writable> Class<M> getMessageValueClass(Configuration conf) {
+ public static <M extends WritableSizable> Class<M> getMessageValueClass(Configuration conf) {
if (conf == null)
conf = defaultConf;
return (Class<M>) conf.getClass(PregelixJob.MESSAGE_VALUE_CLASS, Writable.class);
@@ -369,7 +370,7 @@
* Configuration to check
* @return Instantiated user vertex message value
*/
- public static <M extends Writable> M createMessageValue(Configuration conf) {
+ public static <M extends WritableSizable> M createMessageValue(Configuration conf) {
Class<M> messageValueClass = getMessageValueClass(conf);
try {
return messageValueClass.newInstance();
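
Illustration (not part of the patch): the tightened bounds surface at these reflection factories. A sketch of driver-side use, assuming the configuration already names the job's combiner and message classes, and reusing the hypothetical DoubleMsg from above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;

    import edu.uci.ics.pregelix.api.graph.MessageCombiner;
    import edu.uci.ics.pregelix.api.graph.MsgList;
    import edu.uci.ics.pregelix.api.util.BspUtils;

    @SuppressWarnings("rawtypes")
    public class FactorySketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration(); // assumed to be a PregelixJob's configuration
            MessageCombiner<LongWritable, DoubleMsg, MsgList> combiner = BspUtils.createMessageCombiner(conf);
            DoubleMsg msg = BspUtils.createMessageValue(conf); // M must now be WritableSizable
        }
    }
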
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
index d2d90a2..feb9e2f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
@@ -14,42 +14,82 @@
*/
package edu.uci.ics.pregelix.api.util;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
@SuppressWarnings({ "rawtypes", "unchecked" })
-public class DefaultMessageCombiner<I extends WritableComparable, M extends Writable> extends
+public class DefaultMessageCombiner<I extends WritableComparable, M extends WritableSizable> extends
MessageCombiner<I, M, MsgList> {
private MsgList<M> msgList;
+ private int metaSlot = 8;
+ private int accumulatedSize = metaSlot;
@Override
public void init(MsgList providedMsgList) {
+ realInit(providedMsgList);
+ this.msgList.setSegmentStart(false);
+ }
+
+ private void realInit(MsgList providedMsgList) {
this.msgList = providedMsgList;
this.msgList.clearElements();
+ this.accumulatedSize = metaSlot;
}
@Override
public void stepPartial(I vertexIndex, M msg) throws HyracksDataException {
msgList.addElement(msg);
+ accumulatedSize += msg.sizeInBytes();
}
@Override
public void stepFinal(I vertexIndex, MsgList partialAggregate) throws HyracksDataException {
msgList.addAllElements(partialAggregate);
+ for (int i = 0; i < partialAggregate.size(); i++) {
+ accumulatedSize += ((M) partialAggregate.get(i)).sizeInBytes();
+ }
}
@Override
public MsgList finishPartial() {
+ msgList.setSegmentEnd(false);
return msgList;
}
@Override
public MsgList<M> finishFinal() {
+ msgList.setSegmentEnd(false);
return msgList;
}
+ @Override
+ public void initAll(MsgList providedMsgList) {
+ realInit(providedMsgList);
+ msgList.setSegmentStart(true);
+ }
+
+ @Override
+ public MsgList<M> finishFinalAll() {
+ msgList.setSegmentEnd(true);
+ return msgList;
+ }
+
+ @Override
+ public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
+ return accumulatedSize + msg.sizeInBytes();
+ }
+
+ @Override
+ public int estimateAccumulatedStateByteSizeFinal(I vertexIndex, MsgList partialAggregate)
+ throws HyracksDataException {
+ int size = accumulatedSize;
+ for (int i = 0; i < partialAggregate.size(); i++) {
+ size += ((M) partialAggregate.get(i)).sizeInBytes();
+ }
+ return size;
+ }
}
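
Illustration (not part of the patch): a sketch of how a caller could use the new estimates to cut one key's message bag into frame-sized segments. FRAME_SIZE and emit(...) are assumptions, not the actual group-by runtime:

    import org.apache.hadoop.io.WritableComparable;

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.pregelix.api.graph.MessageCombiner;
    import edu.uci.ics.pregelix.api.graph.MsgList;
    import edu.uci.ics.pregelix.api.io.WritableSizable;

    @SuppressWarnings("rawtypes")
    public class SegmentingCallerSketch {
        private static final int FRAME_SIZE = 32768; // assumption; the runtime uses the Hyracks frame size

        /** Append one message, cutting a segment first if the state would overflow a frame. */
        public static <I extends WritableComparable, M extends WritableSizable> void append(
                MessageCombiner<I, M, MsgList> combiner, MsgList reusableBag, I vertexId, M msg)
                throws HyracksDataException {
            if (combiner.estimateAccumulatedStateByteSizePartial(vertexId, msg) > FRAME_SIZE) {
                emit(vertexId, combiner.finishPartial()); // segmentEnd stays false: more segments follow
                combiner.init(reusableBag);               // continuation bag: segmentStart is false
            }
            combiner.stepPartial(vertexId, msg);
        }

        private static void emit(Object vertexId, MsgList segment) {
            // placeholder: write the (vertexId, segment) pair to the output frame
        }
    }
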
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
index ffc6526..9a95f09 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
@@ -21,9 +21,10 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
@SuppressWarnings("rawtypes")
-public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
extends GlobalAggregator<I, V, E, M, LongWritable, LongWritable> {
private LongWritable state = new LongWritable(0);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java
new file mode 100644
index 0000000..4a98167
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * @author yingyib
+ */
+public class JobStateUtils {
+
+ public static final String TMP_DIR = "/tmp/";
+
+ public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ String pathStr = TMP_DIR + jobId + "fterm";
+ Path path = new Path(pathStr);
+ if (!dfs.exists(path)) {
+ FSDataOutputStream output = dfs.create(path, true);
+ output.writeBoolean(true);
+ output.flush();
+ output.close();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ String pathStr = TMP_DIR + jobId + "fterm";
+ Path path = new Path(pathStr);
+ return dfs.exists(path);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
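
Illustration (not part of the patch): the flag's round trip, assuming the default local file system. Vertex.terminateJob() performs the write; the job driver is expected to poll the read side between iterations:

    import org.apache.hadoop.conf.Configuration;

    import edu.uci.ics.pregelix.api.util.JobStateUtils;

    public class ForceTerminationDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration(); // defaults to the local file system
            String jobId = "job_0001"; // hypothetical; BspUtils.getJobId(conf) supplies the real one
            JobStateUtils.writeForceTerminationState(conf, jobId);
            // creates the marker file /tmp/job_0001fterm; the read side just checks existence
            System.out.println(JobStateUtils.readForceTerminationState(conf, jobId)); // true
        }
    }
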
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 2a3efcf..a3f24f4 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</parent>
@@ -209,84 +209,84 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-dataflow-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-dataflow</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-runtime</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-btree</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-cc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -300,7 +300,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks.examples</groupId>
<artifactId>hyracks-integration-tests</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -320,7 +320,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-ipc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index db6c2c8..c144ddd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -32,8 +32,6 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -54,6 +52,8 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
@@ -178,18 +178,18 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
- false);
- PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ false, false);
+ ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
/**
* construct global group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
- true, true);
- PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+ conf, true, true);
+ ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -383,18 +383,18 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
- false);
- PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ false, false);
+ ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
/**
* construct global group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
- true, true);
- PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+ conf, true, true);
+ ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 3af8921..c29ea18 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -32,8 +32,6 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -54,6 +52,8 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
@@ -144,9 +144,9 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
- false);
- PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ false, false);
+ ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -155,9 +155,9 @@
*/
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
- true, true);
- PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+ conf, true, true);
+ ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -198,8 +198,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
- NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -208,8 +208,8 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+ getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -323,9 +323,10 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
- JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
- nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
- null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
+ JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+ getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
+ new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -342,18 +343,18 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
- false);
- PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ false, false);
+ ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
/**
* construct global group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
- true, true);
- PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+ conf, true, true);
+ ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -394,8 +395,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
- NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -404,8 +405,8 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+ getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 50949aa..dc61971 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -31,8 +31,6 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -53,6 +51,8 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
@@ -148,9 +148,9 @@
*/
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
- true, false);
- PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+ conf, true, false);
+ ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -190,8 +190,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
- NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -200,8 +200,8 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+ getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -312,9 +312,10 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
- JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
- nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
- null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
+ JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+ getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
+ new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -331,9 +332,9 @@
/**
* construct global group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
- true, false);
- PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+ conf, true, false);
+ ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -371,8 +372,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
- NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -381,8 +382,8 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+ getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 362e413..34f723f 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -31,8 +31,6 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -53,6 +51,8 @@
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.group.ClusteredGroupOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
@@ -143,9 +143,9 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
- false);
- PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ false, false);
+ ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -161,9 +161,9 @@
*/
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
- true, true);
- PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+ conf, true, true);
+ ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -204,8 +204,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
- NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -214,8 +214,8 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+ getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -326,9 +326,10 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
- JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true,
- nullWriterFactories, inputRdFactory, 5, new ComputeUpdateFunctionFactory(confFactory), preHookFactory,
- null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete);
+ JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+ getIndexDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
+ new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -345,9 +346,9 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
- false);
- PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ false, false);
+ ClusteredGroupOperatorDescriptor localGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -361,9 +362,9 @@
/**
* construct global group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
- true, true);
- PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ IClusteredAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(
+ conf, true, true);
+ ClusteredGroupOperatorDescriptor globalGby = new ClusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -404,8 +405,8 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(), null,
- NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
@@ -414,8 +415,8 @@
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE, getIndexDataflowHelperFactory(),
- null, NoOpOperationCallbackFactory.INSTANCE);
+ comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
+ getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index 0876893..3e01109 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -21,9 +21,9 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.runtime.simpleagg.AccumulatingAggregatorFactory;
@@ -75,11 +75,11 @@
return rdFactory;
}
- public static IAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf, boolean isFinal,
- boolean partialAggAsInput) {
+ public static IClusteredAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf,
+ boolean isFinal, boolean partialAggAsInput) {
IAggregateFunctionFactory aggFuncFactory = new AggregationFunctionFactory(new ConfigurationFactory(conf),
isFinal, partialAggAsInput);
- IAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
+ IClusteredAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
new IAggregateFunctionFactory[] { aggFuncFactory });
return aggregatorFactory;
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 73b053f..e8a6b5c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -14,8 +14,11 @@
*/
package edu.uci.ics.pregelix.core.util;
+import java.io.File;
import java.util.EnumSet;
+import org.apache.commons.io.FileUtils;
+
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -46,6 +49,10 @@
private static IHyracksClientConnection hcc;
public static void init() throws Exception {
+ FileUtils.forceMkdir(new File("dev1"));
+ FileUtils.forceMkdir(new File("dev2"));
+ FileUtils.forceMkdir(new File("dev3"));
+ FileUtils.forceMkdir(new File("dev4"));
CCConfig ccConfig = new CCConfig();
ccConfig.clientNetIpAddress = CC_HOST;
ccConfig.clusterNetIpAddress = CC_HOST;
@@ -80,7 +87,7 @@
ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
- ncConfig2.ioDevices="dev1,dev2";
+ ncConfig2.ioDevices="dev3,dev4";
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
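Reviewer note: init() now pre-creates the four scratch io-device directories and moves NC2 onto dev3/dev4, so the two node controllers no longer share devices; the pregelix-example pom change later in this patch adds <include>dev*</include> so "mvn clean" removes them. A hedged companion sketch for cleaning them up programmatically (hypothetical helper, not part of this patch):

    import java.io.File;

    import org.apache.commons.io.FileUtils;

    public final class TestDeviceCleanup {
        /** Best-effort removal of the scratch devices created by init(). */
        public static void cleanupIODevices() {
            for (String dir : new String[] { "dev1", "dev2", "dev3", "dev4" }) {
                FileUtils.deleteQuietly(new File(dir)); // quietly ignores missing directories
            }
        }
    }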
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index 35a6c91..d4c0ee6 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</parent>
@@ -87,15 +87,15 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
index 97db63f..c544b31 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
@@ -24,4 +24,10 @@
public void step(IFrameTupleReference tuple) throws HyracksDataException;
public void finish() throws HyracksDataException;
+
+ public void initAll() throws HyracksDataException;
+
+ public void finishAll() throws HyracksDataException;
+
+ public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException;
}
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
index 58795d1..d5364da 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
@@ -16,11 +16,12 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
public interface IAggregateFunctionFactory extends Serializable {
- public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx,
- IDataOutputProvider provider) throws HyracksException;
+ public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider,
+ IFrameWriter writer) throws HyracksException;
}
\ No newline at end of file
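Reviewer note: IAggregateFunction now has a two-level lifecycle (initAll/finishAll bracket the whole stream, init/step/finish bracket one group) plus estimateStep for output-size estimation, and the factory passes the downstream IFrameWriter through. A minimal no-op sketch of the expanded contract, assuming the interface's pre-existing init() method (not visible in this hunk) and that estimateStep returns the bytes a step would contribute:

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.api.exceptions.HyracksException;
    import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
    import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;

    public class NoOpAggregateFunctionFactory implements IAggregateFunctionFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider,
                IFrameWriter writer) throws HyracksException {
            return new IAggregateFunction() {
                @Override
                public void initAll() throws HyracksDataException {
                    // once, before the first group of the stream
                }

                @Override
                public void init() throws HyracksDataException {
                    // at each group boundary
                }

                @Override
                public void step(IFrameTupleReference tuple) throws HyracksDataException {
                    // fold one tuple into the running aggregate
                }

                @Override
                public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException {
                    return 0; // assumed: bytes this step would add to the partial result
                }

                @Override
                public void finish() throws HyracksDataException {
                    // emit one group's result via the provider/writer
                }

                @Override
                public void finishAll() throws HyracksDataException {
                    // once, after the last group of the stream
                }
            };
        }
    }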
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 3604e57..9ec8e1d 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</parent>
@@ -88,84 +88,84 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-dataflow-std-base</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-hdfs-core</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-btree</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-btree</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-cc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-ipc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorDescriptor.java
new file mode 100644
index 0000000..bb41953
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorDescriptor.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class ClusteredGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private final int[] groupFields;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final IClusteredAggregatorDescriptorFactory aggregatorFactory;
+
+ private static final long serialVersionUID = 1L;
+
+ public ClusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
+ IBinaryComparatorFactory[] comparatorFactories, IClusteredAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor recordDescriptor) {
+ super(spec, 1, 1);
+ this.groupFields = groupFields;
+ this.comparatorFactories = comparatorFactories;
+ this.aggregatorFactory = aggregatorFactory;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ return new ClusteredGroupOperatorNodePushable(ctx, groupFields, comparatorFactories, aggregatorFactory,
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), recordDescriptors[0]);
+ }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorNodePushable.java
new file mode 100644
index 0000000..a95a46e
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupOperatorNodePushable.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+class ClusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final int[] groupFields;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final IClusteredAggregatorDescriptorFactory aggregatorFactory;
+ private final RecordDescriptor inRecordDescriptor;
+ private final RecordDescriptor outRecordDescriptor;
+ private ClusteredGroupWriter pgw;
+
+ ClusteredGroupOperatorNodePushable(IHyracksTaskContext ctx, int[] groupFields,
+ IBinaryComparatorFactory[] comparatorFactories, IClusteredAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor) {
+ this.ctx = ctx;
+ this.groupFields = groupFields;
+ this.comparatorFactories = comparatorFactories;
+ this.aggregatorFactory = aggregatorFactory;
+ this.inRecordDescriptor = inRecordDescriptor;
+ this.outRecordDescriptor = outRecordDescriptor;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ pgw = new ClusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDescriptor,
+ outRecordDescriptor, writer);
+ pgw.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ pgw.nextFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ pgw.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ pgw.close();
+ }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupWriter.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupWriter.java
new file mode 100644
index 0000000..4b4a1c3
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/ClusteredGroupWriter.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+public class ClusteredGroupWriter implements IFrameWriter {
+ private final int[] groupFields;
+ private final IBinaryComparator[] comparators;
+ private final IAggregatorDescriptor aggregator;
+ private final AggregateState aggregateState;
+ private final IFrameWriter writer;
+ private final ByteBuffer copyFrame;
+ private final FrameTupleAccessor inFrameAccessor;
+ private final FrameTupleAccessor copyFrameAccessor;
+
+ private final ByteBuffer outFrame;
+ private final FrameTupleAppender appender;
+ private final ArrayTupleBuilder tupleBuilder;
+
+ private boolean first;
+
+ public ClusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
+ IClusteredAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+ RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
+ this.groupFields = groupFields;
+ this.comparators = comparators;
+ this.writer = writer;
+ copyFrame = ctx.allocateFrame();
+ inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+ copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+ copyFrameAccessor.reset(copyFrame);
+
+ outFrame = ctx.allocateFrame();
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(outFrame, true);
+
+ tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
+ this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields,
+ groupFields, writer, outFrame, appender);
+ this.aggregateState = aggregator.createAggregateStates();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ first = true;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inFrameAccessor.reset(buffer);
+ int nTuples = inFrameAccessor.getTupleCount();
+ for (int i = 0; i < nTuples; ++i) {
+ if (first) {
+
+ tupleBuilder.reset();
+ for (int j = 0; j < groupFields.length; j++) {
+ tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
+ }
+ aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
+
+ first = false;
+
+ } else {
+ if (i == 0) {
+ switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+ } else {
+ switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+ }
+
+ }
+ }
+ FrameUtils.copy(buffer, copyFrame);
+ }
+
+ private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+ FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+ if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+ writeOutput(prevTupleAccessor, prevTupleIndex);
+
+ tupleBuilder.reset();
+ for (int j = 0; j < groupFields.length; j++) {
+ tupleBuilder.addField(currTupleAccessor, currTupleIndex, groupFields[j]);
+ }
+ aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex, aggregateState);
+ } else {
+ aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
+ }
+ }
+
+ private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
+ throws HyracksDataException {
+ tupleBuilder.reset();
+ for (int j = 0; j < groupFields.length; j++) {
+ tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
+ }
+ aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
+ if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new HyracksDataException("The output of size " + tupleBuilder.getSize()
+ + " cannot be fit into a frame of size " + outFrame.array().length);
+ }
+ }
+
+ }
+
+ private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+ for (int i = 0; i < comparators.length; ++i) {
+ int fIdx = groupFields[i];
+ int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+ int l1 = a1.getFieldLength(t1Idx, fIdx);
+ int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+ int l2 = a2.getFieldLength(t2Idx, fIdx);
+ if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (!first) {
+ writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ }
+ aggregateState.close();
+ writer.close();
+ }
+}
\ No newline at end of file
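Reviewer note: ClusteredGroupWriter assumes its input arrives already clustered (sorted) on groupFields: it opens a group on the first tuple, keeps aggregating while consecutive tuples compare equal on those fields, and emits the finished group as soon as sameGroup(...) fails; copyFrame carries the previous frame's last tuple so boundaries that straddle frames are handled, and close() flushes the final group. A self-contained sketch of that walk over plain ints (illustrative only; the real code compares serialized fields with IBinaryComparators):

    public final class ClusteredGroupByDemo {
        public static void main(String[] args) {
            int[] sortedKeys = { 1, 1, 2, 2, 2, 5 };
            int currentKey = sortedKeys[0]; // aggregator.init on the first tuple
            long count = 1;
            for (int i = 1; i < sortedKeys.length; i++) {
                if (sortedKeys[i] == currentKey) {
                    count++; // sameGroup: aggregator.aggregate
                } else {
                    System.out.println(currentKey + " -> " + count); // writeOutput at the boundary
                    currentKey = sortedKeys[i]; // aggregator.init for the new group
                    count = 1;
                }
            }
            System.out.println(currentKey + " -> " + count); // last group flushed in close()
        }
    }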
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/IClusteredAggregatorDescriptorFactory.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/IClusteredAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..3256f08
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/group/IClusteredAggregatorDescriptorFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.group;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+/**
+ * A factory for the aggregator descriptors used by the clustered (pre-sorted) group-by
+ * operator. Unlike the generic Hyracks factory, createAggregator also receives the
+ * downstream result writer, the output frame, and its appender.
+ */
+public interface IClusteredAggregatorDescriptorFactory extends Serializable {
+
+ IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
+ IFrameWriter resultWriter, ByteBuffer outputFrame, FrameTupleAppender appender) throws HyracksDataException;
+
+}
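Reviewer note: compared with the stock Hyracks IAggregatorDescriptorFactory, createAggregator additionally receives the downstream result writer, the output frame, and its appender, presumably so an aggregator (e.g. the revised AccumulatingAggregatorFactory) can flush oversized results downstream itself. A hedged adapter sketch that ignores those hooks and delegates to a plain Hyracks factory (hypothetical class; it also assumes the stock 0.2.x factory signature):

    import java.nio.ByteBuffer;

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
    import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
    import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

    public class DelegatingClusteredAggregatorFactory implements IClusteredAggregatorDescriptorFactory {
        private static final long serialVersionUID = 1L;
        private final IAggregatorDescriptorFactory delegate;

        public DelegatingClusteredAggregatorFactory(IAggregatorDescriptorFactory delegate) {
            this.delegate = delegate;
        }

        @Override
        public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
                RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
                IFrameWriter resultWriter, ByteBuffer outputFrame, FrameTupleAppender appender)
                throws HyracksDataException {
            // drop the writer-side hooks and fall back to the plain Hyracks contract
            return delegate.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields,
                    keyFieldsInPartialResults);
        }
    }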
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 5156dbf..7221cb5 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -157,16 +157,34 @@
ITupleReference tupleRef = cursor.getTuple();
/**
+ * merge with updated tuple
+ */
+ ITupleReference indexEntryTuple = tupleRef;
+ ITupleReference cachedUpdatedLastTuple = updateBuffer.getLastTuple();
+ if (cachedUpdatedLastTuple != null) {
+ if (compare(cachedUpdatedLastTuple, tupleRef) == 0) {
+ indexEntryTuple = cachedUpdatedLastTuple;
+ }
+ }
+
+ /**
* call the update function
*/
- functionProxy.functionCall(leftAccessor, tIndex, tupleRef, cloneUpdateTb);
+ functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
- //doing copy update
- CopyUpdateUtil.copyUpdate(tempTupleReference, tupleRef, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
- rangePred);
+ /**
+ * doing copy update
+ */
+ CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
+ cursor, rangePred);
}
}
+ /** compare tuples */
+ private int compare(ITupleReference left, ITupleReference right) throws Exception {
+ return lowKeySearchCmp.compare(left, right);
+ }
+
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
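Reviewer note: the merge added above exists because UpdateBuffer may still hold a newer, not-yet-flushed image of the very key the cursor just returned; without checking getLastTuple(), the update function would be invoked on the stale index entry. A self-contained restatement of the pattern over strings (illustrative only; the real code compares binary tuples with lowKeySearchCmp):

    public final class MergeWithBufferedUpdateDemo {
        /** Prefer a buffered, newer image of the same key over the index entry. */
        static String merge(String indexEntry, String bufferedLastTuple) {
            if (bufferedLastTuple != null && key(bufferedLastTuple).equals(key(indexEntry))) {
                return bufferedLastTuple; // the buffer holds an unflushed update for this key
            }
            return indexEntry; // different key (or empty buffer): use the index entry as-is
        }

        static String key(String tuple) {
            return tuple.split(":")[0];
        }

        public static void main(String[] args) {
            System.out.println(merge("v1:old", "v1:new")); // v1:new  (buffered update wins)
            System.out.println(merge("v2:old", "v1:new")); // v2:old  (keys differ)
        }
    }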
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index 4ca7533..b21cd2a 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -195,8 +195,10 @@
// TODO: currently use low key only, check what they mean
int cmp = compare(lowKey, currentTopTuple);
if (cmp <= 0) {
- if (cmp == 0)
+ if (cmp == 0) {
outputMatch(i);
+ currentTopTuple = cursor.getTuple();
+ }
i++;
} else {
moveTreeCursor();
@@ -262,16 +264,28 @@
}
//for the join match cases
- private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
+ private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference indexTuple)
throws Exception {
/**
+ * merge with the cached tuple, if any
+ */
+ ITupleReference indexEntryTuple = indexTuple;
+ ITupleReference cachedUpdatedLastTuple = updateBuffer.getLastTuple();
+ if (cachedUpdatedLastTuple != null) {
+ if (compare(cachedUpdatedLastTuple, indexTuple) == 0) {
+ indexEntryTuple = cachedUpdatedLastTuple;
+ }
+ }
+ /**
* function call
*/
- functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+ functionProxy.functionCall(leftAccessor, tIndex, indexEntryTuple, cloneUpdateTb);
- //doing clone update
- CopyUpdateUtil.copyUpdate(tempTupleReference, frameTuple, updateBuffer, cloneUpdateTb, indexAccessor, cursor,
- rangePred);
+ /**
+ * doing clone update
+ */
+ CopyUpdateUtil.copyUpdate(tempTupleReference, indexEntryTuple, updateBuffer, cloneUpdateTb, indexAccessor,
+ cursor, rangePred);
}
/** write result for outer case */
@@ -290,4 +304,4 @@
public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
writers[index] = writer;
}
-}
\ No newline at end of file
+}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
index b2be366..ea1e02e 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -25,6 +25,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
@@ -41,6 +42,7 @@
private final FrameTupleAppender appender;
private final IHyracksTaskContext ctx;
private final FrameTupleReference tuple = new FrameTupleReference();
+ private final FrameTupleReference lastTuple = new FrameTupleReference();
private final int frameSize;
private IFrameTupleAccessor fta;
@@ -104,6 +106,21 @@
appender.reset(buffer, true);
}
+ /**
+ * return the last tuple appended to the buffer, or null if the buffer is empty
+ *
+ * @throws HyracksDataException
+ */
+ public ITupleReference getLastTuple() throws HyracksDataException {
+ fta.reset(buffers.get(currentInUse));
+ int tupleIndex = fta.getTupleCount() - 1;
+ if (tupleIndex < 0) {
+ return null;
+ }
+ lastTuple.reset(fta, tupleIndex);
+ return lastTuple;
+ }
+
private void allocate(int index) throws HyracksDataException {
if (index >= buffers.size()) {
buffers.add(ctx.allocateFrame());
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 2828451..1df75ae 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -7,8 +7,7 @@
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the
License for the specific language governing permissions and ! limitations
under the License. ! -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow</artifactId>
<packaging>jar</packaging>
@@ -17,7 +16,7 @@
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</parent>
@@ -84,75 +83,75 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-dataflow-std-base</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-btree</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-cc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-ipc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 24a0a9e..e25a46a 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -79,7 +79,7 @@
bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000,
threadFactory);
- int numPagesInMemComponents = numPages / 4;
+ int numPagesInMemComponents = numPages / 8;
vBufferCache = new MultitenantVirtualBufferCache(new VirtualBufferCache(new HeapBufferAllocator(), pageSize,
numPagesInMemComponents));
ioManager = (IOManager) appCtx.getRootContext().getIOManager();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 1d7c979..75f8ed8 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.JobStateUtils;
import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
import edu.uci.ics.pregelix.dataflow.context.StateKey;
@@ -91,22 +92,6 @@
}
}
- public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
- try {
- FileSystem dfs = FileSystem.get(conf);
- String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
- Path path = new Path(pathStr);
- if (!dfs.exists(path)) {
- FSDataOutputStream output = dfs.create(path, true);
- output.writeBoolean(true);
- output.flush();
- output.close();
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
public static void writeGlobalAggregateValue(Configuration conf, String jobId, Writable agg)
throws HyracksDataException {
try {
@@ -136,19 +121,12 @@
}
}
+ public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+ JobStateUtils.writeForceTerminationState(conf, jobId);
+ }
+
public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
- try {
- FileSystem dfs = FileSystem.get(conf);
- String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
- Path path = new Path(pathStr);
- if (dfs.exists(path)) {
- return true;
- } else {
- return false;
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ return JobStateUtils.readForceTerminationState(conf, jobId);
}
public static Writable readGlobalAggregateValue(Configuration conf, String jobId) throws HyracksDataException {
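Reviewer note: the force-termination read/write bodies deleted above now delegate to the new pregelix-api JobStateUtils, whose source is not part of this diff; a hedged reconstruction from the deleted code (the TMP_DIR value is an assumption mirroring IterationUtils):

    package edu.uci.ics.pregelix.api.util;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

    public class JobStateUtils {
        public static final String TMP_DIR = "/tmp/"; // assumption: mirrors IterationUtils.TMP_DIR

        public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
            try {
                FileSystem dfs = FileSystem.get(conf);
                Path path = new Path(TMP_DIR + jobId + "fterm");
                if (!dfs.exists(path)) {
                    FSDataOutputStream output = dfs.create(path, true);
                    output.writeBoolean(true);
                    output.flush();
                    output.close();
                }
            } catch (IOException e) {
                throw new HyracksDataException(e);
            }
        }

        public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
            try {
                FileSystem dfs = FileSystem.get(conf);
                return dfs.exists(new Path(TMP_DIR + jobId + "fterm"));
            } catch (IOException e) {
                throw new HyracksDataException(e);
            }
        }
    }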
diff --git a/pregelix/pregelix-dist/pom.xml b/pregelix/pregelix-dist/pom.xml
index a868ff2..f0551a6 100644
--- a/pregelix/pregelix-dist/pom.xml
+++ b/pregelix/pregelix-dist/pom.xml
@@ -1,24 +1,19 @@
<?xml version="1.0"?>
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
+<!-- ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License"); ! you may
+ not use this file except in compliance with the License. ! you may obtain
+ a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0
+ ! ! Unless required by applicable law or agreed to in writing, software !
+ distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the
+ License for the specific language governing permissions and ! limitations
+ under the License. ! -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</parent>
<artifactId>pregelix-dist</artifactId>
<name>pregelix-dist</name>
@@ -38,35 +33,35 @@
</configuration>
</plugin>
<plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.2-beta-5</version>
- <executions>
- <execution>
- <configuration>
- <descriptors>
- <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
- </descriptors>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>attached</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-core</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-example</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-dist/src/main/assembly/binary-assembly.xml b/pregelix/pregelix-dist/src/main/assembly/binary-assembly.xml
index ab46338..a0fc2ab 100644
--- a/pregelix/pregelix-dist/src/main/assembly/binary-assembly.xml
+++ b/pregelix/pregelix-dist/src/main/assembly/binary-assembly.xml
@@ -1,17 +1,12 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
+<!-- ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License"); ! you may
+ not use this file except in compliance with the License. ! you may obtain
+ a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0
+ ! ! Unless required by applicable law or agreed to in writing, software !
+ distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the
+ License for the specific language governing permissions and ! limitations
+ under the License. ! -->
<assembly>
<id>binary-assembly</id>
<formats>
@@ -31,25 +26,25 @@
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
- <directory>../pregelix-core/target/appassembler/lib</directory>
- <outputDirectory>lib</outputDirectory>
- <includes>
- <include>*.jar</include>
- </includes>
- <fileMode>0755</fileMode>
- </fileSet>
- <fileSet>
- <directory>../pregelix-example/target</directory>
- <outputDirectory>examples</outputDirectory>
- <includes>
- <include>*with-dependencies.jar</include>
- </includes>
+ <directory>../pregelix-core/target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ <includes>
+ <include>*.jar</include>
+ </includes>
<fileMode>0755</fileMode>
- </fileSet>
+ </fileSet>
<fileSet>
- <directory>../pregelix-example/data</directory>
- <outputDirectory>data</outputDirectory>
- <fileMode>0755</fileMode>
- </fileSet>
+ <directory>../pregelix-example/target</directory>
+ <outputDirectory>examples</outputDirectory>
+ <includes>
+ <include>*with-dependencies.jar</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>../pregelix-example/data</directory>
+ <outputDirectory>data</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
</fileSets>
</assembly>
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/copylog.sh b/pregelix/pregelix-dist/src/main/resources/scripts/copylog.sh
new file mode 100644
index 0000000..7767b2d
--- /dev/null
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/copylog.sh
@@ -0,0 +1,7 @@
+. conf/cluster.properties
+
+NODEID=`hostname | cut -d '.' -f 1`
+#echo $NODEID
+
+#echo "rsync ${NCLOGS_DIR}/${NODEID}.log ${1}:${2}"
+rsync ${NCLOGS_DIR}/${NODEID}.log ${1}:${2}
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/dumpAll.sh b/pregelix/pregelix-dist/src/main/resources/scripts/dumpAll.sh
new file mode 100644
index 0000000..e7d45e8
--- /dev/null
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/dumpAll.sh
@@ -0,0 +1,12 @@
+. conf/cluster.properties
+PREGELIX_PATH=`pwd`
+LOG_PATH=$PREGELIX_PATH/logs/
+rm -rf $LOG_PATH
+mkdir $LOG_PATH
+ccname=`hostname`
+
+for i in `cat conf/slaves`
+do
+ ssh $i "cd ${PREGELIX_PATH}; bin/dumptrace.sh; bin/copylog.sh ${ccname} ${LOG_PATH}"
+done
+
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/dumptrace.sh b/pregelix/pregelix-dist/src/main/resources/scripts/dumptrace.sh
new file mode 100644
index 0000000..9fe55f0
--- /dev/null
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/dumptrace.sh
@@ -0,0 +1,15 @@
+echo `hostname`
+#Kill process
+PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=pregelixnc'|awk '{print $2}'`
+
+if [ "$PID" == "" ]; then
+ PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
+ USERID=`id | sed 's/^uid=//;s/(.*$//'`
+ PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=pregelixnc'|awk '{print $2}'`
+fi
+
+echo $PID
+kill -QUIT $PID
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index c2538b1..1066e3b 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</parent>
<build>
@@ -107,6 +107,7 @@
<include>expect*</include>
<include>ClusterController*</include>
<include>edu.uci.*</include>
+ <include>dev*</include>
</includes>
</fileset>
</filesets>
@@ -119,7 +120,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-core</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java
new file mode 100644
index 0000000..e369d29
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates early termination of vertex partitions via terminatePartition().
+ */
+public class EarlyTerminationVertex extends Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+ private VLongWritable tempValue = new VLongWritable();
+
+ @Override
+ public void compute(Iterator<VLongWritable> msgIterator) {
+ if (getSuperstep() == 1) {
+ if (getVertexId().get() % 4 == 2) {
+ terminatePartition();
+ } else {
+ tempValue.set(1);
+ setVertexValue(tempValue);
+ }
+ }
+ if (getSuperstep() == 2) {
+ if (getVertexId().get() % 4 == 3) {
+ terminatePartition();
+ } else {
+ tempValue.set(2);
+ setVertexValue(tempValue);
+ voteToHalt();
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getVertexId() + " " + getVertexValue();
+ }
+
+ /**
+ * Simple VertexWriter that supports writing the vertex id and value as text.
+ */
+ public static class SimpleEarlyTerminattionVertexWriter extends
+ TextVertexWriter<VLongWritable, VLongWritable, VLongWritable> {
+ public SimpleEarlyTerminattionVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<VLongWritable, VLongWritable, VLongWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+ new Text(vertex.getVertexValue().toString()));
+ }
+ }
+
+ public static class SimpleEarlyTerminattionVertexOutputFormat extends
+ TextVertexOutputFormat<VLongWritable, VLongWritable, VLongWritable> {
+
+ @Override
+ public VertexWriter<VLongWritable, VLongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+ return new SimpleEarlyTerminattionVertexWriter(recordWriter);
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(EarlyTerminationVertex.class.getSimpleName());
+ job.setVertexClass(EarlyTerminationVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleEarlyTerminattionVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setDynamicVertexValueSize(true);
+ Client.run(args, job);
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
index 7cf8408..7fae776 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
@@ -18,7 +18,6 @@
import java.io.IOException;
import java.util.Iterator;
-import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -32,6 +31,7 @@
import edu.uci.ics.pregelix.example.client.Client;
import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
import edu.uci.ics.pregelix.example.io.VLongWritable;
/**
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java
new file mode 100644
index 0000000..6c3c752
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.LongWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Tests message overflow handling with a fixed-size message type (LongWritable).
+ */
+public class MessageOverflowFixedsizeVertex extends Vertex<VLongWritable, LongWritable, VLongWritable, LongWritable> {
+
+ private LongWritable outputMsg = new LongWritable(1);
+ private Random rand = new Random(System.currentTimeMillis());
+ private LongWritable tmpVertexValue = new LongWritable(0);
+ private int numOfMsgClones = 10000;
+
+ @Override
+ public void compute(Iterator<LongWritable> msgIterator) {
+ if (getSuperstep() == 1) {
+ for (int i = 0; i < numOfMsgClones; i++) {
+ outputMsg.set(Math.abs(rand.nextLong()));
+ sendMsgToAllEdges(outputMsg);
+ }
+ tmpVertexValue.set(0);
+ setVertexValue(tmpVertexValue);
+ }
+ if (getSuperstep() == 2) {
+ long numOfMsg = getVertexValue().get();
+ while (msgIterator.hasNext()) {
+ msgIterator.next();
+ numOfMsg++;
+ }
+ tmpVertexValue.set(numOfMsg);
+ setVertexValue(tmpVertexValue);
+ voteToHalt();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getVertexId() + " " + getVertexValue();
+ }
+
+ /**
+ * Simple VertexWriter that writes each vertex id and value as a line of text.
+ */
+ public static class SimpleMessageOverflowVertexWriter extends
+ TextVertexWriter<VLongWritable, LongWritable, VLongWritable> {
+ public SimpleMessageOverflowVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<VLongWritable, LongWritable, VLongWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+ new Text(vertex.getVertexValue().toString()));
+ }
+ }
+
+ public static class SimpleMessageOverflowVertexOutputFormat extends
+ TextVertexOutputFormat<VLongWritable, LongWritable, VLongWritable> {
+
+ @Override
+ public VertexWriter<VLongWritable, LongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+ return new SimpleMessageOverflowVertexWriter(recordWriter);
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(MessageOverflowFixedsizeVertex.class.getSimpleName());
+ job.setVertexClass(MessageOverflowFixedsizeVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setDynamicVertexValueSize(true);
+ Client.run(args, job);
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
new file mode 100644
index 0000000..d0221bf
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Tests message overflow handling with a variable-size message type (VLongWritable).
+ */
+public class MessageOverflowVertex extends Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+
+ private VLongWritable outputMsg = new VLongWritable(1);
+ private Random rand = new Random(System.currentTimeMillis());
+ private VLongWritable tmpVertexValue = new VLongWritable(0);
+ private int numOfMsgClones = 10000;
+ private int numIncomingMsgs = 0;
+
+ @Override
+ public void open() {
+ if (getSuperstep() == 2) {
+ numIncomingMsgs = 0;
+ }
+ }
+
+ @Override
+ public void compute(Iterator<VLongWritable> msgIterator) {
+ if (getSuperstep() == 1) {
+ for (int i = 0; i < numOfMsgClones; i++) {
+ outputMsg.set(Math.abs(rand.nextLong()));
+ sendMsgToAllEdges(outputMsg);
+ }
+ tmpVertexValue.set(0);
+ setVertexValue(tmpVertexValue);
+ }
+ if (getSuperstep() == 2) {
+ while (msgIterator.hasNext()) {
+ msgIterator.next();
+ numIncomingMsgs++;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (getSuperstep() == 2) {
+ tmpVertexValue.set(numIncomingMsgs);
+ setVertexValue(tmpVertexValue);
+ voteToHalt();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getVertexId() + " " + getVertexValue();
+ }
+
+ /**
+ * Simple VertexWriter that writes each vertex id and value as a line of text.
+ */
+ public static class SimpleMessageOverflowVertexWriter extends
+ TextVertexWriter<VLongWritable, VLongWritable, VLongWritable> {
+ public SimpleMessageOverflowVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<VLongWritable, VLongWritable, VLongWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+ new Text(vertex.getVertexValue().toString()));
+ }
+ }
+
+ public static class SimpleMessageOverflowVertexOutputFormat extends
+ TextVertexOutputFormat<VLongWritable, VLongWritable, VLongWritable> {
+
+ @Override
+ public VertexWriter<VLongWritable, VLongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+ return new SimpleMessageOverflowVertexWriter(recordWriter);
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(MessageOverflowVertex.class.getSimpleName());
+ job.setVertexClass(MessageOverflowVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setDynamicVertexValueSize(true);
+ Client.run(args, job);
+ }
+
+}
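
Note: both MessageOverflow examples encode the same invariant, which the expected part-* files below pin down: after superstep 2, each vertex value equals numOfMsgClones times the vertex's in-degree, since every in-neighbor sends 10000 clones in superstep 1. A minimal sketch with hypothetical in-degrees (the real degrees come from the webmap test graph):

public class MessageOverflowExpectation {
    public static void main(String[] args) {
        int numOfMsgClones = 10000;
        int[] inDegree = { 1, 10, 9, 8 }; // hypothetical in-degrees for vertices 0..3
        for (int v = 0; v < inDegree.length; v++) {
            // value(v) after superstep 2 == clones * in-degree, independent of payloads
            System.out.println(v + " " + (long) numOfMsgClones * inDegree[v]);
        }
    }
}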
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 8664667..a866c1c 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -21,7 +21,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -45,6 +44,7 @@
import edu.uci.ics.pregelix.example.client.Client;
import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
import edu.uci.ics.pregelix.example.io.VLongWritable;
/**
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index 6a42636..1bb33b8 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -22,7 +22,6 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -42,6 +41,7 @@
import edu.uci.ics.pregelix.example.client.Client;
import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
+import edu.uci.ics.pregelix.example.io.ByteWritable;
import edu.uci.ics.pregelix.example.io.VLongWritable;
/**
@@ -116,7 +116,7 @@
}
@Override
- public void compute(Iterator<ByteWritable> msgIterator) {
+ public void compute(Iterator<ByteWritable> msgIterator) throws Exception {
if (sourceId < 0) {
sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
}
@@ -171,13 +171,20 @@
return getVertexId() + " " + getVertexValue();
}
- private void signalTerminate() {
- Configuration conf = getContext().getConfiguration();
- try {
- IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
- writeReachibilityResult(conf, true);
- } catch (Exception e) {
- throw new IllegalStateException(e);
+ private void signalTerminate() throws Exception {
+ writeReachibilityResult(getContext().getConfiguration(), true);
+ terminateJob();
+ }
+
+ private void writeReachibilityResult(Configuration conf, boolean terminate) throws IOException {
+ FileSystem dfs = FileSystem.get(conf);
+ String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
+ Path path = new Path(pathStr);
+ if (!dfs.exists(path)) {
+ FSDataOutputStream output = dfs.create(path, true);
+ output.writeBoolean(terminate);
+ output.flush();
+ output.close();
}
}
@@ -187,22 +194,6 @@
}
}
- private void writeReachibilityResult(Configuration conf, boolean terminate) {
- try {
- FileSystem dfs = FileSystem.get(conf);
- String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
- Path path = new Path(pathStr);
- if (!dfs.exists(path)) {
- FSDataOutputStream output = dfs.create(path, true);
- output.writeBoolean(terminate);
- output.flush();
- output.close();
- }
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
-
private static boolean readReachibilityResult(Configuration conf) {
try {
FileSystem dfs = FileSystem.get(conf);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index 41c26b1..648f168 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -19,7 +19,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -32,6 +31,7 @@
import edu.uci.ics.pregelix.example.client.Client;
import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
import edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
import edu.uci.ics.pregelix.example.io.VLongWritable;
/**
@@ -127,7 +127,7 @@
}
voteToHalt();
}
-
+
@Override
public String toString() {
return getVertexId() + " " + getVertexValue();
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index 6cb1f4a..f60387a 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -82,7 +82,7 @@
String[] inputs = options.inputPaths.split(";");
FileInputFormat.setInputPaths(job, inputs[0]);
for (int i = 1; i < inputs.length; i++)
- FileInputFormat.addInputPaths(job, inputs[0]);
+ FileInputFormat.addInputPaths(job, inputs[i]);
FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
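
Note: the one-line fix above replaces addInputPaths(job, inputs[0]) with addInputPaths(job, inputs[i]); previously every extra ';'-separated path just re-registered the first path. A quick sketch of the intended splitting (paths are hypothetical):

public class InputPathSplitDemo {
    public static void main(String[] args) {
        String[] inputs = "/data/a;/data/b;/data/c".split(";");
        System.out.println("setInputPaths -> " + inputs[0]);
        for (int i = 1; i < inputs.length; i++) {
            System.out.println("addInputPaths -> " + inputs[i]); // each remaining path once
        }
    }
}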
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
index f46d9c3..67681d3 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
@@ -18,7 +18,6 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -31,6 +30,7 @@
import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
import edu.uci.ics.pregelix.example.io.VLongWritable;
public class TextPageRankInputFormat extends
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
index 013a063..3ea4a9f 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
@@ -18,7 +18,6 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -31,6 +30,7 @@
import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
import edu.uci.ics.pregelix.example.io.VLongWritable;
public class TextShortestPathsInputFormat extends
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BooleanWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BooleanWritable.java
new file mode 100644
index 0000000..c943288
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BooleanWritable.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Boolean values.
+ */
+public class BooleanWritable extends org.apache.hadoop.io.BooleanWritable implements WritableSizable {
+
+ public BooleanWritable(boolean value) {
+ super(value);
+ }
+
+ public BooleanWritable() {
+ super();
+ }
+
+ public int sizeInBytes() {
+ return 1;
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/ByteWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/ByteWritable.java
new file mode 100644
index 0000000..2a1fd22
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/ByteWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Byte values.
+ */
+public class ByteWritable extends org.apache.hadoop.io.ByteWritable implements WritableSizable {
+
+ public ByteWritable(byte value) {
+ super(value);
+ }
+
+ public ByteWritable() {
+ super();
+ }
+
+ public int sizeInBytes() {
+ return 1;
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BytesWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BytesWritable.java
new file mode 100644
index 0000000..04a5549
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/BytesWritable.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Bytes values.
+ */
+public class BytesWritable extends org.apache.hadoop.io.BytesWritable implements WritableSizable {
+
+ public BytesWritable(byte[] value) {
+ super(value);
+ }
+
+ public BytesWritable() {
+ super();
+ }
+
+ @Override
+ public int sizeInBytes() {
+ return getLength() + 4; // plus the 4-byte length header written before the payload
+ }
+
+}
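
Note: the + 4 reflects Hadoop's BytesWritable wire format, which serializes a 4-byte integer length before the payload. A small verification sketch, assuming it sits in the same package as the class above:

package edu.uci.ics.pregelix.example.io;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

public class BytesSizeDemo {
    public static void main(String[] args) throws Exception {
        BytesWritable w = new BytesWritable(new byte[10]);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        w.write(new DataOutputStream(bos));
        // prints "14 >= 14": 4-byte length header plus 10 payload bytes
        System.out.println(w.sizeInBytes() + " >= " + bos.size());
    }
}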
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java
new file mode 100644
index 0000000..ebc7fe4
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * Writable for Double values.
+ */
+public class DoubleWritable extends org.apache.hadoop.io.DoubleWritable implements WritableSizable {
+
+ public DoubleWritable(double value) {
+ super(value);
+ }
+
+ public DoubleWritable() {
+ super();
+ }
+
+ public int sizeInBytes() {
+ return 8;
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/FloatWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/FloatWritable.java
new file mode 100644
index 0000000..6772b0a
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/FloatWritable.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for floats. */
+public class FloatWritable extends org.apache.hadoop.io.FloatWritable implements WritableSizable {
+
+ public FloatWritable(float value) {
+ super(value);
+ }
+
+ public FloatWritable() {
+ super();
+ }
+
+ public int sizeInBytes() {
+ return 4;
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/IntWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/IntWritable.java
new file mode 100644
index 0000000..4944232
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/IntWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for ints. */
+public class IntWritable extends org.apache.hadoop.io.IntWritable implements WritableSizable {
+
+ public IntWritable(int value) {
+ super(value);
+ }
+
+ public IntWritable() {
+ super();
+ }
+
+ public int sizeInBytes() {
+ return 4;
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/LongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/LongWritable.java
new file mode 100644
index 0000000..3ecab79
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/LongWritable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** A WritableComparable for longs. */
+public class LongWritable extends org.apache.hadoop.io.LongWritable implements WritableSizable {
+
+ public LongWritable(long value) {
+ super(value);
+ }
+
+ public LongWritable() {
+ super();
+ }
+
+ public int sizeInBytes() {
+ return 8;
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/NullWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/NullWritable.java
new file mode 100644
index 0000000..a2f184a
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/NullWritable.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/** Singleton Writable with no data. */
+@SuppressWarnings("rawtypes")
+public class NullWritable implements WritableComparable, WritableSizable {
+
+ private static final NullWritable THIS = new NullWritable();
+
+ private NullWritable() {
+ } // no public ctor
+
+ /** Returns the single instance of this class. */
+ public static NullWritable get() {
+ return THIS;
+ }
+
+ public String toString() {
+ return "(null)";
+ }
+
+ public int sizeInBytes() {
+ return 0;
+ }
+
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(Object other) {
+ if (!(other instanceof NullWritable)) {
+ throw new ClassCastException("can't compare " + other.getClass().getName() + " to NullWritable");
+ }
+ return 0;
+ }
+
+ public boolean equals(Object other) {
+ return other instanceof NullWritable;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ }
+
+ public void write(DataOutput out) throws IOException {
+ }
+
+ /** A Comparator "optimized" for NullWritable. */
+ public static class Comparator extends WritableComparator {
+ public Comparator() {
+ super(NullWritable.class);
+ }
+
+ /**
+ * Compare the buffers in serialized form.
+ */
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ assert 0 == l1;
+ assert 0 == l2;
+ return 0;
+ }
+ }
+
+ static { // register this comparator
+ WritableComparator.define(NullWritable.class, new Comparator());
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VIntWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VIntWritable.java
new file mode 100644
index 0000000..94df74f
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VIntWritable.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * A WritableComparable for integer values stored in variable-length format.
+ * Such values take between one and five bytes. Smaller values take fewer bytes.
+ *
+ * @see org.apache.hadoop.io.WritableUtils#readVInt(DataInput)
+ */
+@SuppressWarnings("rawtypes")
+public class VIntWritable implements WritableComparable, WritableSizable {
+ private int value;
+
+ public VIntWritable() {
+ }
+
+ public VIntWritable(int value) {
+ set(value);
+ }
+
+ public int sizeInBytes() {
+ return 5;
+ }
+
+ /** Set the value of this VIntWritable. */
+ public void set(int value) {
+ this.value = value;
+ }
+
+ /** Return the value of this VIntWritable. */
+ public int get() {
+ return value;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ value = WritableUtils.readVInt(in);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, value);
+ }
+
+ /** Returns true iff <code>o</code> is a VIntWritable with the same value. */
+ public boolean equals(Object o) {
+ if (!(o instanceof VIntWritable))
+ return false;
+ VIntWritable other = (VIntWritable) o;
+ return this.value == other.value;
+ }
+
+ public int hashCode() {
+ return value;
+ }
+
+ /** Compares two VIntWritables. */
+ public int compareTo(Object o) {
+ int thisValue = this.value;
+ int thatValue = ((VIntWritable) o).value;
+ return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+ }
+
+ public String toString() {
+ return Integer.toString(value);
+ }
+
+}
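
Note: the flat sizeInBytes() == 5 is a safe upper bound rather than an exact size: Hadoop's vint coding uses one byte for small values and at most five bytes for any int. A sketch measuring actual encoded sizes with WritableUtils:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.WritableUtils;

public class VIntSizeDemo {
    public static void main(String[] args) throws Exception {
        for (int v : new int[] { 1, 200, 100000, Integer.MAX_VALUE }) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            WritableUtils.writeVInt(new DataOutputStream(bos), v);
            System.out.println(v + " -> " + bos.size() + " byte(s), estimate 5");
        }
    }
}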
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
index e12d930..ec1109f 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
@@ -22,16 +22,26 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.SerDeUtils;
/**
* A WritableComparable for longs in a variable-length format. Such values take
- * between one and five bytes. Smaller values take fewer bytes.
+ * between one and nine bytes. Smaller values take fewer bytes.
*
* @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
*/
@SuppressWarnings("rawtypes")
-public class VLongWritable implements WritableComparable {
+public class VLongWritable implements WritableComparable, WritableSizable {
+ // largest values representable in k bytes of a 7-bits-per-byte varint coding
+ private static final long ONE_BYTE_MAX = (1L << 7) - 1;
+ private static final long TWO_BYTE_MAX = (1L << 14) - 1;
+ private static final long THREE_BYTE_MAX = (1L << 21) - 1;
+ private static final long FOUR_BYTE_MAX = (1L << 28) - 1;
+ private static final long FIVE_BYTE_MAX = (1L << 35) - 1;
+ private static final long SIX_BYTE_MAX = (1L << 42) - 1;
+ private static final long SEVEN_BYTE_MAX = (1L << 49) - 1;
+ private static final long EIGHT_BYTE_MAX = (1L << 54) - 1;
+
private long value;
public VLongWritable() {
@@ -41,6 +51,28 @@
set(value);
}
+ public int sizeInBytes() {
+ if (value >= 0 && value <= ONE_BYTE_MAX) {
+ return 1;
+ } else if (value > ONE_BYTE_MAX && value <= TWO_BYTE_MAX) {
+ return 2;
+ } else if (value > TWO_BYTE_MAX && value <= THREE_BYTE_MAX) {
+ return 3;
+ } else if (value > THREE_BYTE_MAX && value <= FOUR_BYTE_MAX) {
+ return 4;
+ } else if (value > FOUR_BYTE_MAX && value <= FIVE_BYTE_MAX) {
+ return 5;
+ } else if (value > FIVE_BYTE_MAX && value <= SIX_BYTE_MAX) {
+ return 6;
+ } else if (value > SIX_BYTE_MAX && value <= SEVEN_BYTE_MAX) {
+ return 7;
+ } else if (value > SEVEN_BYTE_MAX && value <= EIGHT_BYTE_MAX) {
+ return 8;
+ } else {
+ return 9;
+ }
+ }
+
/** Set the value of this LongWritable. */
public void set(long value) {
this.value = value;
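
Note on the threshold constants above: Java's ^ is bitwise XOR, not exponentiation, so an expression like 2 ^ 7 - 1 evaluates to 4 rather than 127; the thresholds are therefore written with shifts, where (1L << 7 * k) - 1 is the largest value fitting in k bytes under the seven-bits-per-byte varint coding assumed for SerDeUtils. A quick sketch of the difference:

public class VarintThresholdDemo {
    public static void main(String[] args) {
        System.out.println(2 ^ 7 - 1);      // 4: XOR applied after '7 - 1', not 2 to the 7th
        System.out.println((1L << 7) - 1);  // 127: one-byte maximum
        System.out.println((1L << 14) - 1); // 16383: two-byte maximum
    }
}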
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
index 0a58c00..dd86a45 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
@@ -23,14 +23,13 @@
import java.util.Set;
import java.util.TreeSet;
-import org.apache.hadoop.io.Writable;
-
+import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.example.io.VLongWritable;
/**
* The adjacency list contains <src, list-of-neighbors>
*/
-public class AdjacencyListWritable implements Writable {
+public class AdjacencyListWritable implements WritableSizable {
private VLongWritable sourceVertex = new VLongWritable();
private Set<VLongWritable> destinationVertexes = new TreeSet<VLongWritable>();
@@ -96,4 +95,13 @@
return destinationVertexes.contains(v);
}
+ @Override
+ public int sizeInBytes() {
+ int size = 4; // the 4-byte length field of the neighbor list
+ for (VLongWritable dest : destinationVertexes) {
+ size += dest.sizeInBytes();
+ }
+ return size;
+ }
+
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 15117a1..13cec61 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -26,8 +26,13 @@
import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.EarlyTerminationVertex;
+import edu.uci.ics.pregelix.example.EarlyTerminationVertex.SimpleEarlyTerminationVertexOutputFormat;
import edu.uci.ics.pregelix.example.GraphMutationVertex;
import edu.uci.ics.pregelix.example.GraphMutationVertex.SimpleGraphMutationVertexOutputFormat;
+import edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex;
+import edu.uci.ics.pregelix.example.MessageOverflowVertex;
+import edu.uci.ics.pregelix.example.MessageOverflowVertex.SimpleMessageOverflowVertexOutputFormat;
import edu.uci.ics.pregelix.example.PageRankVertex;
import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
import edu.uci.ics.pregelix.example.PageRankVertex.SimulatedPageRankVertexInputFormat;
@@ -233,7 +238,7 @@
job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
- job.setMutationOrVariableSizedUpdateHeavy(true);
+ job.setLSMStorage(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -247,7 +252,7 @@
job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
- job.setMutationOrVariableSizedUpdateHeavy(true);
+ job.setLSMStorage(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH4);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -262,7 +267,7 @@
job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
- job.setMutationOrVariableSizedUpdateHeavy(true);
+ job.setLSMStorage(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH5);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -280,6 +285,59 @@
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
+ private static void generateMessageOverflowFixedsizeJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(MessageOverflowFixedsizeVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(MessageOverflowFixedsizeVertex.SimpleMessageOverflowVertexOutputFormat.class);
+ job.setFrameSize(2048);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void generateMessageOverflowJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(MessageOverflowVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setDynamicVertexValueSize(true);
+ job.setFrameSize(2048);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void generateMessageOverflowJobLSM(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(MessageOverflowVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setDynamicVertexValueSize(true);
+ job.setFrameSize(2048);
+ job.setLSMStorage(true);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void generateEarlyTerminationJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(EarlyTerminationVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleEarlyTerminationVertexOutputFormat.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
private static void genPageRank() throws IOException {
generatePageRankJob("PageRank", outputBase + "PageRank.xml");
generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
@@ -319,6 +377,16 @@
generateGraphMutationJob("Graph Mutation", outputBase + "GraphMutation.xml");
}
+ private static void genMessageOverflow() throws IOException {
+ generateMessageOverflowJob("Message Overflow", outputBase + "MessageOverflow.xml");
+ generateMessageOverflowJobLSM("Message Overflow LSM", outputBase + "MessageOverflowLSM.xml");
+ generateMessageOverflowFixedsizeJob("Message Overflow Fixedsize", outputBase + "MessageOverflowFixedsize.xml");
+ }
+
+ private static void genEarlyTermination() throws IOException {
+ generateEarlyTerminationJob("Early Termination", outputBase + "EarlyTermination.xml");
+ }
+
public static void main(String[] args) throws IOException {
genPageRank();
genShortestPath();
@@ -327,5 +395,7 @@
genTriangleCounting();
genMaximalClique();
genGraphMutation();
+ genMessageOverflow();
+ genEarlyTermination();
}
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
new file mode 100644
index 0000000..196b114
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.lib.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.Random;
+
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+import edu.uci.ics.pregelix.example.io.BooleanWritable;
+import edu.uci.ics.pregelix.example.io.ByteWritable;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.io.FloatWritable;
+import edu.uci.ics.pregelix.example.io.IntWritable;
+import edu.uci.ics.pregelix.example.io.LongWritable;
+import edu.uci.ics.pregelix.example.io.NullWritable;
+import edu.uci.ics.pregelix.example.io.VIntWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * @author yingyib
+ */
+public class SizeEstimationTest {
+
+ @Test
+ public void testVLong() throws Exception {
+ Random rand = new Random(System.currentTimeMillis());
+ MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ for (int i = 0; i < 1000000; i++) {
+ msgList.add(new VLongWritable(Math.abs(rand.nextLong())));
+ }
+ verifySizeEstimation(msgList);
+ }
+
+ @Test
+ public void testLong() throws Exception {
+ Random rand = new Random(System.currentTimeMillis());
+ MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ for (int i = 0; i < 1000000; i++) {
+ msgList.add(new LongWritable(rand.nextLong()));
+ }
+ verifySizeEstimation(msgList);
+ }
+
+ @Test
+ public void testBoolean() throws Exception {
+ Random rand = new Random(System.currentTimeMillis());
+ MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ for (int i = 0; i < 1000000; i++) {
+ msgList.add(new BooleanWritable(rand.nextBoolean()));
+ }
+ verifySizeEstimation(msgList);
+ }
+
+ @Test
+ public void testByte() throws Exception {
+ Random rand = new Random(System.currentTimeMillis());
+ MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ for (int i = 0; i < 1000000; i++) {
+ msgList.add(new ByteWritable((byte) rand.nextInt()));
+ }
+ verifySizeEstimation(msgList);
+ }
+
+ @Test
+ public void testDouble() throws Exception {
+ Random rand = new Random(System.currentTimeMillis());
+ MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ for (int i = 0; i < 1000000; i++) {
+ msgList.add(new DoubleWritable(rand.nextDouble()));
+ }
+ verifySizeEstimation(msgList);
+ }
+
+ @Test
+ public void testFloat() throws Exception {
+ Random rand = new Random(System.currentTimeMillis());
+ MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ for (int i = 0; i < 1000000; i++) {
+ msgList.add(new FloatWritable(rand.nextFloat()));
+ }
+ verifySizeEstimation(msgList);
+ }
+
+ @Test
+ public void testNull() throws Exception {
+ MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ for (int i = 0; i < 1000000; i++) {
+ msgList.add(NullWritable.get());
+ }
+ verifySizeEstimation(msgList);
+ }
+
+ @Test
+ public void testVInt() throws Exception {
+ Random rand = new Random(System.currentTimeMillis());
+ MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ for (int i = 0; i < 1000000; i++) {
+ msgList.add(new VIntWritable(rand.nextInt()));
+ }
+ verifySizeEstimation(msgList);
+ }
+
+ @Test
+ public void testInt() throws Exception {
+ Random rand = new Random(System.currentTimeMillis());
+ MsgList<WritableSizable> msgList = new MsgList<WritableSizable>();
+ for (int i = 0; i < 1000000; i++) {
+ msgList.add(new IntWritable(rand.nextInt()));
+ }
+ verifySizeEstimation(msgList);
+ }
+
+ private void verifySizeEstimation(MsgList<WritableSizable> msgList) throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutput dos = new DataOutputStream(bos);
+ int accumulatedSize = 5; // initial slack assumed for the MsgList header bytes
+ for (int i = 0; i < msgList.size(); i++) {
+ bos.reset();
+ WritableSizable value = msgList.get(i);
+ value.write(dos);
+ if (value.sizeInBytes() < bos.size()) {
+ throw new Exception(value + " estimated size (" + value.sizeInBytes()
+ + ") is smaller than the actual size " + bos.size());
+ }
+ accumulatedSize += value.sizeInBytes();
+ }
+ bos.reset();
+ msgList.write(dos);
+ if (accumulatedSize < bos.size()) {
+ throw new Exception("Estimated list size (" + accumulatedSize + ") is smaller than the actual size"
+ + bos.size());
+ }
+ }
+
+}
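
Note: the contract these tests pin down is that sizeInBytes() may overestimate but must never underestimate the bytes produced by write(), both per message and for a whole MsgList. A standalone sketch of the per-value check:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import edu.uci.ics.pregelix.example.io.VLongWritable;

public class SizeBoundDemo {
    public static void main(String[] args) throws Exception {
        VLongWritable v = new VLongWritable(300); // needs two bytes in the varint coding
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        v.write(new DataOutputStream(bos));
        // the estimate upper-bounds the serialized length
        System.out.println(v.sizeInBytes() + " >= " + bos.size());
    }
}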
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-0 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-0
new file mode 100644
index 0000000..60a55af
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-0
@@ -0,0 +1,5 @@
+0 2
+4 2
+8 2
+12 2
+16 2
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-1 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-1
new file mode 100644
index 0000000..32ee93f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-1
@@ -0,0 +1,5 @@
+1 2
+5 2
+9 2
+13 2
+17 2
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-2 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-2
new file mode 100644
index 0000000..542ccae
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-2
@@ -0,0 +1,5 @@
+2 0
+6 0
+10 0
+14 0
+18 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-3 b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-3
new file mode 100644
index 0000000..ff0e5b8
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/EarlyTermination/part-3
@@ -0,0 +1,5 @@
+3 1
+7 1
+11 1
+15 1
+19 1
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-0
new file mode 100644
index 0000000..db5f679
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-0
@@ -0,0 +1,5 @@
+0 10000
+4 70000
+8 30000
+12 90000
+16 50000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-1
new file mode 100644
index 0000000..3dc4629
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-1
@@ -0,0 +1,5 @@
+1 100000
+5 60000
+9 20000
+13 80000
+17 40000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-2
new file mode 100644
index 0000000..bc95831
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-2
@@ -0,0 +1,5 @@
+2 90000
+6 50000
+10 10000
+14 70000
+18 30000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-3
new file mode 100644
index 0000000..b619cd7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflow/part-3
@@ -0,0 +1,5 @@
+3 80000
+7 40000
+11 100000
+15 60000
+19 20000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-0
new file mode 100644
index 0000000..db5f679
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-0
@@ -0,0 +1,5 @@
+0 10000
+4 70000
+8 30000
+12 90000
+16 50000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-1
new file mode 100644
index 0000000..3dc4629
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-1
@@ -0,0 +1,5 @@
+1 100000
+5 60000
+9 20000
+13 80000
+17 40000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-2
new file mode 100644
index 0000000..bc95831
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-2
@@ -0,0 +1,5 @@
+2 90000
+6 50000
+10 10000
+14 70000
+18 30000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-3
new file mode 100644
index 0000000..b619cd7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowFixedsize/part-3
@@ -0,0 +1,5 @@
+3 80000
+7 40000
+11 100000
+15 60000
+19 20000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-0
new file mode 100644
index 0000000..db5f679
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-0
@@ -0,0 +1,5 @@
+0 10000
+4 70000
+8 30000
+12 90000
+16 50000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-1
new file mode 100644
index 0000000..3dc4629
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-1
@@ -0,0 +1,5 @@
+1 100000
+5 60000
+9 20000
+13 80000
+17 40000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-2
new file mode 100644
index 0000000..bc95831
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-2
@@ -0,0 +1,5 @@
+2 90000
+6 50000
+10 10000
+14 70000
+18 30000
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-3
new file mode 100644
index 0000000..b619cd7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MessageOverflowLSM/part-3
@@ -0,0 +1,5 @@
+3 80000
+7 40000
+11 100000
+15 60000
+19 20000
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
new file mode 100644
index 0000000..d908da8
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Early Termination</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.EarlyTerminationVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.EarlyTerminationVertex$SimpleEarlyTerminattionVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
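
The job file above is a serialized Hadoop Configuration: most entries are stock mapred.*/fs.*/io.* defaults, and only the pregelix.* keys plus the input/output directories actually characterize the job. Note that the double "t" in SimpleEarlyTerminattionVertexOutputFormat matches the inner class name as spelled in the example source, so it must stay as-is here. A hedged sketch of rebuilding the meaningful part of this configuration programmatically; keys and values are copied verbatim from the file, and raw conf.set calls stand in for whatever typed setters PregelixJob may expose, since those are not shown in this diff:

    import org.apache.hadoop.conf.Configuration;

    public final class EarlyTerminationConf {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set("pregelix.vertexClass",
                    "edu.uci.ics.pregelix.example.EarlyTerminationVertex");
            conf.set("pregelix.vertexInputFormatClass",
                    "edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat");
            conf.set("pregelix.vertexOutputFormatClass",
                    "edu.uci.ics.pregelix.example.EarlyTerminationVertex$SimpleEarlyTerminattionVertexOutputFormat");
            conf.set("pregelix.nmkComputerClass",
                    "edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer");
            conf.setLong("pregelix.numVertices", 20);
            conf.set("mapred.input.dir", "file:/webmap");
            conf.set("mapred.output.dir", "/result");
        }
    }
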
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
new file mode 100644
index 0000000..8316c64
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflow.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
new file mode 100644
index 0000000..a894ccd
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowFixedsize.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow Fixedsize</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
new file mode 100644
index 0000000..a9f8925
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MessageOverflowLSM.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Message Overflow LSM</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>pregelix.updateIntensive</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.framesize</name><value>2048</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.MessageOverflowVertex$SimpleMessageOverflowVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
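
Relative to EarlyTermination.xml, the three MessageOverflow job files differ only in a handful of Pregelix switches: MessageOverflow.xml adds pregelix.framesize=2048 and pregelix.incStateLength=true; MessageOverflowFixedsize.xml keeps the small pregelix.framesize of 2048 but omits incStateLength and the nmkComputerClass entry; MessageOverflowLSM.xml additionally sets pregelix.updateIntensive=true to route the job onto the LSM storage path. A hedged sketch of how runtime code might read these flags; the key names are verbatim from the files, while the fallback defaults chosen here are assumptions:

    import org.apache.hadoop.conf.Configuration;

    public final class OverflowSwitches {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // 2048 in the tests: small frames force message lists to be
            // delivered in multiple segments per superstep.
            int frameSize = conf.getInt("pregelix.framesize", 65536); // assumed default
            // Whether vertex state may grow between supersteps.
            boolean incStateLength = conf.getBoolean("pregelix.incStateLength", false);
            // Whether to use the update-intensive (LSM) index variant.
            boolean updateIntensive = conf.getBoolean("pregelix.updateIntensive", false);
            System.out.println(frameSize + " " + incStateLength + " " + updateIntensive);
        }
    }
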
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 54e2256..6564eb0 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</parent>
@@ -88,89 +88,89 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-dataflow-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-dataflow</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-btree</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-cc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-ipc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 16ecf6c..d46457c 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -32,8 +32,8 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.util.ArrayListWritable;
import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
@@ -168,11 +168,16 @@
tbAlive.reset();
vertex = (Vertex) tuple[3];
+
+ if (vertex.isPartitionTerminated()) {
+ vertex.voteToHalt();
+ return;
+ }
vertex.setOutputWriters(writers);
vertex.setOutputAppenders(appenders);
vertex.setOutputTupleBuilders(tbs);
- ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+ MsgList msgContentList = (MsgList) tuple[1];
msgContentList.reset(msgIterator);
if (!msgIterator.hasNext() && vertex.isHalted()) {
@@ -183,9 +188,15 @@
}
try {
+ if (msgContentList.segmentStart()) {
+ vertex.open();
+ }
vertex.compute(msgIterator);
+ if (msgContentList.segmentEnd()) {
+ vertex.close();
+ }
vertex.finishCompute();
- } catch (IOException e) {
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -194,7 +205,6 @@
*/
if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
terminate = false;
-
aggregator.step(vertex);
}
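
The hunk above is the heart of the message-overflow change: when a vertex's combined message list does not fit into one frame, MsgList hands it over in segments and compute() runs once per segment. vertex.open() fires only when segmentStart() is true and vertex.close() only when segmentEnd() is true, so per-superstep state survives across the segmented invocations, and the new isPartitionTerminated() guard lets vertices in an already-terminated partition skip computation and simply vote to halt. An illustrative driver of that bracketing; Vertex.open()/close() and MsgList.segmentStart()/segmentEnd() are the names from the diff, everything else is a stand-in:

    // Simulates one vertex whose message list arrives in three segments.
    public final class SegmentedCompute {
        public static void main(String[] args) {
            int segments = 3;
            for (int seg = 0; seg < segments; seg++) {
                if (seg == 0) {
                    System.out.println("vertex.open()    // first segment only");
                }
                System.out.println("vertex.compute(msgIterator)   // segment " + seg);
                if (seg == segments - 1) {
                    System.out.println("vertex.close()   // last segment only");
                }
            }
        }
    }
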
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index fa7e0a1..eba75c9 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -172,6 +172,10 @@
tbAlive.reset();
vertex = (Vertex) tuple[1];
+ if (vertex.isPartitionTerminated()) {
+ vertex.voteToHalt();
+ return;
+ }
vertex.setOutputWriters(writers);
vertex.setOutputAppenders(appenders);
vertex.setOutputTupleBuilders(tbs);
@@ -184,12 +188,13 @@
}
try {
+ vertex.open();
vertex.compute(msgIterator);
+ vertex.close();
vertex.finishCompute();
- } catch (IOException e) {
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
-
/**
* this partition should not terminate
*/
@@ -200,6 +205,7 @@
* call the global aggregator
*/
aggregator.step(vertex);
+
}
@Override
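
Both update functions now bracket compute() with open()/close(), which gives user vertices per-superstep hooks: open() runs before the first compute() call of a superstep and close() after the last, even when the messages span several segments. A sketch of a vertex exploiting this, again with raw generics; RunningMinVertex and its field are hypothetical, and under this change set a real message class must implement WritableSizable:

    import java.util.Iterator;

    import org.apache.hadoop.io.DoubleWritable;

    import edu.uci.ics.pregelix.api.graph.Vertex;

    // Hypothetical vertex: accumulate across all message segments of a
    // superstep, apply the result exactly once in close().
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public class RunningMinVertex extends Vertex {
        private double runningMin;

        @Override
        public void open() {
            runningMin = Double.MAX_VALUE; // reset once per superstep
        }

        @Override
        public void compute(Iterator msgIterator) {
            // May be invoked once per segment; only accumulates.
            while (msgIterator.hasNext()) {
                runningMin = Math.min(runningMin, ((DoubleWritable) msgIterator.next()).get());
            }
        }

        @Override
        public void close() {
            // All segments have been seen: fold the accumulator in.
            if (runningMin < ((DoubleWritable) getVertexValue()).get()) {
                setVertexValue(new DoubleWritable(runningMin));
            }
            voteToHalt();
        }
    }
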
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 77f28e4..3d52a45 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -14,22 +14,27 @@
*/
package edu.uci.ics.pregelix.runtime.simpleagg;
+import java.nio.ByteBuffer;
+
import org.apache.commons.lang3.tuple.Pair;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
-public class AccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class AccumulatingAggregatorFactory implements IClusteredAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private IAggregateFunctionFactory[] aggFactories;
@@ -41,52 +46,56 @@
@SuppressWarnings("unchecked")
@Override
public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+ RecordDescriptor outRecordDescriptor, final int[] groupFields, int[] partialgroupFields,
+ final IFrameWriter writer, final ByteBuffer outputFrame, final FrameTupleAppender appender)
+ throws HyracksDataException {
+ final int frameSize = ctx.getFrameSize();
+ final ArrayTupleBuilder internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
return new IAggregatorDescriptor() {
-
private FrameTupleReference ftr = new FrameTupleReference();
+ private int groupKeySize = 0;
+ private int metaSlotSize = 4;
+
+ @Override
+ public AggregateState createAggregateStates() {
+ IAggregateFunction[] agg = new IAggregateFunction[aggFactories.length];
+ ArrayBackedValueStorage[] aggOutput = new ArrayBackedValueStorage[aggFactories.length];
+ for (int i = 0; i < agg.length; i++) {
+ aggOutput[i] = new ArrayBackedValueStorage();
+ try {
+ agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i], writer);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ return new AggregateState(Pair.of(aggOutput, agg));
+ }
@Override
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
- ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
- IAggregateFunction[] agg = aggState.getRight();
-
- // initialize aggregate functions
- for (int i = 0; i < agg.length; i++) {
- aggOutput[i].reset();
- try {
- agg[i].init();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
+ setGroupKeySize(accessor, tIndex);
+ initAggregateFunctions(state, true);
+ int stateSize = estimateStep(accessor, tIndex, state);
+ if (stateSize > frameSize) {
+ throw new HyracksDataException(
+ "Message combiner intermediate data size "
+ + stateSize
+ + " is larger than frame size! Check the size estimattion implementation in the message combiner.");
}
-
- ftr.reset(accessor, tIndex);
- for (int i = 0; i < agg.length; i++) {
- try {
- agg[i].step(ftr);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ singleStep(accessor, tIndex, state);
}
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
- Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
- IAggregateFunction[] agg = aggState.getRight();
- ftr.reset(accessor, tIndex);
- for (int i = 0; i < agg.length; i++) {
- try {
- agg[i].step(ftr);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
+ int stateSize = estimateStep(accessor, tIndex, state);
+ if (stateSize > frameSize) {
+ emitResultTuple(accessor, tIndex, state);
+ initAggregateFunctions(state, false);
}
+ singleStep(accessor, tIndex, state);
}
@Override
@@ -97,7 +106,7 @@
IAggregateFunction[] agg = aggState.getRight();
for (int i = 0; i < agg.length; i++) {
try {
- agg[i].finish();
+ agg[i].finishAll();
tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
aggOutput[i].getLength());
} catch (Exception e) {
@@ -107,21 +116,6 @@
}
@Override
- public AggregateState createAggregateStates() {
- IAggregateFunction[] agg = new IAggregateFunction[aggFactories.length];
- ArrayBackedValueStorage[] aggOutput = new ArrayBackedValueStorage[aggFactories.length];
- for (int i = 0; i < agg.length; i++) {
- aggOutput[i] = new ArrayBackedValueStorage();
- try {
- agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i]);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- return new AggregateState(Pair.of(aggOutput, agg));
- }
-
- @Override
public void reset() {
}
@@ -137,6 +131,97 @@
}
+ private void initAggregateFunctions(AggregateState state, boolean all) throws HyracksDataException {
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+ IAggregateFunction[] agg = aggState.getRight();
+
+ /**
+ * initialize aggregate functions
+ */
+ for (int i = 0; i < agg.length; i++) {
+ aggOutput[i].reset();
+ try {
+ if (all) {
+ agg[i].initAll();
+ } else {
+ agg[i].init();
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ private void singleStep(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ IAggregateFunction[] agg = aggState.getRight();
+ ftr.reset(accessor, tIndex);
+ for (int i = 0; i < agg.length; i++) {
+ try {
+ agg[i].step(ftr);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ private int estimateStep(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int size = metaSlotSize + groupKeySize;
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ IAggregateFunction[] agg = aggState.getRight();
+ ftr.reset(accessor, tIndex);
+ for (int i = 0; i < agg.length; i++) {
+ try {
+ size += agg[i].estimateStep(ftr) + metaSlotSize;
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ return size;
+ }
+
+ private void emitResultTuple(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ internalTupleBuilder.reset();
+ for (int j = 0; j < groupFields.length; j++) {
+ internalTupleBuilder.addField(accessor, tIndex, groupFields[j]);
+ }
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+ IAggregateFunction[] agg = aggState.getRight();
+ for (int i = 0; i < agg.length; i++) {
+ try {
+ agg[i].finish();
+ internalTupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+ aggOutput[i].getLength());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputFrame, writer);
+ appender.reset(outputFrame, true);
+ if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+ throw new HyracksDataException("The output cannot fit into a frame.");
+ }
+ }
+ }
+
+ public void setGroupKeySize(IFrameTupleAccessor accessor, int tIndex) {
+ groupKeySize = 0;
+ for (int i = 0; i < groupFields.length; i++) {
+ int fIndex = groupFields[i];
+ int fStartOffset = accessor.getFieldStartOffset(tIndex, fIndex);
+ int fLen = accessor.getFieldEndOffset(tIndex, fIndex) - fStartOffset;
+ groupKeySize += fLen + metaSlotSize;
+ }
+ }
+
};
}
}
\ No newline at end of file
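
The rewrite turns the clustered group-by aggregator into a spilling one: estimateStep() projects the accumulated state (group-key bytes plus each function's estimate, each padded by a 4-byte meta slot), and aggregate() emits a partial group result and re-initializes the functions whenever that projection would overflow the frame, instead of failing. A self-contained toy run of the same policy; every name and the byte budget are hypothetical stand-ins for frameSize, estimateStep(), emitResultTuple(), and singleStep():

    import java.util.ArrayList;
    import java.util.List;

    public class SpillingSumDemo {
        static final int FRAME_BUDGET = 20; // stand-in for ctx.getFrameSize()
        static final int META_SLOT = 4;     // per-field offset slot, as above

        public static void main(String[] args) {
            List<String> emitted = new ArrayList<>();
            int state = 0;              // the "accumulated combiner state"
            int stateBytes = META_SLOT; // group-key bookkeeping
            for (int value : new int[] { 3, 5, 2, 8, 1 }) {
                int projected = stateBytes + META_SLOT + 4; // estimateStep() analogue
                if (projected > FRAME_BUDGET) {
                    emitted.add("partial=" + state); // emitResultTuple() analogue
                    state = 0;
                    stateBytes = META_SLOT;          // re-init mid-group
                }
                state += value;                      // singleStep() analogue
                stateBytes += META_SLOT + 4;
            }
            emitted.add("final=" + state);           // normal end of the group
            System.out.println(emitted);             // [partial=8, partial=10, final=1]
        }
    }
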
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 8090dff..5bc30a2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
@@ -33,6 +34,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
@@ -54,10 +56,11 @@
private MsgList msgList = new MsgList();
private boolean keyRead = false;
- public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput output,
- boolean isFinalStage, boolean partialAggAsInput) throws HyracksDataException {
+ public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput tmpOutput,
+ IFrameWriter groupByOutputWriter, boolean isFinalStage, boolean partialAggAsInput)
+ throws HyracksDataException {
this.conf = confFactory.createConfiguration(ctx);
- this.output = output;
+ this.output = tmpOutput;
this.isFinalStage = isFinalStage;
this.partialAggAsInput = partialAggAsInput;
msgList.setConf(this.conf);
@@ -68,6 +71,12 @@
}
@Override
+ public void initAll() throws HyracksDataException {
+ keyRead = false;
+ combiner.initAll(msgList);
+ }
+
+ @Override
public void init() throws HyracksDataException {
keyRead = false;
combiner.init(msgList);
@@ -75,6 +84,43 @@
@Override
public void step(IFrameTupleReference tuple) throws HyracksDataException {
+ if (!partialAggAsInput) {
+ combiner.stepPartial(key, (WritableSizable) value);
+ } else {
+ combiner.stepFinal(key, value);
+ }
+ }
+
+ @Override
+ public void finish() throws HyracksDataException {
+ try {
+ if (!isFinalStage) {
+ combinedResult = combiner.finishPartial();
+ } else {
+ combinedResult = combiner.finishFinal();
+ }
+ combinedResult.write(output);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void finishAll() throws HyracksDataException {
+ try {
+ if (!isFinalStage) {
+ combinedResult = combiner.finishPartial();
+ } else {
+ combinedResult = combiner.finishFinalAll();
+ }
+ combinedResult.write(output);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException {
FrameTupleReference ftr = (FrameTupleReference) tuple;
IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
ByteBuffer buffer = fta.getBuffer();
@@ -94,28 +140,13 @@
}
value.readFields(valueInput);
if (!partialAggAsInput) {
- combiner.stepPartial(key, value);
+ return combiner.estimateAccumulatedStateByteSizePartial(key, (WritableSizable) value);
} else {
- combiner.stepFinal(key, value);
+ return combiner.estimateAccumulatedStateByteSizeFinal(key, value);
}
} catch (IOException e) {
throw new HyracksDataException(e);
}
-
- }
-
- @Override
- public void finish() throws HyracksDataException {
- try {
- if (!isFinalStage) {
- combinedResult = combiner.finishPartial();
- } else {
- combinedResult = combiner.finishFinal();
- }
- combinedResult.write(output);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
}
}
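
AggregationFunction now drives an extended combiner contract: initAll()/finishFinalAll() open and close a whole (possibly multi-segment) group, init()/finishFinal() handle re-initialization and partial emission around a spill, and the two estimate methods back the frame-size projection, which is also why raw message values are cast to WritableSizable. The surface, restated as an illustrative interface inferred from the calls above (not the real MessageCombiner abstract class):

    import edu.uci.ics.pregelix.api.graph.MsgList;

    // Illustrative restatement of the extended combiner contract; the
    // generic names follow the usual I (vertex id), M (message),
    // P (partial aggregate) convention.
    @SuppressWarnings("rawtypes")
    interface CombinerContractSketch<I, M, P> {
        void initAll(MsgList msgList);  // start of a whole group
        void init(MsgList msgList);     // (re)start after a partial spill
        void stepPartial(I vertexId, M msg);            // raw message in
        void stepFinal(I vertexId, P partialAggregate); // spilled partial back in
        int estimateAccumulatedStateByteSizePartial(I vertexId, M msg);
        int estimateAccumulatedStateByteSizeFinal(I vertexId, P partialAggregate);
        P finishPartial();        // intermediate-stage output
        MsgList finishFinal();    // final stage: close one spilled chunk
        MsgList finishFinalAll(); // final stage: close the whole group
    }

The finishFinal()/finishFinalAll() split is what feeds the vertex-side segmentStart()/segmentEnd() logic: a group whose state spilled produces several MsgList segments, and only the last one closes the group.
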
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
index 33dfa5d..54eccf5 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -17,6 +17,7 @@
import java.io.DataOutput;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
@@ -37,9 +38,9 @@
}
@Override
- public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider)
- throws HyracksException {
+ public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider,
+ IFrameWriter writer) throws HyracksException {
DataOutput output = provider.getDataOutput();
- return new AggregationFunction(ctx, confFactory, output, isFinalStage, partialAggAsInput);
+ return new AggregationFunction(ctx, confFactory, output, writer, isFinalStage, partialAggAsInput);
}
}
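
The widened factory signature is what threads the downstream IFrameWriter into every aggregate function, so a function can take part in spilling rather than only writing into the tuple builder's DataOutput. A minimal no-op implementer under the new signature, assuming the IAggregateFunction surface visible in AggregationFunction above (initAll/init/step/estimateStep/finish/finishAll); the class name is hypothetical:

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.api.exceptions.HyracksException;
    import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
    import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
    import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
    import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;

    // Hypothetical factory: shows only the widened signature. A real
    // function would write results through provider.getDataOutput() and
    // use writer when spilling, as AggregationFunction does above.
    public class NoOpAggregateFunctionFactory implements IAggregateFunctionFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx,
                IDataOutputProvider provider, IFrameWriter writer) throws HyracksException {
            return new IAggregateFunction() {
                @Override
                public void initAll() throws HyracksDataException {
                }

                @Override
                public void init() throws HyracksDataException {
                }

                @Override
                public void step(IFrameTupleReference tuple) throws HyracksDataException {
                }

                @Override
                public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException {
                    return 0; // no accumulated state to account for
                }

                @Override
                public void finish() throws HyracksDataException {
                }

                @Override
                public void finishAll() throws HyracksDataException {
                }
            };
        }
    }
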