address Vinayak's comments on open/close of vertex
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
index eb11c3a..fa03c0c 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -85,12 +85,30 @@
     public abstract MsgList<M> finishFinal();
 
     /**
+     * init the combiner for all segmented bags for one key
+     * 
+     * @return the final message List
+     */
+    public void initAll(MsgList providedMsgList) {
+        init(providedMsgList);
+    }
+
+    /**
+     * finish final combiner for all segmented bags for one key
+     * 
+     * @return the final message List
+     */
+    public MsgList<M> finishFinalAll() {
+        return finishFinal();
+    }
+
+    /**
      * @return the accumulated byte size
      */
     public int estimateAccumulatedStateByteSizePartial(I vertexIndex, M msg) throws HyracksDataException {
         return 0;
     }
-    
+
     /**
      * @return the accumulated byte size
      */
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index 2209200..51b62e4 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -15,6 +15,10 @@
 
 package edu.uci.ics.pregelix.api.graph;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.ArrayListWritable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -29,6 +33,8 @@
 public class MsgList<M extends WritableSizable> extends ArrayListWritable<M> {
     /** Defining a layout version for a serializable class. */
     private static final long serialVersionUID = 1L;
+    private byte start = 1;
+    private byte end = 2;
 
     /**
      * Default constructor.s
@@ -42,4 +48,34 @@
     public void setClass() {
         setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
     }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeByte(start | end);
+        super.write(output);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        byte startEnd = input.readByte();
+        this.start = (byte) (startEnd & 1);
+        this.end = (byte) (startEnd & 2);
+        super.readFields(input);
+    }
+
+    public final void setSegmentStart(boolean segStart) {
+        this.start = (byte) (segStart ? 1 : 0);
+    }
+
+    public final void setSegmentEnd(boolean segEnd) {
+        this.end = (byte) (segEnd ? 2 : 0);
+    }
+
+    public boolean segmentStart() {
+        return start == 1 ? true : false;
+    }
+
+    public boolean segmentEnd() {
+        return end == 2 ? true : false;
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 9cdd8d0..3f0a3ec 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -581,4 +581,18 @@
         return vertexId.equals(vertex.getVertexId());
     }
 
+    /**
+     * called immediately before invocations of compute() on a vertex
+     */
+    public void open() {
+
+    }
+
+    /**
+     * called immediately after all the invocations of compute() on a vertex
+     */
+    public void close() {
+
+    }
+
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
index 50663b3..feb9e2f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
@@ -30,6 +30,11 @@
 
     @Override
     public void init(MsgList providedMsgList) {
+        realInit(providedMsgList);
+        this.msgList.setSegmentStart(false);
+    }
+
+    private void realInit(MsgList providedMsgList) {
         this.msgList = providedMsgList;
         this.msgList.clearElements();
         this.accumulatedSize = metaSlot;
@@ -51,11 +56,25 @@
 
     @Override
     public MsgList finishPartial() {
+        msgList.setSegmentEnd(false);
         return msgList;
     }
 
     @Override
     public MsgList<M> finishFinal() {
+        msgList.setSegmentEnd(false);
+        return msgList;
+    }
+
+    @Override
+    public void initAll(MsgList providedMsgList) {
+        realInit(providedMsgList);
+        msgList.setSegmentStart(true);
+    }
+
+    @Override
+    public MsgList<M> finishFinalAll() {
+        msgList.setSegmentEnd(true);
         return msgList;
     }
 
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
index 4a0a4e0..c544b31 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
@@ -25,5 +25,9 @@
 
     public void finish() throws HyracksDataException;
 
+    public void initAll() throws HyracksDataException;
+
+    public void finishAll() throws HyracksDataException;
+
     public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException;
 }
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
index 463e35b..b20f6e4 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
@@ -42,6 +42,14 @@
     private Random rand = new Random(System.currentTimeMillis());
     private VLongWritable tmpVertexValue = new VLongWritable(0);
     private int numOfMsgClones = 10000;
+    private int numIncomingMsgs = 0;
+
+    @Override
+    public void open() {
+        if (getSuperstep() == 2) {
+            numIncomingMsgs = 0;
+        }
+    }
 
     @Override
     public void compute(Iterator<VLongWritable> msgIterator) {
@@ -54,12 +62,17 @@
             setVertexValue(tmpVertexValue);
         }
         if (getSuperstep() == 2) {
-            long numOfMsg = getVertexValue().get();
             while (msgIterator.hasNext()) {
                 msgIterator.next();
-                numOfMsg++;
+                numIncomingMsgs++;
             }
-            tmpVertexValue.set(numOfMsg);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (getSuperstep() == 2) {
+            tmpVertexValue.set(numIncomingMsgs);
             setVertexValue(tmpVertexValue);
             voteToHalt();
         }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 16ecf6c..1af8519 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -32,8 +32,8 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.util.ArrayListWritable;
 import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
@@ -172,7 +172,7 @@
                 vertex.setOutputAppenders(appenders);
                 vertex.setOutputTupleBuilders(tbs);
 
-                ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+                MsgList msgContentList = (MsgList) tuple[1];
                 msgContentList.reset(msgIterator);
 
                 if (!msgIterator.hasNext() && vertex.isHalted()) {
@@ -183,7 +183,13 @@
                 }
 
                 try {
+                    if (msgContentList.segmentStart()) {
+                        vertex.open();
+                    }
                     vertex.compute(msgIterator);
+                    if (msgContentList.segmentEnd()) {
+                        vertex.close();
+                    }
                     vertex.finishCompute();
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index fa7e0a1..8e9aff7 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -184,7 +184,9 @@
                 }
 
                 try {
+                    vertex.open();
                     vertex.compute(msgIterator);
+                    vertex.close();
                     vertex.finishCompute();
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 3195619..3d52a45 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -76,7 +76,7 @@
             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 setGroupKeySize(accessor, tIndex);
-                initAggregateFunctions(state);
+                initAggregateFunctions(state, true);
                 int stateSize = estimateStep(accessor, tIndex, state);
                 if (stateSize > frameSize) {
                     throw new HyracksDataException(
@@ -93,7 +93,7 @@
                 int stateSize = estimateStep(accessor, tIndex, state);
                 if (stateSize > frameSize) {
                     emitResultTuple(accessor, tIndex, state);
-                    initAggregateFunctions(state);
+                    initAggregateFunctions(state, false);
                 }
                 singleStep(accessor, tIndex, state);
             }
@@ -101,7 +101,18 @@
             @Override
             public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                fillResultTupleBuilder(tupleBuilder, state);
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].finishAll();
+                        tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+                                aggOutput[i].getLength());
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
             }
 
             @Override
@@ -120,7 +131,7 @@
 
             }
 
-            private void initAggregateFunctions(AggregateState state) throws HyracksDataException {
+            private void initAggregateFunctions(AggregateState state, boolean all) throws HyracksDataException {
                 Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
                 ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
                 IAggregateFunction[] agg = aggState.getRight();
@@ -131,7 +142,11 @@
                 for (int i = 0; i < agg.length; i++) {
                     aggOutput[i].reset();
                     try {
-                        agg[i].init();
+                        if (all) {
+                            agg[i].initAll();
+                        } else {
+                            agg[i].init();
+                        }
                     } catch (Exception e) {
                         throw new HyracksDataException(e);
                     }
@@ -174,7 +189,18 @@
                 for (int j = 0; j < groupFields.length; j++) {
                     internalTupleBuilder.addField(accessor, tIndex, groupFields[j]);
                 }
-                fillResultTupleBuilder(internalTupleBuilder, state);
+                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+                IAggregateFunction[] agg = aggState.getRight();
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].finish();
+                        internalTupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+                                aggOutput[i].getLength());
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
                 if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
                         internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
                     FrameUtils.flushFrame(outputFrame, writer);
@@ -196,22 +222,6 @@
                 }
             }
 
-            private void fillResultTupleBuilder(ArrayTupleBuilder tupleBuilder, AggregateState state)
-                    throws HyracksDataException {
-                Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
-                ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
-                IAggregateFunction[] agg = aggState.getRight();
-                for (int i = 0; i < agg.length; i++) {
-                    try {
-                        agg[i].finish();
-                        tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
-                                aggOutput[i].getLength());
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-            }
-
         };
     }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 6821e31..5bc30a2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -71,6 +71,12 @@
     }
 
     @Override
+    public void initAll() throws HyracksDataException {
+        keyRead = false;
+        combiner.initAll(msgList);
+    }
+
+    @Override
     public void init() throws HyracksDataException {
         keyRead = false;
         combiner.init(msgList);
@@ -100,6 +106,20 @@
     }
 
     @Override
+    public void finishAll() throws HyracksDataException {
+        try {
+            if (!isFinalStage) {
+                combinedResult = combiner.finishPartial();
+            } else {
+                combinedResult = combiner.finishFinalAll();
+            }
+            combinedResult.write(output);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
     public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException {
         FrameTupleReference ftr = (FrameTupleReference) tuple;
         IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();