merge from zheilbron/hyracks_msr
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 305b50c..0152a15 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</parent>
<properties>
@@ -82,7 +82,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -95,7 +95,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-hdfs-core</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 08c7151..5ea6413 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
/**
* This is the abstract class to implement for aggregating the state of all the vertices globally in the graph.
@@ -39,7 +40,7 @@
*/
@SuppressWarnings("rawtypes")
-public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> {
+public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> {
/**
* initialize aggregator
*/
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
index f5daf99..fa03c0c 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
/**
* This is the abstract class to implement for combining of messages that are sent to the same vertex.
@@ -36,7 +37,7 @@
* the type of the partially combined messages
*/
@SuppressWarnings("rawtypes")
-public abstract class MessageCombiner<I extends WritableComparable, M extends Writable, P extends Writable> {
+public abstract class MessageCombiner<I extends WritableComparable, M extends WritableSizable, P extends Writable> {
/**
* initialize combiner
@@ -82,4 +83,36 @@
* @return the final message List
*/
public abstract MsgList<M> finishFinal();
+
+    /**
+     * Initializes the combiner for all segmented bags of one key; by default delegates to init().
+     *
+     * @param providedMsgList the message list that is passed through to init()
+     */
+    public void initAll(MsgList providedMsgList) {
+        init(providedMsgList);
+    }
+
+    /**
+     * Finishes final combining for all segmented bags of one key; by default delegates to finishFinal().
+     *
+     * @return the final message List
+     */
+    public MsgList<M> finishFinalAll() {
+        return finishFinal();
+    }
+
+    /**
+     * @return estimated byte size of the accumulated partial state including msg (this default returns 0)
+     */
+    public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
+        return 0;
+    }
+
+    /**
+     * @return estimated byte size of the accumulated final state including partialAggregate (this default returns 0)
+     */
+    public int estimateAccumulatedStateByteSizeFinal(I vertexIndex, P partialAggregate) throws HyracksDataException {
+        return 0;
+    }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index 104f396..51b62e4 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -15,8 +15,11 @@
package edu.uci.ics.pregelix.api.graph;
-import org.apache.hadoop.io.Writable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.ArrayListWritable;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -27,9 +30,11 @@
* @param <M>
* message type
*/
-public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+public class MsgList<M extends WritableSizable> extends ArrayListWritable<M> {
/** Defining a layout version for a serializable class. */
private static final long serialVersionUID = 1L;
+ private byte start = 1;
+ private byte end = 2;
/**
* Default constructor.s
@@ -43,4 +48,44 @@
public void setClass() {
setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
}
+
+    /**
+     * Serializes the segment start/end flags as a single leading byte, followed by the list contents.
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeByte(start | end);
+        super.write(output);
+    }
+
+    /**
+     * Reads the leading flag byte written by {@link #write(DataOutput)}, then the list contents.
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        byte startEnd = input.readByte();
+        this.start = (byte) (startEnd & 1);
+        this.end = (byte) (startEnd & 2);
+        super.readFields(input);
+    }
+
+    /** Sets the segment-start flag (bit 0 of the serialized flag byte). */
+    public final void setSegmentStart(boolean segStart) {
+        this.start = (byte) (segStart ? 1 : 0);
+    }
+
+    /** Sets the segment-end flag (bit 1 of the serialized flag byte). */
+    public final void setSegmentEnd(boolean segEnd) {
+        this.end = (byte) (segEnd ? 2 : 0);
+    }
+
+    /** @return true if the segment-start flag is set */
+    public boolean segmentStart() {
+        return start == 1;
+    }
+
+    /** @return true if the segment-end flag is set */
+    public boolean segmentEnd() {
+        return end == 2;
+    }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 4175078..c52130d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -31,7 +32,9 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.JobStateUtils;
import edu.uci.ics.pregelix.api.util.SerDeUtils;
/**
@@ -48,7 +51,7 @@
* Message value type
*/
@SuppressWarnings("rawtypes")
-public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
implements Writable {
private static long superstep = 0;
/** Class-wide number of vertices */
@@ -75,6 +78,8 @@
private boolean hasMessage = false;
/** created new vertex */
private boolean createdNewLiveVertex = false;
+ /** terminate the partition */
+ private boolean terminatePartition = false;
/**
* use object pool for re-using objects
@@ -87,12 +92,23 @@
private int usedValue = 0;
/**
- * The key method that users need to implement
+ * The key method that users need to implement to process
+ * incoming messages in each superstep.
+ * 1. In a superstep, this method can be called multiple times in a continuous manner for a single
+ * vertex, each of which is to process a batch of messages. (Note that
+ * this only happens for the case when the mssages for a single vertex
+ * exceed one frame.)
+ * 2. In each superstep, before any invocation of this method for a vertex,
+ * open() is called; after all the invocations of this method for the vertex,
+ * close is called.
+ * 3. In each partition, the vertex Java object is reused
+ * for all the vertice to be processed in the same partition. (The model
+ * is the same as the key-value objects in hadoop map tasks.)
*
* @param msgIterator
* an iterator of incoming messages
*/
- public abstract void compute(Iterator<M> msgIterator);
+ public abstract void compute(Iterator<M> msgIterator) throws Exception;
/**
* Add an edge for the vertex.
@@ -569,4 +585,62 @@
Vertex.context = context;
}
+    @Override
+    public int hashCode() {
+        return vertexId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        // instanceof also rejects null, so non-Vertex arguments yield false instead of throwing
+        return object instanceof Vertex && vertexId.equals(((Vertex) object).getVertexId());
+    }
+
+    /**
+     * Called immediately before the invocations of compute() on a vertex.
+     * Users can override this method to initialize the state for a vertex
+     * before the compute() invocations.
+     */
+    public void open() {
+
+    }
+
+    /**
+     * Called immediately after all the invocations of compute() on a vertex.
+     * Users can override this method to clean up the state for a vertex
+     * after the compute() invocations.
+     */
+    public void close() {
+
+    }
+
+    /**
+     * Terminates the partition that the current vertex belongs to, and votes the vertex to halt.
+     * This takes effect immediately: the remaining vertices in the
+     * same partition will not be processed.
+     *
+     */
+    protected final void terminatePartition() {
+        voteToHalt();
+        terminatePartition = true;
+    }
+
+    /**
+     * Requests termination of the whole Pregelix job by persisting a force-termination flag.
+     * This will take effect only when the current iteration has completed.
+     *
+     * @throws Exception if the termination state cannot be recorded
+     */
+    protected void terminateJob() throws Exception {
+        Configuration conf = getContext().getConfiguration();
+        JobStateUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+    }
+
+    /**
+     * @return true if the partition is terminated; false otherwise
+     */
+    public boolean isPartitionTerminated() {
+        return terminatePartition;
+    }
+
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java
new file mode 100644
index 0000000..568500b
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/Sizable.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io;
+
+/**
+ * Implemented by objects that can report their own size in bytes.
+ *
+ * @author yingyib
+ */
+public interface Sizable {
+
+    /**
+     * @return the size of this object in bytes (e.g. summed by message combiners to estimate accumulated state)
+     */
+    int sizeInBytes();
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
index c841b1a..73af190 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
@@ -40,7 +40,7 @@
* Message data
*/
@SuppressWarnings("rawtypes")
-public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> {
/**
* Logically split the vertices for a graph processing application.
* Each {@link InputSplit} is then assigned to a worker for processing.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
index e6c62ba..ba8b561 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
@@ -39,7 +39,7 @@
* Message data
*/
@SuppressWarnings("rawtypes")
-public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> {
/**
* Use the input split and context t o setup reading the vertices.
* Guaranteed to be called prior to any other function.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java
new file mode 100644
index 0000000..ee13f76
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/WritableSizable.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.io;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A {@link Writable} that can also report its size in bytes through {@link Sizable}.
+ * Message value types used by Pregelix vertices and combiners are bounded by this interface.
+ *
+ * @author yingyib
+ */
+public interface WritableSizable extends Writable, Sizable {
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
index 985bcff..1d3c427 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
@@ -26,13 +26,14 @@
import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
/**
* This VertexInputFormat is meant for testing/debugging. It simply generates
* some vertex data that can be consumed by test applications.
*/
@SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
extends VertexInputFormat<I, V, E, M> {
@Override
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
index 92c8728..376d45d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
@@ -25,6 +25,7 @@
import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
/**
* Used by GeneratedVertexInputFormat to read some generated data
@@ -37,7 +38,7 @@
* Edge value
*/
@SuppressWarnings("rawtypes")
-public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
implements VertexReader<I, V, E, M> {
/** Records read so far */
protected long recordsRead = 0;
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
index 2254ae4..0faf516 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
@@ -30,6 +30,7 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
/**
* Abstract class that users should subclass to use their own text based vertex
@@ -45,7 +46,7 @@
* Message value
*/
@SuppressWarnings("rawtypes")
-public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
extends VertexInputFormat<I, V, E, M> {
/** Uses the TextInputFormat to do everything */
protected TextInputFormat textInputFormat = new TextInputFormat();
@@ -62,7 +63,7 @@
* @param <E>
* Edge value
*/
- public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
private final RecordReader<LongWritable, Text> lineRecordReader;
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 4cddaf0..dae7818 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -198,7 +198,7 @@
*
* @param updateHeavyFlag
*/
- final public void setMutationOrVariableSizedUpdateHeavy(boolean variableSizedUpdateHeavyFlag) {
+ final public void setLSMStorage(boolean variableSizedUpdateHeavyFlag) {
getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
}
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
index 7a9e5d5..1683541 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -141,9 +141,6 @@
used = 0;
this.clear();
int numValues = in.readInt(); // read number of values
- if (numValues > 100) {
- System.out.println("num values: " + numValues);
- }
for (int i = 0; i < numValues; i++) {
M value = allocateValue();
value.readFields(in); // read a value
@@ -153,9 +150,6 @@
public void write(DataOutput out) throws IOException {
int numValues = size();
- if (numValues > 100) {
- System.out.println("write num values: " + numValues);
- }
out.writeInt(numValues); // write number of values
for (int i = 0; i < numValues; i++) {
get(i).write(out);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 03c37dc..ff4ee91 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -28,6 +28,7 @@
import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
/**
@@ -49,7 +50,7 @@
* @return User's vertex input format class
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
Configuration conf) {
return (Class<? extends VertexInputFormat<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_INPUT_FORMAT_CLASS,
null, VertexInputFormat.class);
@@ -63,7 +64,7 @@
* @return Instantiated user vertex input format class
*/
@SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
Configuration conf) {
Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = getVertexInputFormatClass(conf);
VertexInputFormat<I, V, E, M> inputFormat = ReflectionUtils.newInstance(vertexInputFormatClass, conf);
@@ -106,7 +107,7 @@
* @return User's vertex combiner class
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, M extends Writable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
+ public static <I extends WritableComparable, M extends WritableSizable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
Configuration conf) {
return (Class<? extends MessageCombiner<I, M, P>>) conf.getClass(PregelixJob.Message_COMBINER_CLASS,
DefaultMessageCombiner.class, MessageCombiner.class);
@@ -120,7 +121,7 @@
* @return User's vertex combiner class
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
Configuration conf) {
return (Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
GlobalCountAggregator.class, GlobalAggregator.class);
@@ -138,7 +139,7 @@
* @return Instantiated user vertex combiner class
*/
@SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, M extends Writable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
+ public static <I extends WritableComparable, M extends WritableSizable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
Configuration conf) {
Class<? extends MessageCombiner<I, M, P>> vertexCombinerClass = getMessageCombinerClass(conf);
return ReflectionUtils.newInstance(vertexCombinerClass, conf);
@@ -164,7 +165,7 @@
* @return Instantiated user vertex combiner class
*/
@SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
Configuration conf) {
Class<? extends GlobalAggregator<I, V, E, M, P, F>> globalAggregatorClass = getGlobalAggregatorClass(conf);
return ReflectionUtils.newInstance(globalAggregatorClass, conf);
@@ -178,7 +179,7 @@
* @return User's vertex class
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
Configuration conf) {
return (Class<? extends Vertex<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_CLASS, null, Vertex.class);
}
@@ -191,7 +192,7 @@
* @return Instantiated user vertex
*/
@SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Vertex<I, V, E, M> createVertex(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable> Vertex<I, V, E, M> createVertex(
Configuration conf) {
Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
Vertex<I, V, E, M> vertex = ReflectionUtils.newInstance(vertexClass, conf);
@@ -299,7 +300,7 @@
* @return User's vertex message value class
*/
@SuppressWarnings("unchecked")
- public static <M extends Writable> Class<M> getMessageValueClass(Configuration conf) {
+ public static <M extends WritableSizable> Class<M> getMessageValueClass(Configuration conf) {
if (conf == null)
conf = defaultConf;
return (Class<M>) conf.getClass(PregelixJob.MESSAGE_VALUE_CLASS, Writable.class);
@@ -369,7 +370,7 @@
* Configuration to check
* @return Instantiated user vertex message value
*/
- public static <M extends Writable> M createMessageValue(Configuration conf) {
+ public static <M extends WritableSizable> M createMessageValue(Configuration conf) {
Class<M> messageValueClass = getMessageValueClass(conf);
try {
return messageValueClass.newInstance();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
index d2d90a2..feb9e2f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
@@ -14,42 +14,86 @@
*/
package edu.uci.ics.pregelix.api.util;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
@SuppressWarnings({ "rawtypes", "unchecked" })
-public class DefaultMessageCombiner<I extends WritableComparable, M extends Writable> extends
+public class DefaultMessageCombiner<I extends WritableComparable, M extends WritableSizable> extends
MessageCombiner<I, M, MsgList> {
private MsgList<M> msgList;
+    /** Fixed byte allowance included in every size estimate. NOTE(review): presumably per-list metadata overhead — confirm. */
+    private static final int META_SLOT = 8;
+    private int accumulatedSize = META_SLOT;
@Override
public void init(MsgList providedMsgList) {
+        realInit(providedMsgList);
+        this.msgList.setSegmentStart(false);
+    }
+
+    private void realInit(MsgList providedMsgList) {
this.msgList = providedMsgList;
this.msgList.clearElements();
+        this.accumulatedSize = META_SLOT;
}
@Override
public void stepPartial(I vertexIndex, M msg) throws HyracksDataException {
msgList.addElement(msg);
+        accumulatedSize += msg.sizeInBytes();
}
@Override
public void stepFinal(I vertexIndex, MsgList partialAggregate) throws HyracksDataException {
msgList.addAllElements(partialAggregate);
+        accumulatedSize += totalSizeInBytes(partialAggregate);
}
@Override
public MsgList finishPartial() {
+        msgList.setSegmentEnd(false);
return msgList;
}
@Override
public MsgList<M> finishFinal() {
+        msgList.setSegmentEnd(false);
return msgList;
}
+    @Override
+    public void initAll(MsgList providedMsgList) {
+        realInit(providedMsgList);
+        msgList.setSegmentStart(true);
+    }
+
+    @Override
+    public MsgList<M> finishFinalAll() {
+        msgList.setSegmentEnd(true);
+        return msgList;
+    }
+
+    @Override
+    public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
+        return accumulatedSize + msg.sizeInBytes();
+    }
+
+    @Override
+    public int estimateAccumulatedStateByteSizeFinal(I vertexIndex, MsgList partialAggregate)
+            throws HyracksDataException {
+        return accumulatedSize + totalSizeInBytes(partialAggregate);
+    }
+
+    /** Sums sizeInBytes() over every message in the given list; shared by stepFinal and its estimate. */
+    private int totalSizeInBytes(MsgList partialAggregate) {
+        int size = 0;
+        for (int i = 0; i < partialAggregate.size(); i++) {
+            size += ((M) partialAggregate.get(i)).sizeInBytes();
+        }
+        return size;
+    }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
index ffc6526..9a95f09 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
@@ -21,9 +21,10 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
@SuppressWarnings("rawtypes")
-public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
extends GlobalAggregator<I, V, E, M, LongWritable, LongWritable> {
private LongWritable state = new LongWritable(0);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java
new file mode 100644
index 0000000..4a98167
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Helpers for recording and querying a job's force-termination state as a marker file under
+ * {@link #TMP_DIR} in the file system returned by {@link FileSystem#get(Configuration)}.
+ *
+ * @author yingyib
+ */
+public class JobStateUtils {
+
+    public static final String TMP_DIR = "/tmp/";
+
+    /** Suffix appended to a job id to name its force-termination marker file. */
+    private static final String FORCE_TERMINATION_SUFFIX = "fterm";
+
+    /**
+     * Marks the given job as force-terminated by creating its marker file, if it does not exist yet.
+     *
+     * @param conf
+     *            configuration used to obtain the file system
+     * @param jobId
+     *            id of the job to mark
+     * @throws HyracksDataException
+     *             if the file system cannot be accessed or the marker cannot be written
+     */
+    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            Path path = forceTerminationPath(jobId);
+            if (!dfs.exists(path)) {
+                FSDataOutputStream output = dfs.create(path, true);
+                try {
+                    output.writeBoolean(true);
+                    output.flush();
+                } finally {
+                    // always release the stream, even if the write fails
+                    output.close();
+                }
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Checks whether the given job has been marked as force-terminated.
+     *
+     * @param conf
+     *            configuration used to obtain the file system
+     * @param jobId
+     *            id of the job to check
+     * @return true if the job's force-termination marker file exists; false otherwise
+     * @throws HyracksDataException
+     *             if the file system cannot be accessed
+     */
+    public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            return dfs.exists(forceTerminationPath(jobId));
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Builds the marker-file path {@code TMP_DIR + jobId + "fterm"} for the given job. */
+    private static Path forceTerminationPath(String jobId) {
+        return new Path(TMP_DIR + jobId + FORCE_TERMINATION_SUFFIX);
+    }
+
+}