address Vinayak's comments on open/close of vertex
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
index eb11c3a..fa03c0c 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -85,12 +85,30 @@
public abstract MsgList<M> finishFinal();
/**
+ * init the combiner for all segmented bags for one key
+ *
+ * @return the final message List
+ */
+ public void initAll(MsgList providedMsgList) {
+ init(providedMsgList);
+ }
+
+ /**
+ * finish final combiner for all segmented bags for one key
+ *
+ * @return the final message List
+ */
+ public MsgList<M> finishFinalAll() {
+ return finishFinal();
+ }
+
+ /**
* @return the accumulated byte size
*/
public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
return 0;
}
-
+
/**
* @return the accumulated byte size
*/
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index 2209200..51b62e4 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -15,6 +15,10 @@
package edu.uci.ics.pregelix.api.graph;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.ArrayListWritable;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -29,6 +33,8 @@
public class MsgList<M extends WritableSizable> extends ArrayListWritable<M> {
/** Defining a layout version for a serializable class. */
private static final long serialVersionUID = 1L;
+ private byte start = 1;
+ private byte end = 2;
/**
* Default constructor.s
@@ -42,4 +48,34 @@
public void setClass() {
setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
}
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeByte(start | end);
+ super.write(output);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ byte startEnd = input.readByte();
+ this.start = (byte) (startEnd & 1);
+ this.end = (byte) (startEnd & 2);
+ super.readFields(input);
+ }
+
+ public final void setSegmentStart(boolean segStart) {
+ this.start = (byte) (segStart ? 1 : 0);
+ }
+
+ public final void setSegmentEnd(boolean segEnd) {
+ this.end = (byte) (segEnd ? 2 : 0);
+ }
+
+ public boolean segmentStart() {
+ return start == 1 ? true : false;
+ }
+
+ public boolean segmentEnd() {
+ return end == 2 ? true : false;
+ }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 9cdd8d0..3f0a3ec 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -581,4 +581,18 @@
return vertexId.equals(vertex.getVertexId());
}
+ /**
+ * called immediately before invocations of compute() on a vertex
+ */
+ public void open() {
+
+ }
+
+ /**
+ * called immediately after all the invocations of compute() on a vertex
+ */
+ public void close() {
+
+ }
+
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
index 50663b3..feb9e2f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
@@ -30,6 +30,11 @@
@Override
public void init(MsgList providedMsgList) {
+ realInit(providedMsgList);
+ this.msgList.setSegmentStart(false);
+ }
+
+ private void realInit(MsgList providedMsgList) {
this.msgList = providedMsgList;
this.msgList.clearElements();
this.accumulatedSize = metaSlot;
@@ -51,11 +56,25 @@
@Override
public MsgList finishPartial() {
+ msgList.setSegmentEnd(false);
return msgList;
}
@Override
public MsgList<M> finishFinal() {
+ msgList.setSegmentEnd(false);
+ return msgList;
+ }
+
+ @Override
+ public void initAll(MsgList providedMsgList) {
+ realInit(providedMsgList);
+ msgList.setSegmentStart(true);
+ }
+
+ @Override
+ public MsgList<M> finishFinalAll() {
+ msgList.setSegmentEnd(true);
return msgList;
}
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
index 4a0a4e0..c544b31 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
@@ -25,5 +25,9 @@
public void finish() throws HyracksDataException;
+ public void initAll() throws HyracksDataException;
+
+ public void finishAll() throws HyracksDataException;
+
public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException;
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
index 463e35b..b20f6e4 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
@@ -42,6 +42,14 @@
private Random rand = new Random(System.currentTimeMillis());
private VLongWritable tmpVertexValue = new VLongWritable(0);
private int numOfMsgClones = 10000;
+ private int numIncomingMsgs = 0;
+
+ @Override
+ public void open() {
+ if (getSuperstep() == 2) {
+ numIncomingMsgs = 0;
+ }
+ }
@Override
public void compute(Iterator<VLongWritable> msgIterator) {
@@ -54,12 +62,17 @@
setVertexValue(tmpVertexValue);
}
if (getSuperstep() == 2) {
- long numOfMsg = getVertexValue().get();
while (msgIterator.hasNext()) {
msgIterator.next();
- numOfMsg++;
+ numIncomingMsgs++;
}
- tmpVertexValue.set(numOfMsg);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (getSuperstep() == 2) {
+ tmpVertexValue.set(numIncomingMsgs);
setVertexValue(tmpVertexValue);
voteToHalt();
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 16ecf6c..1af8519 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -32,8 +32,8 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.util.ArrayListWritable;
import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
@@ -172,7 +172,7 @@
vertex.setOutputAppenders(appenders);
vertex.setOutputTupleBuilders(tbs);
- ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+ MsgList msgContentList = (MsgList) tuple[1];
msgContentList.reset(msgIterator);
if (!msgIterator.hasNext() && vertex.isHalted()) {
@@ -183,7 +183,13 @@
}
try {
+ if (msgContentList.segmentStart()) {
+ vertex.open();
+ }
vertex.compute(msgIterator);
+ if (msgContentList.segmentEnd()) {
+ vertex.close();
+ }
vertex.finishCompute();
} catch (IOException e) {
throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index fa7e0a1..8e9aff7 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -184,7 +184,9 @@
}
try {
+ vertex.open();
vertex.compute(msgIterator);
+ vertex.close();
vertex.finishCompute();
} catch (IOException e) {
throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 3195619..3d52a45 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -76,7 +76,7 @@
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
setGroupKeySize(accessor, tIndex);
- initAggregateFunctions(state);
+ initAggregateFunctions(state, true);
int stateSize = estimateStep(accessor, tIndex, state);
if (stateSize > frameSize) {
throw new HyracksDataException(
@@ -93,7 +93,7 @@
int stateSize = estimateStep(accessor, tIndex, state);
if (stateSize > frameSize) {
emitResultTuple(accessor, tIndex, state);
- initAggregateFunctions(state);
+ initAggregateFunctions(state, false);
}
singleStep(accessor, tIndex, state);
}
@@ -101,7 +101,18 @@
@Override
public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- fillResultTupleBuilder(tupleBuilder, state);
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+ IAggregateFunction[] agg = aggState.getRight();
+ for (int i = 0; i < agg.length; i++) {
+ try {
+ agg[i].finishAll();
+ tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+ aggOutput[i].getLength());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
}
@Override
@@ -120,7 +131,7 @@
}
- private void initAggregateFunctions(AggregateState state) throws HyracksDataException {
+ private void initAggregateFunctions(AggregateState state, boolean all) throws HyracksDataException {
Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
IAggregateFunction[] agg = aggState.getRight();
@@ -131,7 +142,11 @@
for (int i = 0; i < agg.length; i++) {
aggOutput[i].reset();
try {
- agg[i].init();
+ if (all) {
+ agg[i].initAll();
+ } else {
+ agg[i].init();
+ }
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -174,7 +189,18 @@
for (int j = 0; j < groupFields.length; j++) {
internalTupleBuilder.addField(accessor, tIndex, groupFields[j]);
}
- fillResultTupleBuilder(internalTupleBuilder, state);
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+ IAggregateFunction[] agg = aggState.getRight();
+ for (int i = 0; i < agg.length; i++) {
+ try {
+ agg[i].finish();
+ internalTupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+ aggOutput[i].getLength());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
FrameUtils.flushFrame(outputFrame, writer);
@@ -196,22 +222,6 @@
}
}
- private void fillResultTupleBuilder(ArrayTupleBuilder tupleBuilder, AggregateState state)
- throws HyracksDataException {
- Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
- ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
- IAggregateFunction[] agg = aggState.getRight();
- for (int i = 0; i < agg.length; i++) {
- try {
- agg[i].finish();
- tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
- aggOutput[i].getLength());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
};
}
}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 6821e31..5bc30a2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -71,6 +71,12 @@
}
@Override
+ public void initAll() throws HyracksDataException {
+ keyRead = false;
+ combiner.initAll(msgList);
+ }
+
+ @Override
public void init() throws HyracksDataException {
keyRead = false;
combiner.init(msgList);
@@ -100,6 +106,20 @@
}
@Override
+ public void finishAll() throws HyracksDataException {
+ try {
+ if (!isFinalStage) {
+ combinedResult = combiner.finishPartial();
+ } else {
+ combinedResult = combiner.finishFinalAll();
+ }
+ combinedResult.write(output);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException {
FrameTupleReference ftr = (FrameTupleReference) tuple;
IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();