add/remove edge support in pregelix

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2678 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
index 4af35fe..09a4727 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
@@ -42,7 +42,7 @@
     private E edgeValue = null;
     /** Configuration - Used to instantiate classes */
     private Configuration conf = null;
-    /** Whether the edgeValue field is not null*/
+    /** Whether the edgeValue field is not null */
     private boolean hasEdgeValue = false;
 
     /**
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 6856e9a..da31edc 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -142,45 +142,6 @@
         usedValue = 0;
     }
 
-    private Edge<I, E> allocateEdge() {
-        Edge<I, E> edge;
-        if (usedEdge < edgePool.size()) {
-            edge = edgePool.get(usedEdge);
-            usedEdge++;
-        } else {
-            edge = new Edge<I, E>();
-            edgePool.add(edge);
-            usedEdge++;
-        }
-        return edge;
-    }
-
-    private M allocateMessage() {
-        M message;
-        if (usedMessage < msgPool.size()) {
-            message = msgPool.get(usedEdge);
-            usedMessage++;
-        } else {
-            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
-            msgPool.add(message);
-            usedMessage++;
-        }
-        return message;
-    }
-
-    private V allocateValue() {
-        V value;
-        if (usedValue < valuePool.size()) {
-            value = valuePool.get(usedEdge);
-            usedValue++;
-        } else {
-            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
-            valuePool.add(value);
-            usedValue++;
-        }
-        return value;
-    }
-
     /**
      * Set the vertex id
      * 
@@ -201,60 +162,24 @@
     }
 
     /**
-     * Set the global superstep for all the vertices (internal use)
+     * Get the vertex value
      * 
-     * @param superstep
-     *            New superstep
+     * @return the vertex value
      */
-    public static void setSuperstep(long superstep) {
-        Vertex.superstep = superstep;
-    }
-
-    public static long getCurrentSuperstep() {
-        return superstep;
-    }
-
-    public final long getSuperstep() {
-        return superstep;
-    }
-
     public final V getVertexValue() {
         return vertexValue;
     }
 
+    /**
+     * Set the vertex value
+     * 
+     * @param vertexValue
+     */
     public final void setVertexValue(V vertexValue) {
         this.vertexValue = vertexValue;
         this.updated = true;
     }
 
-    /**
-     * Set the total number of vertices from the last superstep.
-     * 
-     * @param numVertices
-     *            Aggregate vertices in the last superstep
-     */
-    public static void setNumVertices(long numVertices) {
-        Vertex.numVertices = numVertices;
-    }
-
-    public final long getNumVertices() {
-        return numVertices;
-    }
-
-    /**
-     * Set the total number of edges from the last superstep.
-     * 
-     * @param numEdges
-     *            Aggregate edges in the last superstep
-     */
-    public static void setNumEdges(long numEdges) {
-        Vertex.numEdges = numEdges;
-    }
-
-    public final long getNumEdges() {
-        return numEdges;
-    }
-
     /***
      * Send a message to a specific vertex
      * 
@@ -352,12 +277,6 @@
         out.writeBoolean(halt);
     }
 
-    private boolean addEdge(Edge<I, E> edge) {
-        edge.setConf(getContext().getConfiguration());
-        destEdgeList.add(edge);
-        return true;
-    }
-
     /**
      * Get the list of incoming messages
      * 
@@ -376,14 +295,7 @@
         return this.destEdgeList;
     }
 
-    public final Mapper<?, ?, ?, ?>.Context getContext() {
-        return context;
-    }
-
-    public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
-        Vertex.context = context;
-    }
-
+    @Override
     @SuppressWarnings("unchecked")
     public String toString() {
         Collections.sort(destEdgeList);
@@ -396,37 +308,217 @@
         return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
     }
 
-    public void setOutputWriters(List<IFrameWriter> writers) {
-        delegate.setOutputWriters(writers);
-    }
-
-    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
-        delegate.setOutputAppenders(appenders);
-    }
-
-    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
-        delegate.setOutputTupleBuilders(tbs);
-    }
-
-    public void finishCompute() throws IOException {
-        delegate.finishCompute();
-    }
-
-    public boolean hasUpdate() {
-        return this.updated;
-    }
-
-    public boolean hasMessage() {
-        return this.hasMessage;
-    }
-
+    /**
+     * Get the number of outgoing edges
+     * 
+     * @return the number of outging edges
+     */
     public int getNumOutEdges() {
         return destEdgeList.size();
     }
 
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void setOutputWriters(List<IFrameWriter> writers) {
+        delegate.setOutputWriters(writers);
+    }
+
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+        delegate.setOutputAppenders(appenders);
+    }
+
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+        delegate.setOutputTupleBuilders(tbs);
+    }
+
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void finishCompute() throws IOException {
+        delegate.finishCompute();
+    }
+
+    /**
+     * Pregelix internal use only
+     */
+    public boolean hasUpdate() {
+        return this.updated;
+    }
+
+    /**
+     * Pregelix internal use only
+     */
+    public boolean hasMessage() {
+        return this.hasMessage;
+    }
+
+    /**
+     * sort the edges
+     */
     @SuppressWarnings("unchecked")
     public void sortEdges() {
-        Collections.sort((List) destEdgeList);
+        Collections.sort(destEdgeList);
+    }
+
+    /**
+     * Allocate a new edge from the edge pool
+     */
+    private Edge<I, E> allocateEdge() {
+        Edge<I, E> edge;
+        if (usedEdge < edgePool.size()) {
+            edge = edgePool.get(usedEdge);
+            usedEdge++;
+        } else {
+            edge = new Edge<I, E>();
+            edgePool.add(edge);
+            usedEdge++;
+        }
+        return edge;
+    }
+
+    /**
+     * Allocate a new message from the message pool
+     */
+    private M allocateMessage() {
+        M message;
+        if (usedMessage < msgPool.size()) {
+            message = msgPool.get(usedEdge);
+            usedMessage++;
+        } else {
+            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+            msgPool.add(message);
+            usedMessage++;
+        }
+        return message;
+    }
+
+    /**
+     * Set the global superstep for all the vertices (internal use)
+     * 
+     * @param superstep
+     *            New superstep
+     */
+    public static final void setSuperstep(long superstep) {
+        Vertex.superstep = superstep;
+    }
+
+    /**
+     * Add an outgoing edge into the vertex
+     * 
+     * @param edge
+     *            the edge to be added
+     * @return true if the edge list changed as a result of this call
+     */
+    public boolean addEdge(Edge<I, E> edge) {
+        edge.setConf(getContext().getConfiguration());
+        return destEdgeList.add(edge);
+    }
+
+    /**
+     * remove an outgoing edge in the graph
+     * 
+     * @param edge
+     *            the edge to be removed
+     * @return true if the edge is in the edge list of the vertex
+     */
+    public boolean removeEdge(Edge<I, E> edge) {
+        return destEdgeList.remove(edge);
+    }
+
+    /**
+     * Allocate a vertex value from the object pool
+     * 
+     * @return a vertex value instance
+     */
+    private V allocateValue() {
+        V value;
+        if (usedValue < valuePool.size()) {
+            value = valuePool.get(usedValue);
+            usedValue++;
+        } else {
+            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+            valuePool.add(value);
+            usedValue++;
+        }
+        return value;
+    }
+
+    /**
+     * Get the current global superstep number
+     * 
+     * @return the current superstep number
+     */
+    public static final long getSuperstep() {
+        return superstep;
+    }
+
+    /**
+     * Set the total number of vertices from the last superstep.
+     * 
+     * @param numVertices
+     *            Aggregate vertices in the last superstep
+     */
+    public static final void setNumVertices(long numVertices) {
+        Vertex.numVertices = numVertices;
+    }
+
+    /**
+     * Get the number of vertexes in the graph
+     * 
+     * @return the number of vertexes in the graph
+     */
+    public static final long getNumVertices() {
+        return numVertices;
+    }
+
+    /**
+     * Set the total number of edges from the last superstep.
+     * 
+     * @param numEdges
+     *            Aggregate edges in the last superstep
+     */
+    public static void setNumEdges(long numEdges) {
+        Vertex.numEdges = numEdges;
+    }
+
+    /**
+     * Get the number of edges from this graph
+     * 
+     * @return the number of edges in the graph
+     */
+    public static final long getNumEdges() {
+        return numEdges;
+    }
+
+    /**
+     * Pregelix internal use only
+     */
+    public static final Mapper<?, ?, ?, ?>.Context getContext() {
+        return context;
+    }
+
+    /**
+     * Pregelix internal use only
+     * 
+     * @param context
+     */
+    public static final void setContext(Mapper<?, ?, ?, ?>.Context context) {
+        Vertex.context = context;
     }
 
 }
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
index 9389ab6..3938613 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
@@ -128,6 +128,7 @@
             indexAccessor = btree.createAccessor();
 
             cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+            updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
             treeIndexHelper.deinit();
             throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 69bb6a2..37029f3 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -146,6 +146,7 @@
 
             indexAccessor = btree.createAccessor();
             cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+            updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
             treeIndexOpHelper.deinit();
             throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index fbab036..f7b3d62 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -176,7 +176,7 @@
             }
 
             cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
-
+            updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
             treeIndexOpHelper.deinit();
             throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index de083df..6af60a8 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -139,7 +139,7 @@
                 match = false;
             }
             cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
-
+            updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
             treeIndexOpHelper.deinit();
             throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
index 85503aa..9a30647 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -41,7 +41,8 @@
     private final FrameTupleAppender appender;
     private final IHyracksTaskContext ctx;
     private final FrameTupleReference tuple = new FrameTupleReference();
-    private final IFrameTupleAccessor fta;
+    private final int frameSize;
+    private IFrameTupleAccessor fta;
 
     public UpdateBuffer(int numPages, IHyracksTaskContext ctx, int fieldCount) {
         this.appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -50,7 +51,8 @@
         this.appender.reset(buffer, true);
         this.pageLimit = numPages;
         this.ctx = ctx;
-        this.fta = new UpdateBufferTupleAccessor(ctx.getFrameSize(), fieldCount);
+        this.frameSize = ctx.getFrameSize();
+        this.fta = new UpdateBufferTupleAccessor(frameSize, fieldCount);
     }
 
     public UpdateBuffer(IHyracksTaskContext ctx, int fieldCount) {
@@ -58,6 +60,12 @@
         this(1000, ctx, fieldCount);
     }
 
+    public void setFieldCount(int fieldCount) {
+        if (fta.getFieldCount() != fieldCount) {
+            this.fta = new UpdateBufferTupleAccessor(frameSize, fieldCount);
+        }
+    }
+
     public boolean appendTuple(ArrayTupleBuilder tb) throws HyracksDataException {
         if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
             if (currentInUse + 1 < pageLimit) {
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 43b6d17..567e220 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -119,23 +119,23 @@
             Vertex.setNumEdges(numEdges);
             giraphJobIdToSuperStep.put(giraphJobId, superStep);
             giraphJobIdToMove.put(giraphJobId, false);
-            LOGGER.info("start iteration " + Vertex.getCurrentSuperstep());
+            LOGGER.info("start iteration " + Vertex.getSuperstep());
         }
         System.gc();
     }
 
     public synchronized void endSuperStep(String giraphJobId) {
         giraphJobIdToMove.put(giraphJobId, true);
-        LOGGER.info("end iteration " + Vertex.getCurrentSuperstep());
+        LOGGER.info("end iteration " + Vertex.getSuperstep());
     }
 
     @Override
     public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
         final FileReference fRef = ioManager.createWorkspaceFile(prefix);
-        List<FileReference> files = iterationToFiles.get(Vertex.getCurrentSuperstep());
+        List<FileReference> files = iterationToFiles.get(Vertex.getSuperstep());
         if (files == null) {
             files = new ArrayList<FileReference>();
-            iterationToFiles.put(Vertex.getCurrentSuperstep(), files);
+            iterationToFiles.put(Vertex.getSuperstep(), files);
         }
         files.add(fRef);
         return fRef;