Merge fullstack_asterix_stabilization into fullstack_hyracks_result_distribution branch.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2862 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 66a0186..2caa93b 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -41,6 +41,7 @@
 			</plugin>
 			<plugin>
 				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
 				<configuration>
 					<filesets>
 						<fileset>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
index 4af35fe..e5f42fe 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
@@ -42,7 +42,7 @@
     private E edgeValue = null;
     /** Configuration - Used to instantiate classes */
     private Configuration conf = null;
-    /** Whether the edgeValue field is not null*/
+    /** Whether the edgeValue field is not null */
     private boolean hasEdgeValue = false;
 
     /**
@@ -115,8 +115,9 @@
         destVertexId.readFields(input);
         hasEdgeValue = input.readBoolean();
         if (hasEdgeValue) {
-            if (edgeValue == null)
+            if (edgeValue == null) {
                 edgeValue = (E) BspUtils.createEdgeValue(getConf());
+            }
             edgeValue.readFields(input);
         }
     }
@@ -128,8 +129,9 @@
         }
         destVertexId.write(output);
         output.writeBoolean(hasEdgeValue);
-        if (hasEdgeValue)
+        if (hasEdgeValue) {
             edgeValue.write(output);
+        }
     }
 
     @Override
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index 734b1af..8d3d4c6 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -29,7 +29,7 @@
  */
 public class MsgList<M extends Writable> extends ArrayListWritable<M> {
     /** Defining a layout version for a serializable class. */
-    private static final long serialVersionUID = 100L;
+    private static final long serialVersionUID = 1L;
 
     /**
      * Default constructor.s
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 6856e9a..b7f9e3d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -142,45 +142,6 @@
         usedValue = 0;
     }
 
-    private Edge<I, E> allocateEdge() {
-        Edge<I, E> edge;
-        if (usedEdge < edgePool.size()) {
-            edge = edgePool.get(usedEdge);
-            usedEdge++;
-        } else {
-            edge = new Edge<I, E>();
-            edgePool.add(edge);
-            usedEdge++;
-        }
-        return edge;
-    }
-
-    private M allocateMessage() {
-        M message;
-        if (usedMessage < msgPool.size()) {
-            message = msgPool.get(usedEdge);
-            usedMessage++;
-        } else {
-            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
-            msgPool.add(message);
-            usedMessage++;
-        }
-        return message;
-    }
-
-    private V allocateValue() {
-        V value;
-        if (usedValue < valuePool.size()) {
-            value = valuePool.get(usedEdge);
-            usedValue++;
-        } else {
-            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
-            valuePool.add(value);
-            usedValue++;
-        }
-        return value;
-    }
-
     /**
      * Set the vertex id
      * 
@@ -201,60 +162,24 @@
     }
 
     /**
-     * Set the global superstep for all the vertices (internal use)
+     * Get the vertex value
      * 
-     * @param superstep
-     *            New superstep
+     * @return the vertex value
      */
-    public static void setSuperstep(long superstep) {
-        Vertex.superstep = superstep;
-    }
-
-    public static long getCurrentSuperstep() {
-        return superstep;
-    }
-
-    public final long getSuperstep() {
-        return superstep;
-    }
-
     public final V getVertexValue() {
         return vertexValue;
     }
 
+    /**
+     * Set the vertex value
+     * 
+     * @param vertexValue
+     */
     public final void setVertexValue(V vertexValue) {
         this.vertexValue = vertexValue;
         this.updated = true;
     }
 
-    /**
-     * Set the total number of vertices from the last superstep.
-     * 
-     * @param numVertices
-     *            Aggregate vertices in the last superstep
-     */
-    public static void setNumVertices(long numVertices) {
-        Vertex.numVertices = numVertices;
-    }
-
-    public final long getNumVertices() {
-        return numVertices;
-    }
-
-    /**
-     * Set the total number of edges from the last superstep.
-     * 
-     * @param numEdges
-     *            Aggregate edges in the last superstep
-     */
-    public static void setNumEdges(long numEdges) {
-        Vertex.numEdges = numEdges;
-    }
-
-    public final long getNumEdges() {
-        return numEdges;
-    }
-
     /***
      * Send a message to a specific vertex
      * 
@@ -309,6 +234,7 @@
         vertexId.readFields(in);
         delegate.setVertexId(vertexId);
         boolean hasVertexValue = in.readBoolean();
+
         if (hasVertexValue) {
             vertexValue = allocateValue();
             vertexValue.readFields(in);
@@ -352,12 +278,6 @@
         out.writeBoolean(halt);
     }
 
-    private boolean addEdge(Edge<I, E> edge) {
-        edge.setConf(getContext().getConfiguration());
-        destEdgeList.add(edge);
-        return true;
-    }
-
     /**
      * Get the list of incoming messages
      * 
@@ -376,14 +296,7 @@
         return this.destEdgeList;
     }
 
-    public final Mapper<?, ?, ?, ?>.Context getContext() {
-        return context;
-    }
-
-    public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
-        Vertex.context = context;
-    }
-
+    @Override
     @SuppressWarnings("unchecked")
     public String toString() {
         Collections.sort(destEdgeList);
@@ -396,37 +309,236 @@
         return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
     }
 
-    public void setOutputWriters(List<IFrameWriter> writers) {
-        delegate.setOutputWriters(writers);
-    }
-
-    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
-        delegate.setOutputAppenders(appenders);
-    }
-
-    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
-        delegate.setOutputTupleBuilders(tbs);
-    }
-
-    public void finishCompute() throws IOException {
-        delegate.finishCompute();
-    }
-
-    public boolean hasUpdate() {
-        return this.updated;
-    }
-
-    public boolean hasMessage() {
-        return this.hasMessage;
-    }
-
+    /**
+     * Get the number of outgoing edges
+     * 
+     * @return the number of outgoing edges
+     */
     public int getNumOutEdges() {
         return destEdgeList.size();
     }
 
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void setOutputWriters(List<IFrameWriter> writers) {
+        delegate.setOutputWriters(writers);
+    }
+
+    /**
+     * Pregelix internal use only
+     * 
+     * @param appenders
+     */
+    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+        delegate.setOutputAppenders(appenders);
+    }
+
+    /**
+     * Pregelix internal use only
+     * 
+     * @param tbs
+     */
+    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+        delegate.setOutputTupleBuilders(tbs);
+    }
+
+    /**
+     * Pregelix internal use only
+     * 
+     * @throws IOException propagated from the delegate's finishCompute()
+     */
+    public void finishCompute() throws IOException {
+        delegate.finishCompute();
+    }
+
+    /**
+     * Pregelix internal use only
+     */
+    public boolean hasUpdate() {
+        return this.updated;
+    }
+
+    /**
+     * Pregelix internal use only
+     */
+    public boolean hasMessage() {
+        return this.hasMessage;
+    }
+
+    /**
+     * sort the edges
+     */
     @SuppressWarnings("unchecked")
     public void sortEdges() {
-        Collections.sort((List) destEdgeList);
+        Collections.sort(destEdgeList);
+    }
+
+    /**
+     * Allocate a new edge from the edge pool
+     */
+    private Edge<I, E> allocateEdge() {
+        Edge<I, E> edge;
+        if (usedEdge < edgePool.size()) {
+            edge = edgePool.get(usedEdge);
+            usedEdge++;
+        } else {
+            edge = new Edge<I, E>();
+            edgePool.add(edge);
+            usedEdge++;
+        }
+        return edge;
+    }
+
+    /**
+     * Allocate a message from the message pool: reuse the slot at usedMessage, or create and pool a new one
+     */
+    private M allocateMessage() {
+        M message;
+        if (usedMessage < msgPool.size()) {
+            message = msgPool.get(usedMessage); // index by the message cursor, not the edge cursor
+            usedMessage++;
+        } else {
+            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+            msgPool.add(message);
+            usedMessage++;
+        }
+        return message;
+    }
+
+    /**
+     * Set the global superstep for all the vertices (internal use)
+     * 
+     * @param superstep
+     *            New superstep
+     */
+    public static final void setSuperstep(long superstep) {
+        Vertex.superstep = superstep;
+    }
+
+    /**
+     * Add an outgoing edge into the vertex
+     * 
+     * @param edge
+     *            the edge to be added
+     * @return true if the edge list changed as a result of this call
+     */
+    public boolean addEdge(Edge<I, E> edge) {
+        edge.setConf(getContext().getConfiguration());
+        return destEdgeList.add(edge);
+    }
+
+    /**
+     * remove an outgoing edge in the graph
+     * 
+     * @param edge
+     *            the edge to be removed
+     * @return true if the edge is in the edge list of the vertex
+     */
+    public boolean removeEdge(Edge<I, E> edge) {
+        return destEdgeList.remove(edge);
+    }
+
+    /**
+     * Add a new vertex into the graph
+     * 
+     * @param vertexId the vertex id
+     * @param vertex the vertex
+     */
+    public final void addVertex(I vertexId, V vertex) {
+        delegate.addVertex(vertexId, vertex);
+    }
+
+    /**
+     * Delete a vertex from id
+     * 
+     * @param vertexId  the vertex id
+     */
+    public final void deleteVertex(I vertexId) {
+        delegate.deleteVertex(vertexId);
+    }
+
+    /**
+     * Allocate a vertex value from the object pool
+     * 
+     * @return a vertex value instance
+     */
+    private V allocateValue() {
+        V value;
+        if (usedValue < valuePool.size()) {
+            value = valuePool.get(usedValue);
+            usedValue++;
+        } else {
+            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+            valuePool.add(value);
+            usedValue++;
+        }
+        return value;
+    }
+
+    /**
+     * Get the current global superstep number
+     * 
+     * @return the current superstep number
+     */
+    public static final long getSuperstep() {
+        return superstep;
+    }
+
+    /**
+     * Set the total number of vertices from the last superstep.
+     * 
+     * @param numVertices
+     *            Aggregate vertices in the last superstep
+     */
+    public static final void setNumVertices(long numVertices) {
+        Vertex.numVertices = numVertices;
+    }
+
+    /**
+     * Get the number of vertices in the graph
+     * 
+     * @return the number of vertices in the graph
+     */
+    public static final long getNumVertices() {
+        return numVertices;
+    }
+
+    /**
+     * Set the total number of edges from the last superstep.
+     * 
+     * @param numEdges
+     *            Aggregate edges in the last superstep
+     */
+    public static void setNumEdges(long numEdges) {
+        Vertex.numEdges = numEdges;
+    }
+
+    /**
+     * Get the number of edges from this graph
+     * 
+     * @return the number of edges in the graph
+     */
+    public static final long getNumEdges() {
+        return numEdges;
+    }
+
+    /**
+     * Pregelix internal use only
+     */
+    public static final Mapper<?, ?, ?, ?>.Context getContext() {
+        return context;
+    }
+
+    /**
+     * Pregelix internal use only
+     * 
+     * @param context
+     */
+    public static final void setContext(Mapper<?, ?, ?, ?>.Context context) {
+        Vertex.context = context;
     }
 
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
index 7267f30..d949bc5 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
@@ -44,6 +44,16 @@
     private IFrameWriter aliveWriter;
     private FrameTupleAppender appenderAlive;
 
+    /** the tuple for insert */
+    private ArrayTupleBuilder insertTb;
+    private IFrameWriter insertWriter;
+    private FrameTupleAppender appenderInsert;
+
+    /** the tuple for delete */
+    private ArrayTupleBuilder deleteTb;
+    private IFrameWriter deleteWriter;
+    private FrameTupleAppender appenderDelete;
+
     /** message list */
     private MsgList dummyMessageList = new MsgList();
     /** whether alive message should be pushed out */
@@ -95,25 +105,57 @@
         this.vertexId = vertexId;
     }
 
+    public final void addVertex(I vertexId, V vertex) {
+        try {
+            insertTb.reset();
+            DataOutput outputInsert = insertTb.getDataOutput();
+            vertexId.write(outputInsert);
+            insertTb.addFieldEndOffset();
+            vertex.write(outputInsert);
+            insertTb.addFieldEndOffset();
+            FrameTupleUtils.flushTuple(appenderInsert, insertTb, insertWriter);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public final void deleteVertex(I vertexId) {
+        try {
+            deleteTb.reset();
+            DataOutput outputDelete = deleteTb.getDataOutput();
+            vertexId.write(outputDelete);
+            deleteTb.addFieldEndOffset();
+            FrameTupleUtils.flushTuple(appenderDelete, deleteTb, deleteWriter);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
     public final void setOutputWriters(List<IFrameWriter> outputs) {
         msgWriter = outputs.get(0);
-        if (outputs.size() > 1) {
-            aliveWriter = outputs.get(1);
+        insertWriter = outputs.get(1);
+        deleteWriter = outputs.get(2);
+        if (outputs.size() > 3) {
+            aliveWriter = outputs.get(outputs.size() - 1);
             pushAlive = true;
         }
     }
 
     public final void setOutputAppenders(List<FrameTupleAppender> appenders) {
         appenderMsg = appenders.get(0);
-        if (appenders.size() > 1) {
-            appenderAlive = appenders.get(1);
+        appenderInsert = appenders.get(1);
+        appenderDelete = appenders.get(2);
+        if (appenders.size() > 3) {
+            appenderAlive = appenders.get(appenders.size() - 1);
         }
     }
 
     public final void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
         message = tbs.get(0);
-        if (tbs.size() > 1) {
-            alive = tbs.get(1);
+        insertTb = tbs.get(1);
+        deleteTb = tbs.get(2);
+        if (tbs.size() > 3) {
+            alive = tbs.get(tbs.size() - 1);
         }
     }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
index 7179737..ea33691 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
@@ -21,60 +21,64 @@
 import java.io.Serializable;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 /**
  * This InputSplit will not give any ordering or location data. It is used
  * internally by BspInputFormat (which determines how many tasks to run the
  * application on). Users should not use this directly.
  */
-public class BasicGenInputSplit extends InputSplit implements Writable, Serializable {
-    private static final long serialVersionUID = 1L;
-    /** Number of splits */
-    private int numSplits = -1;
-    /** Split index */
-    private int splitIndex = -1;
+public class BasicGenInputSplit extends FileSplit implements Writable,
+		Serializable {
+	private static final long serialVersionUID = 1L;
+	/** Number of splits */
+	private int numSplits = -1;
+	/** Split index */
+	private int splitIndex = -1;
 
-    public BasicGenInputSplit() {
-    }
+	public BasicGenInputSplit() {
+		super(null, 0, 0, null);
+	}
 
-    public BasicGenInputSplit(int splitIndex, int numSplits) {
-        this.splitIndex = splitIndex;
-        this.numSplits = numSplits;
-    }
+	public BasicGenInputSplit(int splitIndex, int numSplits) {
+		super(null, 0, 0, null);
+		this.splitIndex = splitIndex;
+		this.numSplits = numSplits;
+	}
 
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-        return 0;
-    }
+	@Override
+	public long getLength() {
+		return 0;
+	}
 
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-        return new String[] {};
-    }
+	@Override
+	public String[] getLocations() throws IOException {
+		return new String[] {};
+	}
 
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        splitIndex = in.readInt();
-        numSplits = in.readInt();
-    }
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		splitIndex = in.readInt();
+		numSplits = in.readInt();
+	}
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(splitIndex);
-        out.writeInt(numSplits);
-    }
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(splitIndex);
+		out.writeInt(numSplits);
+	}
 
-    public int getSplitIndex() {
-        return splitIndex;
-    }
+	public int getSplitIndex() {
+		return splitIndex;
+	}
 
-    public int getNumSplits() {
-        return numSplits;
-    }
+	public int getNumSplits() {
+		return numSplits;
+	}
 
-    @Override
-    public String toString() {
-        return "'" + getClass().getCanonicalName() + ", index=" + getSplitIndex() + ", num=" + getNumSplits();
-    }
+	@Override
+	public String toString() {
+		return "'" + getClass().getCanonicalName() + ", index="
+				+ getSplitIndex() + ", num=" + getNumSplits();
+	}
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 6ef7e13..8b6d1b6 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -60,8 +60,12 @@
     public static final String NUM_VERTICE = "pregelix.numVertices";
     /** num of edges */
     public static final String NUM_EDGES = "pregelix.numEdges";
+    /** increase state length */
+    public static final String INCREASE_STATE_LENGTH = "pregelix.incStateLength";
     /** job id */
     public static final String JOB_ID = "pregelix.jobid";
+    /** frame size */
+    public static final String FRAME_SIZE = "pregelix.framesize";
 
     /**
      * Constructor that will instantiate the configuration
@@ -130,8 +134,8 @@
     /**
      * Set the global aggregator class (optional)
      * 
-     * @param vertexCombinerClass
-     *            Determines how vertex messages are combined
+     * @param globalAggregatorClass
+     *            Determines how messages are globally aggregated
      */
     final public void setGlobalAggregatorClass(Class<?> globalAggregatorClass) {
         getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, globalAggregatorClass, GlobalAggregator.class);
@@ -139,11 +143,27 @@
 
     /**
      * Set the job Id
-     * 
-     * @param vertexCombinerClass
-     *            Determines how vertex messages are combined
      */
     final public void setJobId(String jobId) {
         getConfiguration().set(JOB_ID, jobId);
     }
+
+    /**
+     * Set whether the vertex state length can be dynamically increased
+     * 
+     * @param incStateLengthDynamically whether the vertex state length can be dynamically increased
+     */
+    final public void setDynamicVertexValueSize(boolean incStateLengthDynamically) {
+        getConfiguration().setBoolean(INCREASE_STATE_LENGTH, incStateLengthDynamically);
+    }
+
+    /**
+     * Set the frame size for a job
+     * 
+     * @param frameSize
+     *            the desired frame size
+     */
+    final public void setFrameSize(int frameSize) {
+        getConfiguration().setInt(FRAME_SIZE, frameSize);
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 7c4853f..ff9724d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -410,4 +410,26 @@
             throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
         }
     }
+
+    /**
+     * Get the job configuration parameter whether the vertex states will increase dynamically
+     * 
+     * @param conf
+     *            the job configuration
+     * @return the boolean setting of the parameter, by default it is false
+     */
+    public static boolean getDynamicVertexValueSize(Configuration conf) {
+        return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
+    }
+
+    /**
+     * Get the specified frame size
+     * 
+     * @param conf
+     *            the job configuration
+     * @return the specified frame size; -1 if it is not set by users
+     */
+    public static int getFrameSize(Configuration conf) {
+        return conf.getInt(PregelixJob.FRAME_SIZE, -1);
+    }
 }