Merge fullstack_asterix_stabilization into fullstack_hyracks_result_distribution branch.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2862 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 66a0186..2caa93b 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -41,6 +41,7 @@
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
index 4af35fe..e5f42fe 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
@@ -42,7 +42,7 @@
private E edgeValue = null;
/** Configuration - Used to instantiate classes */
private Configuration conf = null;
- /** Whether the edgeValue field is not null*/
+ /** Whether the edgeValue field is not null */
private boolean hasEdgeValue = false;
/**
@@ -115,8 +115,9 @@
destVertexId.readFields(input);
hasEdgeValue = input.readBoolean();
if (hasEdgeValue) {
- if (edgeValue == null)
+ if (edgeValue == null) {
edgeValue = (E) BspUtils.createEdgeValue(getConf());
+ }
edgeValue.readFields(input);
}
}
@@ -128,8 +129,9 @@
}
destVertexId.write(output);
output.writeBoolean(hasEdgeValue);
- if (hasEdgeValue)
+ if (hasEdgeValue) {
edgeValue.write(output);
+ }
}
@Override
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index 734b1af..8d3d4c6 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -29,7 +29,7 @@
*/
public class MsgList<M extends Writable> extends ArrayListWritable<M> {
/** Defining a layout version for a serializable class. */
- private static final long serialVersionUID = 100L;
+ private static final long serialVersionUID = 1L;
/**
* Default constructor.s
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 6856e9a..b7f9e3d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -142,45 +142,6 @@
usedValue = 0;
}
- private Edge<I, E> allocateEdge() {
- Edge<I, E> edge;
- if (usedEdge < edgePool.size()) {
- edge = edgePool.get(usedEdge);
- usedEdge++;
- } else {
- edge = new Edge<I, E>();
- edgePool.add(edge);
- usedEdge++;
- }
- return edge;
- }
-
- private M allocateMessage() {
- M message;
- if (usedMessage < msgPool.size()) {
- message = msgPool.get(usedEdge);
- usedMessage++;
- } else {
- message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
- msgPool.add(message);
- usedMessage++;
- }
- return message;
- }
-
- private V allocateValue() {
- V value;
- if (usedValue < valuePool.size()) {
- value = valuePool.get(usedEdge);
- usedValue++;
- } else {
- value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
- valuePool.add(value);
- usedValue++;
- }
- return value;
- }
-
/**
* Set the vertex id
*
@@ -201,60 +162,24 @@
}
/**
- * Set the global superstep for all the vertices (internal use)
+ * Get the vertex value
*
- * @param superstep
- * New superstep
+ * @return the vertex value
*/
- public static void setSuperstep(long superstep) {
- Vertex.superstep = superstep;
- }
-
- public static long getCurrentSuperstep() {
- return superstep;
- }
-
- public final long getSuperstep() {
- return superstep;
- }
-
public final V getVertexValue() {
return vertexValue;
}
+ /**
+ * Set the vertex value
+ *
+ * @param vertexValue
+ */
public final void setVertexValue(V vertexValue) {
this.vertexValue = vertexValue;
this.updated = true;
}
- /**
- * Set the total number of vertices from the last superstep.
- *
- * @param numVertices
- * Aggregate vertices in the last superstep
- */
- public static void setNumVertices(long numVertices) {
- Vertex.numVertices = numVertices;
- }
-
- public final long getNumVertices() {
- return numVertices;
- }
-
- /**
- * Set the total number of edges from the last superstep.
- *
- * @param numEdges
- * Aggregate edges in the last superstep
- */
- public static void setNumEdges(long numEdges) {
- Vertex.numEdges = numEdges;
- }
-
- public final long getNumEdges() {
- return numEdges;
- }
-
/***
* Send a message to a specific vertex
*
@@ -309,6 +234,7 @@
vertexId.readFields(in);
delegate.setVertexId(vertexId);
boolean hasVertexValue = in.readBoolean();
+
if (hasVertexValue) {
vertexValue = allocateValue();
vertexValue.readFields(in);
@@ -352,12 +278,6 @@
out.writeBoolean(halt);
}
- private boolean addEdge(Edge<I, E> edge) {
- edge.setConf(getContext().getConfiguration());
- destEdgeList.add(edge);
- return true;
- }
-
/**
* Get the list of incoming messages
*
@@ -376,14 +296,7 @@
return this.destEdgeList;
}
- public final Mapper<?, ?, ?, ?>.Context getContext() {
- return context;
- }
-
- public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
- Vertex.context = context;
- }
-
+ @Override
@SuppressWarnings("unchecked")
public String toString() {
Collections.sort(destEdgeList);
@@ -396,37 +309,236 @@
return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
}
- public void setOutputWriters(List<IFrameWriter> writers) {
- delegate.setOutputWriters(writers);
- }
-
- public void setOutputAppenders(List<FrameTupleAppender> appenders) {
- delegate.setOutputAppenders(appenders);
- }
-
- public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
- delegate.setOutputTupleBuilders(tbs);
- }
-
- public void finishCompute() throws IOException {
- delegate.finishCompute();
- }
-
- public boolean hasUpdate() {
- return this.updated;
- }
-
- public boolean hasMessage() {
- return this.hasMessage;
- }
-
+ /**
+ * Get the number of outgoing edges
+ *
+ * @return the number of outging edges
+ */
public int getNumOutEdges() {
return destEdgeList.size();
}
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputWriters(List<IFrameWriter> writers) {
+ delegate.setOutputWriters(writers);
+ }
+
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+ delegate.setOutputAppenders(appenders);
+ }
+
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+ delegate.setOutputTupleBuilders(tbs);
+ }
+
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void finishCompute() throws IOException {
+ delegate.finishCompute();
+ }
+
+ /**
+ * Pregelix internal use only
+ */
+ public boolean hasUpdate() {
+ return this.updated;
+ }
+
+ /**
+ * Pregelix internal use only
+ */
+ public boolean hasMessage() {
+ return this.hasMessage;
+ }
+
+ /**
+ * sort the edges
+ */
@SuppressWarnings("unchecked")
public void sortEdges() {
- Collections.sort((List) destEdgeList);
+ Collections.sort(destEdgeList);
+ }
+
+ /**
+ * Allocate a new edge from the edge pool
+ */
+ private Edge<I, E> allocateEdge() {
+ Edge<I, E> edge;
+ if (usedEdge < edgePool.size()) {
+ edge = edgePool.get(usedEdge);
+ usedEdge++;
+ } else {
+ edge = new Edge<I, E>();
+ edgePool.add(edge);
+ usedEdge++;
+ }
+ return edge;
+ }
+
+ /**
+ * Allocate a new message from the message pool
+ */
+ private M allocateMessage() {
+ M message;
+ if (usedMessage < msgPool.size()) {
+ message = msgPool.get(usedEdge);
+ usedMessage++;
+ } else {
+ message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+ msgPool.add(message);
+ usedMessage++;
+ }
+ return message;
+ }
+
+ /**
+ * Set the global superstep for all the vertices (internal use)
+ *
+ * @param superstep
+ * New superstep
+ */
+ public static final void setSuperstep(long superstep) {
+ Vertex.superstep = superstep;
+ }
+
+ /**
+ * Add an outgoing edge into the vertex
+ *
+ * @param edge
+ * the edge to be added
+ * @return true if the edge list changed as a result of this call
+ */
+ public boolean addEdge(Edge<I, E> edge) {
+ edge.setConf(getContext().getConfiguration());
+ return destEdgeList.add(edge);
+ }
+
+ /**
+ * remove an outgoing edge in the graph
+ *
+ * @param edge
+ * the edge to be removed
+ * @return true if the edge is in the edge list of the vertex
+ */
+ public boolean removeEdge(Edge<I, E> edge) {
+ return destEdgeList.remove(edge);
+ }
+
+ /**
+ * Add a new vertex into the graph
+ *
+ * @param vertexId the vertex id
+ * @param vertex the vertex
+ */
+ public final void addVertex(I vertexId, V vertex) {
+ delegate.addVertex(vertexId, vertex);
+ }
+
+ /**
+ * Delete a vertex from id
+ *
+ * @param vertexId the vertex id
+ */
+ public final void deleteVertex(I vertexId) {
+ delegate.deleteVertex(vertexId);
+ }
+
+ /**
+ * Allocate a vertex value from the object pool
+ *
+ * @return a vertex value instance
+ */
+ private V allocateValue() {
+ V value;
+ if (usedValue < valuePool.size()) {
+ value = valuePool.get(usedValue);
+ usedValue++;
+ } else {
+ value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+ valuePool.add(value);
+ usedValue++;
+ }
+ return value;
+ }
+
+ /**
+ * Get the current global superstep number
+ *
+ * @return the current superstep number
+ */
+ public static final long getSuperstep() {
+ return superstep;
+ }
+
+ /**
+ * Set the total number of vertices from the last superstep.
+ *
+ * @param numVertices
+ * Aggregate vertices in the last superstep
+ */
+ public static final void setNumVertices(long numVertices) {
+ Vertex.numVertices = numVertices;
+ }
+
+ /**
+ * Get the number of vertexes in the graph
+ *
+ * @return the number of vertexes in the graph
+ */
+ public static final long getNumVertices() {
+ return numVertices;
+ }
+
+ /**
+ * Set the total number of edges from the last superstep.
+ *
+ * @param numEdges
+ * Aggregate edges in the last superstep
+ */
+ public static void setNumEdges(long numEdges) {
+ Vertex.numEdges = numEdges;
+ }
+
+ /**
+ * Get the number of edges from this graph
+ *
+ * @return the number of edges in the graph
+ */
+ public static final long getNumEdges() {
+ return numEdges;
+ }
+
+ /**
+ * Pregelix internal use only
+ */
+ public static final Mapper<?, ?, ?, ?>.Context getContext() {
+ return context;
+ }
+
+ /**
+ * Pregelix internal use only
+ *
+ * @param context
+ */
+ public static final void setContext(Mapper<?, ?, ?, ?>.Context context) {
+ Vertex.context = context;
}
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
index 7267f30..d949bc5 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
@@ -44,6 +44,16 @@
private IFrameWriter aliveWriter;
private FrameTupleAppender appenderAlive;
+ /** the tuple for insert */
+ private ArrayTupleBuilder insertTb;
+ private IFrameWriter insertWriter;
+ private FrameTupleAppender appenderInsert;
+
+ /** the tuple for insert */
+ private ArrayTupleBuilder deleteTb;
+ private IFrameWriter deleteWriter;
+ private FrameTupleAppender appenderDelete;
+
/** message list */
private MsgList dummyMessageList = new MsgList();
/** whether alive message should be pushed out */
@@ -95,25 +105,57 @@
this.vertexId = vertexId;
}
+ public final void addVertex(I vertexId, V vertex) {
+ try {
+ insertTb.reset();
+ DataOutput outputInsert = insertTb.getDataOutput();
+ vertexId.write(outputInsert);
+ insertTb.addFieldEndOffset();
+ vertex.write(outputInsert);
+ insertTb.addFieldEndOffset();
+ FrameTupleUtils.flushTuple(appenderInsert, insertTb, insertWriter);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public final void deleteVertex(I vertexId) {
+ try {
+ deleteTb.reset();
+ DataOutput outputDelete = deleteTb.getDataOutput();
+ vertexId.write(outputDelete);
+ deleteTb.addFieldEndOffset();
+ FrameTupleUtils.flushTuple(appenderDelete, deleteTb, deleteWriter);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
public final void setOutputWriters(List<IFrameWriter> outputs) {
msgWriter = outputs.get(0);
- if (outputs.size() > 1) {
- aliveWriter = outputs.get(1);
+ insertWriter = outputs.get(1);
+ deleteWriter = outputs.get(2);
+ if (outputs.size() > 3) {
+ aliveWriter = outputs.get(outputs.size() - 1);
pushAlive = true;
}
}
public final void setOutputAppenders(List<FrameTupleAppender> appenders) {
appenderMsg = appenders.get(0);
- if (appenders.size() > 1) {
- appenderAlive = appenders.get(1);
+ appenderInsert = appenders.get(1);
+ appenderDelete = appenders.get(2);
+ if (appenders.size() > 3) {
+ appenderAlive = appenders.get(appenders.size() - 1);
}
}
public final void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
message = tbs.get(0);
- if (tbs.size() > 1) {
- alive = tbs.get(1);
+ insertTb = tbs.get(1);
+ deleteTb = tbs.get(2);
+ if (tbs.size() > 3) {
+ alive = tbs.get(tbs.size() - 1);
}
}
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
index 7179737..ea33691 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
@@ -21,60 +21,64 @@
import java.io.Serializable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* This InputSplit will not give any ordering or location data. It is used
* internally by BspInputFormat (which determines how many tasks to run the
* application on). Users should not use this directly.
*/
-public class BasicGenInputSplit extends InputSplit implements Writable, Serializable {
- private static final long serialVersionUID = 1L;
- /** Number of splits */
- private int numSplits = -1;
- /** Split index */
- private int splitIndex = -1;
+public class BasicGenInputSplit extends FileSplit implements Writable,
+ Serializable {
+ private static final long serialVersionUID = 1L;
+ /** Number of splits */
+ private int numSplits = -1;
+ /** Split index */
+ private int splitIndex = -1;
- public BasicGenInputSplit() {
- }
+ public BasicGenInputSplit() {
+ super(null, 0, 0, null);
+ }
- public BasicGenInputSplit(int splitIndex, int numSplits) {
- this.splitIndex = splitIndex;
- this.numSplits = numSplits;
- }
+ public BasicGenInputSplit(int splitIndex, int numSplits) {
+ super(null, 0, 0, null);
+ this.splitIndex = splitIndex;
+ this.numSplits = numSplits;
+ }
- @Override
- public long getLength() throws IOException, InterruptedException {
- return 0;
- }
+ @Override
+ public long getLength() {
+ return 0;
+ }
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- return new String[] {};
- }
+ @Override
+ public String[] getLocations() throws IOException {
+ return new String[] {};
+ }
- @Override
- public void readFields(DataInput in) throws IOException {
- splitIndex = in.readInt();
- numSplits = in.readInt();
- }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ splitIndex = in.readInt();
+ numSplits = in.readInt();
+ }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(splitIndex);
- out.writeInt(numSplits);
- }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(splitIndex);
+ out.writeInt(numSplits);
+ }
- public int getSplitIndex() {
- return splitIndex;
- }
+ public int getSplitIndex() {
+ return splitIndex;
+ }
- public int getNumSplits() {
- return numSplits;
- }
+ public int getNumSplits() {
+ return numSplits;
+ }
- @Override
- public String toString() {
- return "'" + getClass().getCanonicalName() + ", index=" + getSplitIndex() + ", num=" + getNumSplits();
- }
+ @Override
+ public String toString() {
+ return "'" + getClass().getCanonicalName() + ", index="
+ + getSplitIndex() + ", num=" + getNumSplits();
+ }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 6ef7e13..8b6d1b6 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -60,8 +60,12 @@
public static final String NUM_VERTICE = "pregelix.numVertices";
/** num of edges */
public static final String NUM_EDGES = "pregelix.numEdges";
+ /** increase state length */
+ public static final String INCREASE_STATE_LENGTH = "pregelix.incStateLength";
/** job id */
public static final String JOB_ID = "pregelix.jobid";
+ /** frame size */
+ public static final String FRAME_SIZE = "pregelix.framesize";
/**
* Constructor that will instantiate the configuration
@@ -130,8 +134,8 @@
/**
* Set the global aggregator class (optional)
*
- * @param vertexCombinerClass
- * Determines how vertex messages are combined
+ * @param globalAggregatorClass
+ * Determines how messages are globally aggregated
*/
final public void setGlobalAggregatorClass(Class<?> globalAggregatorClass) {
getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, globalAggregatorClass, GlobalAggregator.class);
@@ -139,11 +143,27 @@
/**
* Set the job Id
- *
- * @param vertexCombinerClass
- * Determines how vertex messages are combined
*/
final public void setJobId(String jobId) {
getConfiguration().set(JOB_ID, jobId);
}
+
+ /**
+ * Set whether the vertex state length can be dynamically increased
+ *
+ * @param jobId
+ */
+ final public void setDynamicVertexValueSize(boolean incStateLengthDynamically) {
+ getConfiguration().setBoolean(INCREASE_STATE_LENGTH, incStateLengthDynamically);
+ }
+
+ /**
+ * Set the frame size for a job
+ *
+ * @param frameSize
+ * the desired frame size
+ */
+ final public void setFrameSize(int frameSize) {
+ getConfiguration().setInt(FRAME_SIZE, frameSize);
+ }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 7c4853f..ff9724d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -410,4 +410,26 @@
throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
}
}
+
+ /**
+ * Get the job configuration parameter whether the vertex states will increase dynamically
+ *
+ * @param conf
+ * the job configuration
+ * @return the boolean setting of the parameter, by default it is false
+ */
+ public static boolean getDynamicVertexValueSize(Configuration conf) {
+ return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
+ }
+
+ /**
+ * Get the specified frame size
+ *
+ * @param conf
+ * the job configuration
+ * @return the specified frame size; -1 if it is not set by users
+ */
+ public static int getFrameSize(Configuration conf) {
+ return conf.getInt(PregelixJob.FRAME_SIZE, -1);
+ }
}