Merged fullstack_staging -r 2426:2786
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@2788 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
index 4af35fe..e5f42fe 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
@@ -42,7 +42,7 @@
private E edgeValue = null;
/** Configuration - Used to instantiate classes */
private Configuration conf = null;
- /** Whether the edgeValue field is not null*/
+ /** Whether the edgeValue field is not null */
private boolean hasEdgeValue = false;
/**
@@ -115,8 +115,9 @@
destVertexId.readFields(input);
hasEdgeValue = input.readBoolean();
if (hasEdgeValue) {
- if (edgeValue == null)
+ if (edgeValue == null) {
edgeValue = (E) BspUtils.createEdgeValue(getConf());
+ }
edgeValue.readFields(input);
}
}
@@ -128,8 +129,9 @@
}
destVertexId.write(output);
output.writeBoolean(hasEdgeValue);
- if (hasEdgeValue)
+ if (hasEdgeValue) {
edgeValue.write(output);
+ }
}
@Override
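
The readFields/write pair above guards the optional edge value with the hasEdgeValue flag, so an edge with no value serializes as just the destination id plus a single boolean. A minimal, self-contained sketch of that flag-then-payload pattern, using plain Hadoop Writables rather than this patch's Edge (the record type is hypothetical):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Writable;

    // Sketch of the optional-field layout used by Edge.write/readFields.
    public class OptionalValueRecord implements Writable {
        private final LongWritable destId = new LongWritable();
        private FloatWritable value = null; // may legitimately be absent

        public void write(DataOutput out) throws IOException {
            destId.write(out);
            out.writeBoolean(value != null); // flag first, payload only if present
            if (value != null) {
                value.write(out);
            }
        }

        public void readFields(DataInput in) throws IOException {
            destId.readFields(in);
            if (in.readBoolean()) {
                if (value == null) {
                    value = new FloatWritable(); // lazy allocation, mirroring Edge
                }
                value.readFields(in);
            } else {
                value = null;
            }
        }
    }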
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index 734b1af..8d3d4c6 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -29,7 +29,7 @@
*/
public class MsgList<M extends Writable> extends ArrayListWritable<M> {
/** Defining a layout version for a serializable class. */
- private static final long serialVersionUID = 100L;
+ private static final long serialVersionUID = 1L;
/**
* Default constructor.
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 6856e9a..b7f9e3d 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -142,45 +142,6 @@
usedValue = 0;
}
- private Edge<I, E> allocateEdge() {
- Edge<I, E> edge;
- if (usedEdge < edgePool.size()) {
- edge = edgePool.get(usedEdge);
- usedEdge++;
- } else {
- edge = new Edge<I, E>();
- edgePool.add(edge);
- usedEdge++;
- }
- return edge;
- }
-
- private M allocateMessage() {
- M message;
- if (usedMessage < msgPool.size()) {
- message = msgPool.get(usedEdge);
- usedMessage++;
- } else {
- message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
- msgPool.add(message);
- usedMessage++;
- }
- return message;
- }
-
- private V allocateValue() {
- V value;
- if (usedValue < valuePool.size()) {
- value = valuePool.get(usedEdge);
- usedValue++;
- } else {
- value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
- valuePool.add(value);
- usedValue++;
- }
- return value;
- }
-
/**
* Set the vertex id
*
@@ -201,60 +162,24 @@
}
/**
- * Set the global superstep for all the vertices (internal use)
+ * Get the vertex value
*
- * @param superstep
- * New superstep
+ * @return the vertex value
*/
- public static void setSuperstep(long superstep) {
- Vertex.superstep = superstep;
- }
-
- public static long getCurrentSuperstep() {
- return superstep;
- }
-
- public final long getSuperstep() {
- return superstep;
- }
-
public final V getVertexValue() {
return vertexValue;
}
+ /**
+ * Set the vertex value
+ *
+ * @param vertexValue
+ */
public final void setVertexValue(V vertexValue) {
this.vertexValue = vertexValue;
this.updated = true;
}
- /**
- * Set the total number of vertices from the last superstep.
- *
- * @param numVertices
- * Aggregate vertices in the last superstep
- */
- public static void setNumVertices(long numVertices) {
- Vertex.numVertices = numVertices;
- }
-
- public final long getNumVertices() {
- return numVertices;
- }
-
- /**
- * Set the total number of edges from the last superstep.
- *
- * @param numEdges
- * Aggregate edges in the last superstep
- */
- public static void setNumEdges(long numEdges) {
- Vertex.numEdges = numEdges;
- }
-
- public final long getNumEdges() {
- return numEdges;
- }
-
/***
* Send a message to a specific vertex
*
@@ -309,6 +234,7 @@
vertexId.readFields(in);
delegate.setVertexId(vertexId);
boolean hasVertexValue = in.readBoolean();
+
if (hasVertexValue) {
vertexValue = allocateValue();
vertexValue.readFields(in);
@@ -352,12 +278,6 @@
out.writeBoolean(halt);
}
- private boolean addEdge(Edge<I, E> edge) {
- edge.setConf(getContext().getConfiguration());
- destEdgeList.add(edge);
- return true;
- }
-
/**
* Get the list of incoming messages
*
@@ -376,14 +296,7 @@
return this.destEdgeList;
}
- public final Mapper<?, ?, ?, ?>.Context getContext() {
- return context;
- }
-
- public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
- Vertex.context = context;
- }
-
+ @Override
@SuppressWarnings("unchecked")
public String toString() {
Collections.sort(destEdgeList);
@@ -396,37 +309,236 @@
return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
}
- public void setOutputWriters(List<IFrameWriter> writers) {
- delegate.setOutputWriters(writers);
- }
-
- public void setOutputAppenders(List<FrameTupleAppender> appenders) {
- delegate.setOutputAppenders(appenders);
- }
-
- public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
- delegate.setOutputTupleBuilders(tbs);
- }
-
- public void finishCompute() throws IOException {
- delegate.finishCompute();
- }
-
- public boolean hasUpdate() {
- return this.updated;
- }
-
- public boolean hasMessage() {
- return this.hasMessage;
- }
-
+ /**
+ * Get the number of outgoing edges
+ *
+ * @return the number of outgoing edges
+ */
public int getNumOutEdges() {
return destEdgeList.size();
}
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputWriters(List<IFrameWriter> writers) {
+ delegate.setOutputWriters(writers);
+ }
+
+ /**
+ * Pregelix internal use only
+ *
+ * @param appenders
+ */
+ public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+ delegate.setOutputAppenders(appenders);
+ }
+
+ /**
+ * Pregelix internal use only
+ *
+ * @param tbs
+ */
+ public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+ delegate.setOutputTupleBuilders(tbs);
+ }
+
+ /**
+ * Pregelix internal use only
+ *
+ * @throws IOException
+ */
+ public void finishCompute() throws IOException {
+ delegate.finishCompute();
+ }
+
+ /**
+ * Pregelix internal use only
+ */
+ public boolean hasUpdate() {
+ return this.updated;
+ }
+
+ /**
+ * Pregelix internal use only
+ */
+ public boolean hasMessage() {
+ return this.hasMessage;
+ }
+
+ /**
+ * Sort the edges
+ */
@SuppressWarnings("unchecked")
public void sortEdges() {
- Collections.sort((List) destEdgeList);
+ Collections.sort(destEdgeList);
+ }
+
+ /**
+ * Allocate a new edge from the edge pool
+ */
+ private Edge<I, E> allocateEdge() {
+ Edge<I, E> edge;
+ if (usedEdge < edgePool.size()) {
+ edge = edgePool.get(usedEdge);
+ usedEdge++;
+ } else {
+ edge = new Edge<I, E>();
+ edgePool.add(edge);
+ usedEdge++;
+ }
+ return edge;
+ }
+
+ /**
+ * Allocate a new message from the message pool
+ */
+ private M allocateMessage() {
+ M message;
+ if (usedMessage < msgPool.size()) {
+ message = msgPool.get(usedMessage);
+ usedMessage++;
+ } else {
+ message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+ msgPool.add(message);
+ usedMessage++;
+ }
+ return message;
+ }
+
+ /**
+ * Set the global superstep for all the vertices (internal use)
+ *
+ * @param superstep
+ * New superstep
+ */
+ public static final void setSuperstep(long superstep) {
+ Vertex.superstep = superstep;
+ }
+
+ /**
+ * Add an outgoing edge into the vertex
+ *
+ * @param edge
+ * the edge to be added
+ * @return true if the edge list changed as a result of this call
+ */
+ public boolean addEdge(Edge<I, E> edge) {
+ edge.setConf(getContext().getConfiguration());
+ return destEdgeList.add(edge);
+ }
+
+ /**
+ * Remove an outgoing edge from the vertex
+ *
+ * @param edge
+ * the edge to be removed
+ * @return true if the edge was in the vertex's edge list
+ */
+ public boolean removeEdge(Edge<I, E> edge) {
+ return destEdgeList.remove(edge);
+ }
+
+ /**
+ * Add a new vertex into the graph
+ *
+ * @param vertexId the vertex id
+ * @param vertex the vertex
+ */
+ public final void addVertex(I vertexId, V vertex) {
+ delegate.addVertex(vertexId, vertex);
+ }
+
+ /**
+ * Delete a vertex by its id
+ *
+ * @param vertexId the vertex id
+ */
+ public final void deleteVertex(I vertexId) {
+ delegate.deleteVertex(vertexId);
+ }
+
+ /**
+ * Allocate a vertex value from the object pool
+ *
+ * @return a vertex value instance
+ */
+ private V allocateValue() {
+ V value;
+ if (usedValue < valuePool.size()) {
+ value = valuePool.get(usedValue);
+ usedValue++;
+ } else {
+ value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+ valuePool.add(value);
+ usedValue++;
+ }
+ return value;
+ }
+
+ /**
+ * Get the current global superstep number
+ *
+ * @return the current superstep number
+ */
+ public static final long getSuperstep() {
+ return superstep;
+ }
+
+ /**
+ * Set the total number of vertices from the last superstep.
+ *
+ * @param numVertices
+ * Aggregate vertices in the last superstep
+ */
+ public static final void setNumVertices(long numVertices) {
+ Vertex.numVertices = numVertices;
+ }
+
+ /**
+ * Get the number of vertices in the graph
+ *
+ * @return the number of vertices in the graph
+ */
+ public static final long getNumVertices() {
+ return numVertices;
+ }
+
+ /**
+ * Set the total number of edges from the last superstep.
+ *
+ * @param numEdges
+ * Aggregate edges in the last superstep
+ */
+ public static void setNumEdges(long numEdges) {
+ Vertex.numEdges = numEdges;
+ }
+
+ /**
+ * Get the number of edges in the graph
+ *
+ * @return the number of edges in the graph
+ */
+ public static final long getNumEdges() {
+ return numEdges;
+ }
+
+ /**
+ * Pregelix internal use only
+ */
+ public static final Mapper<?, ?, ?, ?>.Context getContext() {
+ return context;
+ }
+
+ /**
+ * Pregelix internal use only
+ *
+ * @param context
+ */
+ public static final void setContext(Mapper<?, ?, ?, ?>.Context context) {
+ Vertex.context = context;
}
}
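
With addEdge/removeEdge/addVertex/deleteVertex now public, a vertex program can mutate the graph from inside compute(); the delegate routes insertions and deletions to dedicated output channels rather than applying them in place. A hedged usage sketch (the vertex types, threshold, class name, and compute signature are illustrative, not from this patch):

    import java.util.Iterator;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.VLongWritable;

    import edu.uci.ics.pregelix.api.graph.Vertex;

    // Sketch: delete vertices whose value falls below a threshold.
    public class PruneVertex extends Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
        @Override
        public void compute(Iterator<DoubleWritable> msgIterator) {
            if (getVertexValue().get() < 0.001) {
                // routed to the delete output channel wired up in VertexDelegate
                deleteVertex(getVertexId());
            } else {
                voteToHalt();
            }
        }
    }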
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
index 7267f30..d949bc5 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
@@ -44,6 +44,16 @@
private IFrameWriter aliveWriter;
private FrameTupleAppender appenderAlive;
+ /** the tuple for insert */
+ private ArrayTupleBuilder insertTb;
+ private IFrameWriter insertWriter;
+ private FrameTupleAppender appenderInsert;
+
+ /** the tuple for delete */
+ private ArrayTupleBuilder deleteTb;
+ private IFrameWriter deleteWriter;
+ private FrameTupleAppender appenderDelete;
+
/** message list */
private MsgList dummyMessageList = new MsgList();
/** whether alive message should be pushed out */
@@ -95,25 +105,57 @@
this.vertexId = vertexId;
}
+ public final void addVertex(I vertexId, V vertex) {
+ try {
+ insertTb.reset();
+ DataOutput outputInsert = insertTb.getDataOutput();
+ vertexId.write(outputInsert);
+ insertTb.addFieldEndOffset();
+ vertex.write(outputInsert);
+ insertTb.addFieldEndOffset();
+ FrameTupleUtils.flushTuple(appenderInsert, insertTb, insertWriter);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public final void deleteVertex(I vertexId) {
+ try {
+ deleteTb.reset();
+ DataOutput outputDelete = deleteTb.getDataOutput();
+ vertexId.write(outputDelete);
+ deleteTb.addFieldEndOffset();
+ FrameTupleUtils.flushTuple(appenderDelete, deleteTb, deleteWriter);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
public final void setOutputWriters(List<IFrameWriter> outputs) {
msgWriter = outputs.get(0);
- if (outputs.size() > 1) {
- aliveWriter = outputs.get(1);
+ insertWriter = outputs.get(1);
+ deleteWriter = outputs.get(2);
+ if (outputs.size() > 3) {
+ aliveWriter = outputs.get(outputs.size() - 1);
pushAlive = true;
}
}
public final void setOutputAppenders(List<FrameTupleAppender> appenders) {
appenderMsg = appenders.get(0);
- if (appenders.size() > 1) {
- appenderAlive = appenders.get(1);
+ appenderInsert = appenders.get(1);
+ appenderDelete = appenders.get(2);
+ if (appenders.size() > 3) {
+ appenderAlive = appenders.get(appenders.size() - 1);
}
}
public final void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
message = tbs.get(0);
- if (tbs.size() > 1) {
- alive = tbs.get(1);
+ insertTb = tbs.get(1);
+ deleteTb = tbs.get(2);
+ if (tbs.size() > 3) {
+ alive = tbs.get(tbs.size() - 1);
}
}
}
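
After this change the delegate assumes a fixed channel layout: slot 0 carries messages, slot 1 vertex insertions, slot 2 vertex deletions, and, when more than three outputs are wired, the final slot carries alive messages. A sketch of the ordering a caller has to respect (writer construction elided; the helper is illustrative):

    import java.util.ArrayList;
    import java.util.List;

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;

    // Sketch: assemble the writer list in the order VertexDelegate expects.
    static List<IFrameWriter> orderOutputs(IFrameWriter msg, IFrameWriter insert,
            IFrameWriter delete, IFrameWriter alive) {
        List<IFrameWriter> outputs = new ArrayList<IFrameWriter>();
        outputs.add(msg);    // 0: outgoing messages (always present)
        outputs.add(insert); // 1: vertex insertions
        outputs.add(delete); // 2: vertex deletions
        if (alive != null) {
            outputs.add(alive); // last slot: alive messages, detected via size() > 3
        }
        return outputs;
    }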
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 6ef7e13..8b6d1b6 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -60,8 +60,12 @@
public static final String NUM_VERTICE = "pregelix.numVertices";
/** num of edges */
public static final String NUM_EDGES = "pregelix.numEdges";
+ /** whether the vertex state length can be dynamically increased */
+ public static final String INCREASE_STATE_LENGTH = "pregelix.incStateLength";
/** job id */
public static final String JOB_ID = "pregelix.jobid";
+ /** frame size */
+ public static final String FRAME_SIZE = "pregelix.framesize";
/**
* Constructor that will instantiate the configuration
@@ -130,8 +134,8 @@
/**
* Set the global aggregator class (optional)
*
- * @param vertexCombinerClass
- * Determines how vertex messages are combined
+ * @param globalAggregatorClass
+ * Determines how messages are globally aggregated
*/
final public void setGlobalAggregatorClass(Class<?> globalAggregatorClass) {
getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, globalAggregatorClass, GlobalAggregator.class);
@@ -139,11 +143,27 @@
/**
* Set the job Id
- *
- * @param vertexCombinerClass
- * Determines how vertex messages are combined
*/
final public void setJobId(String jobId) {
getConfiguration().set(JOB_ID, jobId);
}
+
+ /**
+ * Set whether the vertex state length can be dynamically increased
+ *
+ * @param incStateLengthDynamically
+ */
+ final public void setDynamicVertexValueSize(boolean incStateLengthDynamically) {
+ getConfiguration().setBoolean(INCREASE_STATE_LENGTH, incStateLengthDynamically);
+ }
+
+ /**
+ * Set the frame size for a job
+ *
+ * @param frameSize
+ * the desired frame size
+ */
+ final public void setFrameSize(int frameSize) {
+ getConfiguration().setInt(FRAME_SIZE, frameSize);
+ }
}
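
Both new knobs are plain Hadoop configuration entries, so they compose with the existing setters. A hedged configuration sketch (MyVertex is a placeholder application class):

    import java.io.IOException;

    import edu.uci.ics.pregelix.api.job.PregelixJob;

    static PregelixJob configureJob() throws IOException {
        PregelixJob job = new PregelixJob("example");
        job.setVertexClass(MyVertex.class);  // placeholder vertex implementation
        job.setDynamicVertexValueSize(true); // vertex state length may grow at runtime
        job.setFrameSize(64 * 1024);         // ask for 64KB frames instead of the cluster default
        return job;
    }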
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 7c4853f..ff9724d 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -410,4 +410,26 @@
throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
}
}
+
+ /**
+ * Get the job configuration parameter that indicates whether the vertex state size can increase dynamically
+ *
+ * @param conf
+ * the job configuration
+ * @return the boolean setting of the parameter; false by default
+ */
+ public static boolean getDynamicVertexValueSize(Configuration conf) {
+ return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
+ }
+
+ /**
+ * Get the specified frame size
+ *
+ * @param conf
+ * the job configuration
+ * @return the specified frame size; -1 if it is not set by users
+ */
+ public static int getFrameSize(Configuration conf) {
+ return conf.getInt(PregelixJob.FRAME_SIZE, -1);
+ }
}
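
Callers are expected to treat the -1 sentinel from getFrameSize as "unset" and fall back to a cluster default, which is what the JobGen constructor below does. A minimal sketch of that fallback:

    import org.apache.hadoop.conf.Configuration;

    import edu.uci.ics.pregelix.api.util.BspUtils;

    static int resolveFrameSize(Configuration conf, int clusterDefault) {
        int specified = BspUtils.getFrameSize(conf); // -1 when the user set nothing
        return specified > 0 ? specified : clusterDefault;
    }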
diff --git a/pregelix-core/pom.xml b/pregelix-core/pom.xml
index 0b5a12f..44e9547 100644
--- a/pregelix-core/pom.xml
+++ b/pregelix-core/pom.xml
@@ -329,7 +329,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks.examples</groupId>
<artifactId>hyracks-integration-tests</artifactId>
- <version>0.2.1</version>
+ <version>0.2.3-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index de29dbc..efdbd41 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -74,10 +74,10 @@
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.core.util.DatatypeHelper;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
@@ -86,377 +86,462 @@
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
public abstract class JobGen implements IJobGen {
- private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
- protected static final int MB = 1048576;
- protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
- protected static final int frameSize = ClusterConfig.getFrameSize();
- protected static final int maxFrameSize = (int) (((long) 32 * MB) / frameSize);
- protected static final int tableSize = 10485767;
- protected static final String PRIMARY_INDEX = "primary";
- protected final Configuration conf;
- protected final PregelixJob giraphJob;
- protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
- protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
- protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+ private static final Logger LOGGER = Logger.getLogger(JobGen.class
+ .getName());
+ protected static final int MB = 1048576;
+ protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+ protected static final int tableSize = 10485767;
+ protected static final String PRIMARY_INDEX = "primary";
+ protected final Configuration conf;
+ protected final PregelixJob giraphJob;
+ protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+ protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+ protected String jobId = new UUID(System.currentTimeMillis(),
+ System.nanoTime()).toString();
+ protected int frameSize = ClusterConfig.getFrameSize();
+ protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
- protected static final String SECONDARY_INDEX_ODD = "secondary1";
- protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+ protected static final String SECONDARY_INDEX_ODD = "secondary1";
+ protected static final String SECONDARY_INDEX_EVEN = "secondary2";
- public JobGen(PregelixJob job) {
- this.conf = job.getConfiguration();
- this.giraphJob = job;
- this.initJobConfiguration();
- job.setJobId(jobId);
- }
+ public JobGen(PregelixJob job) {
+ this.conf = job.getConfiguration();
+ this.giraphJob = job;
+ this.initJobConfiguration();
+ job.setJobId(jobId);
+
+ // use the user-specified frame size if the user provided one
+ int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
+ if (specifiedFrameSize > 0) {
+ frameSize = specifiedFrameSize;
+ maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ }
+ if (maxFrameNumber <= 0) {
+ maxFrameNumber = 1;
+ }
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private void initJobConfiguration() {
- Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
- List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
- Type vertexIndexType = parameterTypes.get(0);
- Type vertexValueType = parameterTypes.get(1);
- Type edgeValueType = parameterTypes.get(2);
- Type messageValueType = parameterTypes.get(3);
- conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
- conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
- conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
- conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void initJobConfiguration() {
+ Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS,
+ Vertex.class);
+ List<Type> parameterTypes = ReflectionUtils.getTypeArguments(
+ Vertex.class, vertexClass);
+ Type vertexIndexType = parameterTypes.get(0);
+ Type vertexValueType = parameterTypes.get(1);
+ Type edgeValueType = parameterTypes.get(2);
+ Type messageValueType = parameterTypes.get(3);
+ conf.setClass(PregelixJob.VERTEX_INDEX_CLASS,
+ (Class<?>) vertexIndexType, WritableComparable.class);
+ conf.setClass(PregelixJob.VERTEX_VALUE_CLASS,
+ (Class<?>) vertexValueType, Writable.class);
+ conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType,
+ Writable.class);
+ conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS,
+ (Class<?>) messageValueType, Writable.class);
- Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
- if (!aggregatorClass.equals(GlobalAggregator.class)) {
- List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
- Type partialAggregateValueType = argTypes.get(4);
- conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, (Class<?>) partialAggregateValueType,
- Writable.class);
- Type finalAggregateValueType = argTypes.get(5);
- conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
- }
+ Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
+ if (!aggregatorClass.equals(GlobalAggregator.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(
+ GlobalAggregator.class, aggregatorClass);
+ Type partialAggregateValueType = argTypes.get(4);
+ conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS,
+ (Class<?>) partialAggregateValueType, Writable.class);
+ Type finalAggregateValueType = argTypes.get(5);
+ conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS,
+ (Class<?>) finalAggregateValueType, Writable.class);
+ }
- Class combinerClass = BspUtils.getMessageCombinerClass(conf);
- if (!combinerClass.equals(MessageCombiner.class)) {
- List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
- Type partialCombineValueType = argTypes.get(2);
- conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, (Class<?>) partialCombineValueType, Writable.class);
- }
- }
+ Class combinerClass = BspUtils.getMessageCombinerClass(conf);
+ if (!combinerClass.equals(MessageCombiner.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(
+ MessageCombiner.class, combinerClass);
+ Type partialCombineValueType = argTypes.get(2);
+ conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS,
+ (Class<?>) partialCombineValueType, Writable.class);
+ }
+ }
- public String getJobId() {
- return jobId;
- }
+ public String getJobId() {
+ return jobId;
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateCreatingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- JobSpecification spec = new JobSpecification();
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateCreatingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+ .getVertexIndexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+ WritableComparator.get(vertexIdClass).getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeCreate);
- return spec;
- }
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(
+ spec, storageManagerInterface, treeRegistryProvider,
+ fileSplitProvider, typeTraits, comparatorFactories,
+ new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeCreate);
+ return spec;
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateLoadingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateLoadingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+ .getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
- LOGGER.info("number of splits: " + splits.size());
- for (InputSplit split : splits)
- LOGGER.info(split.toString());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
- confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner, splits);
+ /**
+ * the graph file scan operator; use a count constraint first, switch to an
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob,
+ fileSplitProvider.getFileSplits().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils
+ .getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
+ spec, recordDescriptor, splits, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner, splits);
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameSize, sortFields,
- nkmFactory, comparatorFactories, recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, sorter);
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+ WritableComparator.get(vertexIdClass).getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+ spec, maxFrameNumber, sortFields, nkmFactory,
+ comparatorFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sorter);
- /**
- * construct tree bulk load operator
- */
- int[] fieldPermutation = new int[2];
- fieldPermutation[0] = 0;
- fieldPermutation[1] = 1;
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ /**
+ * construct tree bulk load operator
+ */
+ int[] fieldPermutation = new int[2];
+ fieldPermutation[0] = 0;
+ fieldPermutation[1] = 1;
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
+ spec, storageManagerInterface, treeRegistryProvider,
+ fileSplitProvider, typeTraits, comparatorFactories,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
+ new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
- return spec;
- }
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec,
+ hashPartitionComputerFactory), scanner, 0, sorter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
+ btreeBulkLoad, 0);
+ return spec;
+ }
- @Override
- public JobSpecification generateJob(int iteration) throws HyracksException {
- if (iteration <= 0)
- throw new IllegalStateException("iteration number cannot be less than 1");
- if (iteration == 1)
- return generateFirstIteration(iteration);
- else
- return generateNonFirstIteration(iteration);
- }
+ @Override
+ public JobSpecification generateJob(int iteration) throws HyracksException {
+ if (iteration <= 0)
+ throw new IllegalStateException(
+ "iteration number cannot be less than 1");
+ if (iteration == 1)
+ return generateFirstIteration(iteration);
+ else
+ return generateNonFirstIteration(iteration);
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanSortPrintGraph(String nodeName, String path) throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanSortPrintGraph(String nodeName, String path)
+ throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+ .getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
- confFactory);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner, splits.size());
+ /**
+ * the graph file scan operator; use a count constraint first, switch to an
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob,
+ fileSplitProvider.getFileSplits().length);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils
+ .getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
+ spec, recordDescriptor, splits, confFactory);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner,
+ splits.size());
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
- nkmFactory, comparatorFactories, recordDescriptor);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter, splits.size());
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+ WritableComparator.get(vertexIdClass).getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+ spec, maxFrameLimit, sortFields, nkmFactory,
+ comparatorFactories, recordDescriptor);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter,
+ splits.size());
- /**
- * construct write file operator
- */
- FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
- FileSplit[] results = new FileSplit[1];
- results[0] = resultFile;
- IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
- IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
- resultFileSplitProvider, preHookFactory, null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
- PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(
+ new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
+ results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
+ spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
+ null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
+ new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
- comparatorFactories), sorter, 0, writer, 0);
- return spec;
- }
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter,
+ 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
+ hashPartitionComputerFactory, sortFields, comparatorFactories),
+ sorter, 0, writer, 0);
+ return spec;
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexPrintGraph(String nodeName, String path)
+ throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+ .getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
- /**
- * construct empty tuple operator
- */
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
+ spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
+ tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
- /**
- * construct btree search operator
- */
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils
+ .getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+ WritableComparator.get(vertexIdClass).getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
+ spec, recordDescriptor, storageManagerInterface,
+ treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, null, true, true,
+ new BTreeDataflowHelperFactory(), false,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * construct write file operator
- */
- FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
- FileSplit[] results = new FileSplit[1];
- results[0] = resultFile;
- IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
- IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
- resultFileSplitProvider, preHookFactory, null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
- PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(
+ new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
+ results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
+ spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
+ null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
+ new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
- /**
- * connect operator descriptors
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
- comparatorFactories), scanner, 0, writer, 0);
- spec.setFrameSize(frameSize);
- return spec;
- }
+ /**
+ * connect operator descriptors
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
+ 0, scanner, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
+ hashPartitionComputerFactory, sortFields, comparatorFactories),
+ scanner, 0, writer, 0);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanIndexWriteGraph() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexWriteGraph() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+ .getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
- /**
- * construct empty tuple operator
- */
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
+ spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
+ tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
- /**
- * construct btree search operator
- */
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils
+ .getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+ WritableComparator.get(vertexIdClass).getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, PRIMARY_INDEX);
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
+ spec, recordDescriptor, storageManagerInterface,
+ treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, null, true, true,
+ new BTreeDataflowHelperFactory(), false,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * construct write file operator
- */
- IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
- ClusterConfig.setLocationConstraint(spec, writer);
+ /**
+ * construct write file operator
+ */
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(
+ spec, confFactory, inputRdFactory);
+ ClusterConfig.setLocationConstraint(spec, writer);
- /**
- * connect operator descriptors
- */
- spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
- return spec;
- }
+ /**
+ * connect operator descriptors
+ */
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
+ 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer,
+ 0);
+ return spec;
+ }
- /***
- * drop the sindex
- *
- * @return JobSpecification
- * @throws HyracksException
- */
- protected JobSpecification dropIndex(String indexName) throws HyracksException {
- JobSpecification spec = new JobSpecification();
+ /***
+ * drop the index
+ *
+ * @return JobSpecification
+ * @throws HyracksException
+ */
+ protected JobSpecification dropIndex(String indexName)
+ throws HyracksException {
+ JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
- TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider);
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, indexName);
+ TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(
+ spec, storageManagerInterface, treeRegistryProvider,
+ fileSplitProvider);
- ClusterConfig.setLocationConstraint(spec, drop);
- spec.addRoot(drop);
- return spec;
- }
+ ClusterConfig.setLocationConstraint(spec, drop);
+ spec.addRoot(drop);
+ return spec;
+ }
- /** generate non-first iteration job */
- protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
+ /** generate non-first iteration job */
+ protected abstract JobSpecification generateNonFirstIteration(int iteration)
+ throws HyracksException;
- /** generate first iteration job */
- protected abstract JobSpecification generateFirstIteration(int iteration) throws HyracksException;
+ /** generate first iteration job */
+ protected abstract JobSpecification generateFirstIteration(int iteration)
+ throws HyracksException;
- /** generate clean-up job */
- public abstract JobSpecification[] generateCleanup() throws HyracksException;
+ /** generate clean-up job */
+ public abstract JobSpecification[] generateCleanup()
+ throws HyracksException;
}
\ No newline at end of file
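
Beyond the re-indentation, the substantive change in JobGen is that frameSize and maxFrameNumber become per-instance fields: the sorters' frame budget is roughly 32MB divided by the effective frame size, clamped to at least one frame. A sketch of that arithmetic, with the constants copied from the constructor above:

    // Sketch: the sort frame budget derived in the JobGen constructor.
    static int maxFrameNumber(int frameSize) {
        final int MB = 1048576;
        int frames = (int) (((long) 32 * MB) / frameSize); // ~32MB sorting budget
        return frames <= 0 ? 1 : frames; // oversized frames still get one
    }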
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 00cdf07..727e7fe 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -39,6 +39,9 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -127,13 +130,16 @@
MsgList.class.getName());
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 4,
+ new BTreeDataflowHelperFactory(), inputRdFactory, 6,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate, rdFinal);
+ rdPartialAggregate, rdInsert, rdDelete, rdFinal);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -173,7 +179,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -212,9 +218,36 @@
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink);
+ /**
+ * add the insert operator to insert vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+ * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -223,7 +256,18 @@
terminateWriter, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 3, btreeBulkLoad, 0);
+
+ /**
+ * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+ 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+ 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -235,6 +279,8 @@
spec.addRoot(btreeBulkLoad);
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
spec.setFrameSize(frameSize);
@@ -261,6 +307,9 @@
.getClass());
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -316,8 +365,8 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 4, new ComputeUpdateFunctionFactory(confFactory),
- preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdFinal);
+ new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
+ preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -342,7 +391,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -395,6 +444,32 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+ /**
+ * add the insert operator to insert vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+ * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -406,10 +481,18 @@
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
terminateWriter, 0);
-
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
finalAggregator, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), join, 3, btreeBulkLoad, 0);
+
+ /**
+ * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), join, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -420,6 +503,8 @@
spec.addRoot(emptySink);
spec.addRoot(btreeBulkLoad);
spec.addRoot(terminateWriter);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
spec.setFrameSize(frameSize);
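
Note on the pattern above: each compute operator now exposes two extra output arms. Output 3 carries cloned vertex insertions (rdInsert) and output 4 carries vertex-id deletions (rdDelete); each arm is hash-partitioned into a TreeIndexInsertUpdateDeleteOperatorDescriptor and capped with an EmptySinkOperatorDescriptor that is registered as a job root so the scheduler drives it. A minimal sketch of that wiring as a helper; the method name addMaintenanceArm is hypothetical, and the descriptor arguments are assumed to be the same JobGen fields used in the hunks above:

    // Sketch only, not part of the patch: one insert/delete maintenance arm
    // hung off an extra output of the compute operator, capped with a sink.
    private EmptySinkOperatorDescriptor addMaintenanceArm(JobSpecification spec,
            IOperatorDescriptor compute, int outputIndex, RecordDescriptor rd,
            IndexOp op, ITuplePartitionComputerFactory partitionerFactory)
            throws HyracksException {
        int[] fieldPermutation = new int[] { 0, 1 }; // both arms reuse this, as above
        TreeIndexInsertUpdateDeleteOperatorDescriptor indexOp =
                new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, rd,
                        storageManagerInterface, treeRegistryProvider, fileSplitProvider,
                        typeTraits, comparatorFactories, fieldPermutation, op,
                        new BTreeDataflowHelperFactory(), null,
                        NoOpOperationCallbackProvider.INSTANCE);
        ClusterConfig.setLocationConstraint(spec, indexOp);
        EmptySinkOperatorDescriptor sink = new EmptySinkOperatorDescriptor(spec);
        ClusterConfig.setLocationConstraint(spec, sink);
        // route the extra compute output into the index-maintenance operator,
        // then terminate the arm so the dataflow graph stays rooted
        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partitionerFactory),
                compute, outputIndex, indexOp, 0);
        spec.connect(new OneToOneConnectorDescriptor(spec), indexOp, 0, sink, 0);
        spec.addRoot(sink);
        return sink;
    }

Each generator would then call addMaintenanceArm(spec, scanner, 3, rdInsert, IndexOp.INSERT, hashPartitionComputerFactory), and the same for the delete arm on output 4.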
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 3847aa7..9bad169 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -39,6 +39,9 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -121,13 +124,16 @@
vertexIdClass.getName(), vertexClass.getName());
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+ new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -139,14 +145,15 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+ false);
PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -156,8 +163,8 @@
*/
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true, true);
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ true, true);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -193,6 +200,33 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+ /**
+ * add the insert operator to insert vertices
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+ * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
@@ -203,6 +237,20 @@
terminateWriter, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
+
+ /**
+ * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+ 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+ 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+ /**
+ * connect the group-by operator
+ */
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -213,6 +261,8 @@
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
spec.addRoot(emptySink2);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
spec.setFrameSize(frameSize);
@@ -239,6 +289,9 @@
.getClass());
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -286,9 +339,9 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -299,14 +352,15 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+ false);
PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -314,8 +368,8 @@
/**
* construct global group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true, true);
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ true, true);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -351,6 +405,33 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+ /**
+ * add the insert operator to insert vertices
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+ * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -364,6 +445,15 @@
terminateWriter, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
finalAggregator, 0);
+
+ /**
+ * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
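
Note on the recurring maxFrameSize to maxFrameNumber rename: the second argument of ExternalSortOperatorDescriptor is a frame count, that is, the number of frames the in-memory sorter may fill before spilling a sorted run, not a size in bytes. Under that (assumed) reading of the Hyracks API, the corrected call is:

    // framesLimit is a frame count: the sorter buffers up to this many frames,
    // sorts them in memory, and spills them as a single run before continuing.
    ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(
            spec, maxFrameNumber /* frames, not bytes */, keyFields,
            nkmFactory, sortCmpFactories, rdUnnestedMessage);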
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ec783a7..ffdef10 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -38,6 +38,9 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -123,10 +126,14 @@
vertexIdClass.getName(), vertexClass.getName());
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+ new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, scanner);
@@ -140,7 +147,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);
@@ -185,6 +192,33 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+ /**
+ * add the insert operator to insert vertices
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+ * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -196,6 +230,17 @@
terminateWriter, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
+
+ /**
+ * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+ 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+ 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
@@ -204,6 +249,8 @@
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
spec.addRoot(emptySink2);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
spec.setFrameSize(frameSize);
@@ -230,6 +277,9 @@
.getClass());
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -277,7 +327,7 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
@@ -290,7 +340,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);
@@ -333,6 +383,31 @@
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
+ /** add the insert operator to insert vertices */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+ * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
@@ -347,6 +422,16 @@
terminateWriter, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
finalAggregator, 0);
+ /**
+ * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp,
+ 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp,
+ 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index bb939e3..cc12523 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -38,6 +38,9 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -119,10 +122,14 @@
vertexIdClass.getName(), vertexClass.getName());
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+ new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, scanner);
@@ -136,7 +143,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -152,7 +159,7 @@
/**
* construct global sort operator
*/
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);
@@ -198,6 +205,33 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+ /**
+ * add the insert operator to insert vertices
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+ * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
@@ -208,6 +242,16 @@
terminateWriter, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
+ /**
+ * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+ 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+ 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -218,6 +262,8 @@
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
spec.addRoot(emptySink2);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
spec.setFrameSize(frameSize);
return spec;
@@ -243,6 +289,9 @@
.getClass());
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -290,7 +339,7 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
@@ -303,7 +352,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -319,7 +368,7 @@
/**
* construct global sort operator
*/
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);
@@ -363,6 +412,33 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+ /**
+ * add the insert operator to insert vertices
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+ * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -376,6 +452,14 @@
terminateWriter, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
finalAggregator, 0);
+ /**
+ * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -386,6 +470,8 @@
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
spec.addRoot(emptySink);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
spec.setFrameSize(frameSize);
return spec;
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index ce3f949..98d9612 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -52,7 +52,8 @@
ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
ccConfig.defaultMaxJobAttempts = 0;
- ccConfig.jobHistorySize = 10;
+ ccConfig.jobHistorySize = 0;
+ ccConfig.profileDumpPeriod = -1;
// cluster controller
cc = new ClusterControllerService(ccConfig);
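
The two new settings shrink the embedded test cluster's footprint: jobHistorySize = 0 keeps the cluster controller from retaining finished-job records on its heap, and profileDumpPeriod = -1 is assumed to disable the periodic profile-dump task entirely. In context:

    CCConfig ccConfig = new CCConfig();
    // addresses and ports as in the surrounding file
    ccConfig.defaultMaxJobAttempts = 0;
    ccConfig.jobHistorySize = 0;      // keep no finished-job state between tests
    ccConfig.profileDumpPeriod = -1;  // never schedule the profile dump
    ClusterControllerService cc = new ClusterControllerService(ccConfig);
    cc.start();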
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 7596782..572bff9 100644
--- a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -62,7 +62,7 @@
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.core.util.TestUtils;
-import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
@@ -193,7 +193,7 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -360,7 +360,7 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -457,7 +457,7 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -564,7 +564,7 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
diff --git a/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java b/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
index 62f92dd..a0d365f 100644
--- a/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
+++ b/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
@@ -16,17 +16,19 @@
package edu.uci.ics.pregelix.dataflow.std.base;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
public interface IUpdateFunction extends IFunction {
- /**
- * update the tuple pointed by tupleRef called after process,
- * one-input-tuple-at-a-time
- *
- * @param tupleRef
- * @throws HyracksDataException
- */
- public void update(ITupleReference tupleRef) throws HyracksDataException;
+ /**
+ * update the tuple pointed to by tupleRef; called after process(),
+ * one input tuple at a time
+ *
+ * @param tupleRef
+ * the tuple to update, as returned by the scan cursor
+ * @param cloneUpdateTb
+ * receives the cloned, updated tuple image (left empty when there is no update)
+ * @throws HyracksDataException
+ */
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb)
+ throws HyracksDataException;
}
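
With the widened signature, an update function no longer mutates the B-tree while the scan cursor still holds latches on its pages; it serializes the updated tuple image into cloneUpdateTb, and the operator applies the image later in a batch. An empty builder means no write-back. A minimal sketch of an implementation under that contract; the vertex and vertexChanged fields and the [id, value] field layout are illustrative, not from the patch:

    @Override
    public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb)
            throws HyracksDataException {
        if (!vertexChanged) {
            return; // leave the builder empty: the caller skips the write-back
        }
        try {
            // field 0: the vertex id, copied verbatim from the scanned tuple
            cloneUpdateTb.addField(tupleRef.getFieldData(0),
                    tupleRef.getFieldStart(0), tupleRef.getFieldLength(0));
            // field 1: the updated vertex image, re-serialized
            vertex.write(cloneUpdateTb.getDataOutput());
            cloneUpdateTb.addFieldEndOffset();
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
    }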
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
index fb84aa0..3938613 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
@@ -43,6 +43,7 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class BTreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
protected TreeIndexDataflowHelper treeIndexHelper;
@@ -70,6 +71,8 @@
private final IFrameWriter[] writers;
private final FunctionProxy functionProxy;
+ private ArrayTupleBuilder cloneUpdateTb;
+ private final UpdateBuffer updateBuffer;
public BTreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -94,6 +97,7 @@
this.writers = new IFrameWriter[outputArity];
this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
writers);
+ this.updateBuffer = new UpdateBuffer(ctx, 2);
}
@Override
@@ -122,6 +126,9 @@
appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
indexAccessor = btree.createAccessor();
+
+ cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+ updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
treeIndexHelper.deinit();
throw new HyracksDataException(e);
@@ -136,7 +143,24 @@
while (cursor.hasNext()) {
cursor.next();
ITupleReference tuple = cursor.getTuple();
- functionProxy.functionCall(tuple);
+ functionProxy.functionCall(tuple, cloneUpdateTb);
+
+ //doing clone update
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again
+ cursor.reset();
+ rangePred.setLowKey(tuple, true);
+ rangePred.setHighKey(highKey, highKeyInclusive);
+ indexAccessor.search(cursor, rangePred);
+ }
+ }
+ cloneUpdateTb.reset();
}
}
@@ -168,6 +192,8 @@
try {
try {
cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
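
The loop above implements a latch-safe batching protocol: updates are staged in updateBuffer, and only when appendTuple() reports the buffer full does the operator close the cursor (releasing its page latches), flush the staged batch through the index accessor, and re-seek. Because the re-seek sets the low key to the last-seen tuple inclusively, that tuple is visited again and the update that failed to append is regenerated against the now-empty buffer. The same loop, annotated:

    while (cursor.hasNext()) {
        cursor.next();
        ITupleReference tuple = cursor.getTuple();
        functionProxy.functionCall(tuple, cloneUpdateTb); // may fill cloneUpdateTb
        if (cloneUpdateTb.getSize() > 0 && !updateBuffer.appendTuple(cloneUpdateTb)) {
            cursor.close();                                  // 1. release page latches
            updateBuffer.updateBTree(indexAccessor);         // 2. apply the staged batch
            cursor.reset();                                  // 3. re-open the scan,
            rangePred.setLowKey(tuple, true);                //    inclusive of the last
            rangePred.setHighKey(highKey, highKeyInclusive); //    tuple, so its update
            indexAccessor.search(cursor, rangePred);         //    is retried
        }
        cloneUpdateTb.reset();
    }
    // close() later calls updateBuffer.updateBTree(indexAccessor) once more
    // to drain whatever is still staged when the scan ends.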
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 75a8087..37029f3 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.pregelix.dataflow.std;
-import java.io.DataOutput;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -44,6 +43,7 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopJoinFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -51,9 +51,6 @@
private ByteBuffer writeBuffer;
private FrameTupleAppender appender;
- private ArrayTupleBuilder tb;
- private DataOutput dos;
-
private BTree btree;
private PermutingFrameTupleReference lowKey;
private PermutingFrameTupleReference highKey;
@@ -67,17 +64,16 @@
protected ITreeIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
- private final RecordDescriptor inputRecDesc;
-
private final IFrameWriter[] writers;
private final FunctionProxy functionProxy;
+ private ArrayTupleBuilder cloneUpdateTb;
+ private final UpdateBuffer updateBuffer;
public IndexNestedLoopJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
IRuntimeHookFactory postHookFactory, IRecordDescriptorFactory inputRdFactory, int outputArity) {
- inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
this.lowKeyInclusive = lowKeyInclusive;
@@ -95,6 +91,7 @@
this.writers = new IFrameWriter[outputArity];
this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
writers);
+ this.updateBuffer = new UpdateBuffer(ctx, 2);
}
protected void setCursor() {
@@ -144,12 +141,12 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
- dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
indexAccessor = btree.createAccessor();
+ cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+ updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
treeIndexOpHelper.deinit();
throw new HyracksDataException(e);
@@ -158,27 +155,29 @@
private void writeSearchResults(IFrameTupleAccessor leftAccessor, int tIndex) throws Exception {
while (cursor.hasNext()) {
- tb.reset();
cursor.next();
-
ITupleReference tupleRef = cursor.getTuple();
- for (int i = 0; i < inputRecDesc.getFields().length; i++) {
- int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
- int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
- int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
- int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
- dos.write(leftAccessor.getBuffer().array(), offset, len);
- tb.addFieldEndOffset();
- }
- for (int i = 0; i < tupleRef.getFieldCount(); i++) {
- dos.write(tupleRef.getFieldData(i), tupleRef.getFieldStart(i), tupleRef.getFieldLength(i));
- tb.addFieldEndOffset();
- }
/**
* call the update function
*/
- functionProxy.functionCall(tb, tupleRef);
+ functionProxy.functionCall(leftAccessor, tIndex, tupleRef, cloneUpdateTb);
+
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again
+ cursor.reset();
+ rangePred.setLowKey(tupleRef, true);
+ rangePred.setHighKey(highKey, highKeyInclusive);
+ indexAccessor.search(cursor, rangePred);
+ }
+ }
+ cloneUpdateTb.reset();
}
}
@@ -210,6 +209,8 @@
try {
try {
cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
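
A second effect of the FunctionProxy change shows here: the join pushable no longer concatenates the probe-side fields and the index tuple into a per-match ArrayTupleBuilder. It hands the frame accessor and tuple index straight to the proxy, which reads both sides in place, saving one copy per matched tuple:

    // before: copy probe fields plus index fields into tb, then
    //     functionProxy.functionCall(tb, tupleRef);
    // after: no intermediate materialization
    functionProxy.functionCall(leftAccessor, tIndex, tupleRef, cloneUpdateTb);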
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index c31ebd4..f7b3d62 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -45,6 +45,7 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable extends
AbstractUnaryInputOperatorNodePushable {
@@ -53,7 +54,7 @@
private ByteBuffer writeBuffer;
private FrameTupleAppender appender;
- private ArrayTupleBuilder tb;
+ private ArrayTupleBuilder nullTupleBuilder;
private DataOutput dos;
private BTree btree;
@@ -76,6 +77,8 @@
private final IFrameWriter[] writers;
private final FunctionProxy functionProxy;
+ private ArrayTupleBuilder cloneUpdateTb;
+ private final UpdateBuffer updateBuffer;
public IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -100,6 +103,7 @@
this.writers = new IFrameWriter[outputArity];
this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
writers);
+ this.updateBuffer = new UpdateBuffer(ctx, 2);
}
protected void setCursor() {
@@ -144,8 +148,15 @@
rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
- dos = tb.getDataOutput();
+
+ nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);
+ dos = nullTupleBuilder.getDataOutput();
+ nullTupleBuilder.reset();
+ for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+ nullWriter[i].writeNull(dos);
+ nullTupleBuilder.addFieldEndOffset();
+ }
+
appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
@@ -164,32 +175,38 @@
match = false;
}
+ cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+ updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
treeIndexOpHelper.deinit();
throw new HyracksDataException(e);
}
}
+ //for the join match cases
private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
throws Exception {
- tb.reset();
- for (int i = 0; i < inputRecDesc.getFields().length; i++) {
- int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
- int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
- int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
- int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
- dos.write(leftAccessor.getBuffer().array(), offset, len);
- tb.addFieldEndOffset();
- }
- for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
- tb.addFieldEndOffset();
- }
-
/**
* function call
*/
- functionProxy.functionCall(tb, frameTuple);
+ functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+
+ //doing clone update
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again and recover the cursor
+ cursor.reset();
+ rangePred.setLowKey(frameTuple, true);
+ rangePred.setHighKey(null, true);
+ indexAccessor.search(cursor, rangePred);
+ }
+ cloneUpdateTb.reset();
+ }
}
@Override
@@ -243,6 +260,8 @@
}
try {
cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -271,20 +290,27 @@
/** write result for outer case */
private void writeResults(ITupleReference frameTuple) throws Exception {
- tb.reset();
- for (int i = 0; i < inputRecDesc.getFields().length; i++) {
- nullWriter[i].writeNull(dos);
- tb.addFieldEndOffset();
- }
- for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
- tb.addFieldEndOffset();
- }
-
/**
* function call
*/
- functionProxy.functionCall(tb, frameTuple);
+ functionProxy.functionCall(nullTupleBuilder, frameTuple, cloneUpdateTb);
+
+ //doing clone update
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again and recover the cursor
+ cursor.reset();
+ rangePred.setLowKey(frameTuple, true);
+ rangePred.setHighKey(null, true);
+ indexAccessor.search(cursor, rangePred);
+ }
+ cloneUpdateTb.reset();
+ }
}
@Override
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 0a966b5..6af60a8 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.pregelix.dataflow.std;
-import java.io.DataOutput;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -44,6 +43,7 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -51,8 +51,6 @@
private ByteBuffer writeBuffer;
private FrameTupleAppender appender;
- private ArrayTupleBuilder tb;
- private DataOutput dos;
private BTree btree;
private boolean isForward;
@@ -63,8 +61,6 @@
protected ITreeIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
- private final RecordDescriptor inputRecDesc;
-
private PermutingFrameTupleReference lowKey;
private PermutingFrameTupleReference highKey;
@@ -73,13 +69,14 @@
private final IFrameWriter[] writers;
private final FunctionProxy functionProxy;
+ private ArrayTupleBuilder cloneUpdateTb;
+ private UpdateBuffer updateBuffer;
public IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
int[] lowKeyFields, int[] highKeyFields, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
IRecordDescriptorFactory inputRdFactory, int outputArity) {
- inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
this.isForward = isForward;
@@ -97,6 +94,7 @@
this.writers = new IFrameWriter[outputArity];
this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
writers);
+ this.updateBuffer = new UpdateBuffer(ctx, 2);
}
protected void setCursor() {
@@ -123,8 +121,6 @@
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(btree.getFieldCount());
- dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
@@ -142,7 +138,8 @@
currentTopTuple = cursor.getTuple();
match = false;
}
-
+ cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+ updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
treeIndexOpHelper.deinit();
throw new HyracksDataException(e);
@@ -207,6 +204,9 @@
}
try {
cursor.close();
+
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -231,29 +231,47 @@
/** write the right result */
private void writeRightResults(ITupleReference frameTuple) throws Exception {
- tb.reset();
- for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
- tb.addFieldEndOffset();
- }
+ functionProxy.functionCall(frameTuple, cloneUpdateTb);
- functionProxy.functionCall(tb, frameTuple);
+ //doing clone update
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again
+ cursor.reset();
+ rangePred.setLowKey(frameTuple, true);
+ rangePred.setHighKey(null, true);
+ indexAccessor.search(cursor, rangePred);
+ }
+ cloneUpdateTb.reset();
+ }
}
/** write the left result */
private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
throws Exception {
- tb.reset();
- for (int i = 0; i < inputRecDesc.getFields().length; i++) {
- int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
- int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
- int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
- int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
- dos.write(leftAccessor.getBuffer().array(), offset, len);
- tb.addFieldEndOffset();
- }
+ functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
- functionProxy.functionCall(tb, frameTuple);
+ //doing clone update
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again
+ cursor.reset();
+ rangePred.setLowKey(frameTuple, true);
+ rangePred.setHighKey(null, true);
+ indexAccessor.search(cursor, rangePred);
+ }
+ cloneUpdateTb.reset();
+ }
}
@Override
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 4b0f4a5..82ac18e 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -15,6 +15,7 @@
package edu.uci.ics.pregelix.dataflow.util;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -68,16 +69,19 @@
/**
* Call the function
*
- * @param tb
- * input data
+ * @param leftAccessor
+ * input page accessor
+ * @param leftTupleIndex
+ * the tuple index in the page
* @param updateRef
* update pointer
* @throws HyracksDataException
*/
- public void functionCall(ArrayTupleBuilder tb, ITupleReference updateRef) throws HyracksDataException {
- Object[] tuple = tupleDe.deserializeRecord(tb);
+ public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right,
+ ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ Object[] tuple = tupleDe.deserializeRecord(leftAccessor, leftTupleIndex, right);
function.process(tuple);
- function.update(updateRef);
+ function.update(right, cloneUpdateTb);
}
/**
@@ -86,10 +90,26 @@
* @param updateRef
* @throws HyracksDataException
*/
- public void functionCall(ITupleReference updateRef) throws HyracksDataException {
+ public void functionCall(ITupleReference updateRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
Object[] tuple = tupleDe.deserializeRecord(updateRef);
function.process(tuple);
- function.update(updateRef);
+ function.update(updateRef, cloneUpdateTb);
+ }
+
+ /**
+ * Call the function
+ *
+ * @param tb
+ * input data
+ * @param inPlaceUpdateRef
+ * update pointer
+ * @param cloneUpdateTb
+ * the builder that receives the cloned update
+ * @throws HyracksDataException
+ */
+ public void functionCall(ArrayTupleBuilder tb, ITupleReference inPlaceUpdateRef, ArrayTupleBuilder cloneUpdateTb)
+ throws HyracksDataException {
+ Object[] tuple = tupleDe.deserializeRecord(tb, inPlaceUpdateRef);
+ function.process(tuple);
+ function.update(inPlaceUpdateRef, cloneUpdateTb);
}
/**
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
index 5ae1d81..4fe83db 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -20,6 +20,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -42,7 +43,7 @@
}
public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException {
- for (int i = 0; i < record.length; ++i) {
+ for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
byte[] data = tupleRef.getFieldData(i);
int offset = tupleRef.getFieldStart(i);
bbis.setByteArray(data, offset);
@@ -65,11 +66,65 @@
return record;
}
- public Object[] deserializeRecord(ArrayTupleBuilder tb) throws HyracksDataException {
+ public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right)
+ throws HyracksDataException {
+ byte[] data = left.getBuffer().array();
+ int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
+ int leftFieldCount = left.getFieldCount();
+ int fStart = tStart;
+ for (int i = 0; i < leftFieldCount; ++i) {
+ /**
+ * reset the input
+ */
+ fStart = tStart + left.getFieldStartOffset(tIndex, i);
+ bbis.setByteArray(data, fStart);
+
+ /**
+ * do deserialization
+ */
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest(i + " " + instance);
+ }
+ record[i] = instance;
+ if (FrameConstants.DEBUG_FRAME_IO) {
+ try {
+ if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
+ throw new HyracksDataException("Field magic mismatch");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ for (int i = leftFieldCount; i < record.length; ++i) {
+ byte[] rightData = right.getFieldData(i - leftFieldCount);
+ int rightOffset = right.getFieldStart(i - leftFieldCount);
+ bbis.setByteArray(rightData, rightOffset);
+
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest(i + " " + instance);
+ }
+ record[i] = instance;
+ if (FrameConstants.DEBUG_FRAME_IO) {
+ try {
+ if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
+ throw new HyracksDataException("Field magic mismatch");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return record;
+ }
+
+ public Object[] deserializeRecord(ArrayTupleBuilder tb, ITupleReference right) throws HyracksDataException {
byte[] data = tb.getByteArray();
int[] offset = tb.getFieldEndOffsets();
int start = 0;
- for (int i = 0; i < record.length; ++i) {
+ for (int i = 0; i < offset.length; ++i) {
/**
* reset the input
*/
@@ -94,6 +149,26 @@
}
}
}
+ for (int i = offset.length; i < record.length; ++i) {
+ byte[] rightData = right.getFieldData(i - offset.length);
+ int rightOffset = right.getFieldStart(i - offset.length);
+ bbis.setByteArray(rightData, rightOffset);
+
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest(i + " " + instance);
+ }
+ record[i] = instance;
+ if (FrameConstants.DEBUG_FRAME_IO) {
+ try {
+ if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
+ throw new HyracksDataException("Field magic mismatch");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
return record;
}
}
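Note: the two new deserializeRecord overloads splice a left-hand tuple with the right-hand index tuple into a single record. A hedged sketch of the resulting layout, assuming a two-field left tuple and a three-field record descriptor:

    // record[0..leftFieldCount-1] come from the left (frame) tuple,
    // the remaining slots come from the right (index) tuple.
    Object[] record = tupleDe.deserializeRecord(leftAccessor, tIndex, rightTuple);
    // record[0], record[1]  <- frame tuple fields 0 and 1
    // record[2]             <- rightTuple field 0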
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
new file mode 100644
index 0000000..9a30647
--- /dev/null
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+
+/**
+ * The buffer to hold updates.
+ * We do batch updates for the B-tree during index search and join so that
+ * we avoid opening/closing cursors frequently.
+ */
+public class UpdateBuffer {
+
+ private int currentInUse = 0;
+ private final int pageLimit;
+ private final List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+ private final FrameTupleAppender appender;
+ private final IHyracksTaskContext ctx;
+ private final FrameTupleReference tuple = new FrameTupleReference();
+ private final int frameSize;
+ private IFrameTupleAccessor fta;
+
+ public UpdateBuffer(int numPages, IHyracksTaskContext ctx, int fieldCount) {
+ this.appender = new FrameTupleAppender(ctx.getFrameSize());
+ ByteBuffer buffer = ctx.allocateFrame();
+ this.buffers.add(buffer);
+ this.appender.reset(buffer, true);
+ this.pageLimit = numPages;
+ this.ctx = ctx;
+ this.frameSize = ctx.getFrameSize();
+ this.fta = new UpdateBufferTupleAccessor(frameSize, fieldCount);
+ }
+
+ public UpdateBuffer(IHyracksTaskContext ctx, int fieldCount) {
+ //by default, the update buffer has 1000 pages
+ this(1000, ctx, fieldCount);
+ }
+
+ public void setFieldCount(int fieldCount) {
+ if (fta.getFieldCount() != fieldCount) {
+ this.fta = new UpdateBufferTupleAccessor(frameSize, fieldCount);
+ }
+ }
+
+ public boolean appendTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ if (currentInUse + 1 < pageLimit) {
+ // move to the new buffer
+ currentInUse++;
+ allocate(currentInUse);
+ ByteBuffer buffer = buffers.get(currentInUse);
+ appender.reset(buffer, true);
+
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new HyracksDataException("tuple cannot be appended to a new frame!");
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return true;
+ }
+ }
+
+ public void updateBTree(ITreeIndexAccessor bta) throws HyracksDataException, IndexException {
+ // batch update
+ for (int i = 0; i <= currentInUse; i++) {
+ ByteBuffer buffer = buffers.get(i);
+ fta.reset(buffer);
+ for (int j = 0; j < fta.getTupleCount(); j++) {
+ tuple.reset(fta, j);
+ bta.update(tuple);
+ }
+ }
+
+ //cleanup the buffer
+ currentInUse = 0;
+ ByteBuffer buffer = buffers.get(0);
+ appender.reset(buffer, true);
+ }
+
+ private void allocate(int index) {
+ if (index >= buffers.size()) {
+ buffers.add(ctx.allocateFrame());
+ }
+ }
+}
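Note: a minimal usage sketch of UpdateBuffer, mirroring how the operators above drive it (updateBTree must run with no cursor open, since the diff releases the cursor and its latch before flushing):

    UpdateBuffer updateBuffer = new UpdateBuffer(ctx, 2); // 2 pages, as in the operators
    updateBuffer.setFieldCount(btree.getFieldCount());
    if (!updateBuffer.appendTuple(cloneUpdateTb)) {
        cursor.close();                          // release latches first
        updateBuffer.updateBTree(indexAccessor); // flush the batch
        // ... re-open the search cursor here
    }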
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java
new file mode 100644
index 0000000..39f1361
--- /dev/null
+++ b/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+
+public final class UpdateBufferTupleAccessor implements IFrameTupleAccessor {
+ private final int frameSize;
+ private final int fieldCount;
+ private ByteBuffer buffer;
+
+ public UpdateBufferTupleAccessor(int frameSize, int fieldCount) {
+ this.frameSize = frameSize;
+ this.fieldCount = fieldCount;
+ }
+
+ @Override
+ public void reset(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ @Override
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+ @Override
+ public int getTupleCount() {
+ return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
+ }
+
+ @Override
+ public int getTupleStartOffset(int tupleIndex) {
+ return tupleIndex == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * tupleIndex);
+ }
+
+ @Override
+ public int getTupleEndOffset(int tupleIndex) {
+ return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1));
+ }
+
+ @Override
+ public int getFieldStartOffset(int tupleIndex, int fIdx) {
+ return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
+ }
+
+ @Override
+ public int getFieldEndOffset(int tupleIndex, int fIdx) {
+ return buffer.getInt(getTupleStartOffset(tupleIndex) + fIdx * 4);
+ }
+
+ @Override
+ public int getFieldLength(int tupleIndex, int fIdx) {
+ return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
+ }
+
+ @Override
+ public int getFieldSlotsLength() {
+ return getFieldCount() * 4;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return fieldCount;
+ }
+}
\ No newline at end of file
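Note: UpdateBufferTupleAccessor assumes the frame layout written by FrameTupleAppender: a 4-byte tuple count stored at FrameHelper.getTupleCountOffset(frameSize), tuple end offsets growing backwards from it in 4-byte slots, and fieldCount 4-byte field end offsets at the head of each tuple. For example (following the accessor code above), the start of tuple 2 is the recorded end of tuple 1:

    int start2 = buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * 2);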
diff --git a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
similarity index 89%
rename from pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java
rename to pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
index 356f06c..d7cbb3a 100644
--- a/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FileWriteOperatorDescriptor.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.dataflow.std;
+package edu.uci.ics.pregelix.dataflow;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -35,14 +35,14 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
-public class FileWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class VertexWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
private final FileSplit[] splits;
private final IRuntimeHookFactory preHookFactory;
private final IRuntimeHookFactory postHookFactory;
private final IRecordDescriptorFactory inputRdFactory;
- public FileWriteOperatorDescriptor(JobSpecification spec, IRecordDescriptorFactory inputRdFactory,
+ public VertexWriteOperatorDescriptor(JobSpecification spec, IRecordDescriptorFactory inputRdFactory,
IFileSplitProvider fileSplitProvider, IRuntimeHookFactory preHookFactory,
IRuntimeHookFactory postHookFactory) {
super(spec, 1, 0);
@@ -60,7 +60,6 @@
private RecordDescriptor rd0;
private FrameDeserializer frameDeserializer;
private PrintWriter outputWriter;
- private final static String separator = "|";
@Override
public void open() throws HyracksDataException {
@@ -82,10 +81,7 @@
frameDeserializer.reset(frame);
while (!frameDeserializer.done()) {
Object[] tuple = frameDeserializer.deserializeRecord();
- for (int i = 0; i < tuple.length - 1; i++) {
- outputWriter.print(StringSerializationUtils.toString(tuple[i]));
- outputWriter.print(separator);
- }
+ // output the vertex
outputWriter.print(StringSerializationUtils.toString(tuple[tuple.length - 1]));
outputWriter.println();
}
diff --git a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 43b6d17..567e220 100644
--- a/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -119,23 +119,23 @@
Vertex.setNumEdges(numEdges);
giraphJobIdToSuperStep.put(giraphJobId, superStep);
giraphJobIdToMove.put(giraphJobId, false);
- LOGGER.info("start iteration " + Vertex.getCurrentSuperstep());
+ LOGGER.info("start iteration " + Vertex.getSuperstep());
}
System.gc();
}
public synchronized void endSuperStep(String giraphJobId) {
giraphJobIdToMove.put(giraphJobId, true);
- LOGGER.info("end iteration " + Vertex.getCurrentSuperstep());
+ LOGGER.info("end iteration " + Vertex.getSuperstep());
}
@Override
public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
final FileReference fRef = ioManager.createWorkspaceFile(prefix);
- List<FileReference> files = iterationToFiles.get(Vertex.getCurrentSuperstep());
+ List<FileReference> files = iterationToFiles.get(Vertex.getSuperstep());
if (files == null) {
files = new ArrayList<FileReference>();
- iterationToFiles.put(Vertex.getCurrentSuperstep(), files);
+ iterationToFiles.put(Vertex.getSuperstep(), files);
}
files.add(fRef);
return fRef;
diff --git a/pregelix-example/data/clique/clique.txt b/pregelix-example/data/clique/clique.txt
new file mode 100755
index 0000000..08280e3
--- /dev/null
+++ b/pregelix-example/data/clique/clique.txt
@@ -0,0 +1,7 @@
+1 2 3 4
+2 1 3 4 5
+3 1 2 4 5
+4 1 2 3
+5 6 7
+6 5 7
+7 5 6
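Note: each line is a vertex id followed by its neighbor ids. In this sample graph, vertexes 1-4 are mutually adjacent (a 4-clique) and vertexes 5-7 form a triangle, so the maximal clique job below should report 1,2,3,4 as the largest clique.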
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index 30e88ea..74ae455 100644
--- a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -126,6 +126,11 @@
}
}
+ @Override
+ public String toString() {
+ return getVertexId() + " " + getVertexValue();
+ }
+
public static void main(String[] args) throws Exception {
PregelixJob job = new PregelixJob(ConnectedComponentsVertex.class.getSimpleName());
job.setVertexClass(ConnectedComponentsVertex.class);
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 290f90e..02e1625 100644
--- a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -97,7 +97,7 @@
@Override
public void compute(Iterator<DoubleWritable> msgIterator) {
- int maxIteration = this.getContext().getConfiguration().getInt(ITERATIONS, 10);
+ int maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
if (getSuperstep() == 1) {
tmpVertexValue.set(1.0 / getNumVertices());
setVertexValue(tmpVertexValue);
@@ -123,13 +123,13 @@
/**
* Simple VertexReader that supports {@link SimplePageRankVertex}
*/
- public static class SimplePageRankVertexReader extends
+ public static class SimulatedPageRankVertexReader extends
GeneratedVertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
/** Class logger */
- private static final Logger LOG = Logger.getLogger(SimplePageRankVertexReader.class.getName());
+ private static final Logger LOG = Logger.getLogger(SimulatedPageRankVertexReader.class.getName());
private Map<VLongWritable, FloatWritable> edges = Maps.newHashMap();
- public SimplePageRankVertexReader() {
+ public SimulatedPageRankVertexReader() {
super();
}
@@ -162,12 +162,12 @@
/**
* Simple VertexInputFormat that supports {@link SimplePageRankVertex}
*/
- public static class SimplePageRankVertexInputFormat extends
+ public static class SimulatedPageRankVertexInputFormat extends
GeneratedVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
@Override
public VertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
- return new SimplePageRankVertexReader();
+ return new SimulatedPageRankVertexReader();
}
}
@@ -188,6 +188,11 @@
}
}
+ @Override
+ public String toString() {
+ return getVertexId() + " " + getVertexValue();
+ }
+
/**
* Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
*/
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index 2f0ca45..daafc82 100644
--- a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -160,6 +160,11 @@
}
voteToHalt();
}
+
+ @Override
+ public String toString() {
+ return getVertexId() + " " + getVertexValue();
+ }
private void signalTerminate() {
Configuration conf = getContext().getConfiguration();
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index a018f08..199870e 100644
--- a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -126,6 +126,11 @@
}
voteToHalt();
}
+
+ @Override
+ public String toString() {
+ return getVertexId() + " " + getVertexValue();
+ }
public static void main(String[] args) throws Exception {
PregelixJob job = new PregelixJob(ShortestPathsVertex.class.getSimpleName());
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
new file mode 100644
index 0000000..83e0a6b
--- /dev/null
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.maximalclique;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * The adjacency list contains <src, list-of-neighbors>
+ */
+public class AdjacencyListWritable implements Writable {
+
+ private VLongWritable sourceVertex = new VLongWritable();
+ private Set<VLongWritable> destinationVertexes = new TreeSet<VLongWritable>();
+
+ public AdjacencyListWritable() {
+ }
+
+ public void reset() {
+ this.destinationVertexes.clear();
+ }
+
+ public void setSource(VLongWritable source) {
+ this.sourceVertex = source;
+ }
+
+ public void addNeighbor(VLongWritable neighbor) {
+ destinationVertexes.add(neighbor);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ sourceVertex = new VLongWritable();
+ destinationVertexes.clear();
+ sourceVertex.readFields(input);
+ int numberOfNeighbors = input.readInt();
+ for (int i = 0; i < numberOfNeighbors; i++) {
+ VLongWritable neighbor = new VLongWritable();
+ neighbor.readFields(input);
+ destinationVertexes.add(neighbor);
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ sourceVertex.write(output);
+ output.writeInt(destinationVertexes.size());
+ for (VLongWritable dest : destinationVertexes) {
+ dest.write(output);
+ }
+ }
+
+ public int numberOfNeighbors() {
+ return destinationVertexes.size();
+ }
+
+ public void removeNeighbor(VLongWritable v) {
+ destinationVertexes.remove(v);
+ }
+
+ public VLongWritable getSource() {
+ return sourceVertex;
+ }
+
+ public Iterator<VLongWritable> getNeighbors() {
+ return destinationVertexes.iterator();
+ }
+
+ public void cleanNonMatch(Collection<VLongWritable> matches) {
+ destinationVertexes.retainAll(matches);
+ }
+
+ public boolean isNeighbor(VLongWritable v) {
+ return destinationVertexes.contains(v);
+ }
+
+}
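Note: a hedged round-trip of the wire format above: for source 2 with neighbors {3, 4}, write() emits the source id, the int count 2, and the two neighbor ids in ascending (TreeSet) order; readFields rebuilds the identical set.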
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java
new file mode 100644
index 0000000..0e22ea1
--- /dev/null
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.maximalclique;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * The representation of cliques stored in a vertex.
+ */
+public class CliquesWritable implements Writable {
+
+ private List<VLongWritable> cliques = new ArrayList<VLongWritable>();
+ private int sizeOfClique = 0;
+
+ public CliquesWritable(List<VLongWritable> cliques, int sizeOfClique) {
+ this.cliques = cliques;
+ this.sizeOfClique = sizeOfClique;
+ }
+
+ public CliquesWritable() {
+
+ }
+
+ /**
+ * Set the size of cliques.
+ *
+ * @param sizeOfClique
+ * the size of each maximal clique
+ */
+ public void setCliqueSize(int sizeOfClique) {
+ this.sizeOfClique = sizeOfClique;
+ }
+
+ /**
+ * Add the clique vertexes
+ *
+ * @param cliques
+ * the list of vertexes -- can contain multiple cliques
+ */
+ public void addCliques(CliquesWritable cliques) {
+ this.cliques.addAll(cliques.cliques);
+ }
+
+ /**
+ * Add the clique vertexes
+ *
+ * @param vertexes
+ * the list of vertexes -- can contain multiple cliques
+ */
+ public void addCliques(List<VLongWritable> vertexes) {
+ this.cliques.addAll(vertexes);
+ }
+
+ /**
+ * @return the size of the clique
+ */
+ public int getSizeOfClique() {
+ return sizeOfClique;
+ }
+
+ /**
+ * Reset the cliques.
+ */
+ public void reset() {
+ this.cliques.clear();
+ this.sizeOfClique = 0;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ cliques.clear();
+ int numCliques = input.readInt();
+ if (numCliques < 0) {
+ sizeOfClique = 0;
+ return;
+ }
+ sizeOfClique = input.readInt();
+ for (int i = 0; i < numCliques; i++) {
+ for (int j = 0; j < sizeOfClique; j++) {
+ VLongWritable vid = new VLongWritable();
+ vid.readFields(input);
+ cliques.add(vid);
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ if (sizeOfClique <= 0) {
+ output.writeInt(-1);
+ return;
+ }
+ output.writeInt(cliques.size() / sizeOfClique);
+ output.writeInt(sizeOfClique);
+
+ for (int i = 0; i < cliques.size(); i++) {
+ cliques.get(i).write(output);
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (sizeOfClique == 0)
+ return "";
+ StringBuilder sb = new StringBuilder();
+ int numCliques = cliques.size() / sizeOfClique;
+ for (int i = 0; i < numCliques; i++) {
+ // clique i occupies the sublist [i * sizeOfClique, (i + 1) * sizeOfClique)
+ for (int j = 0; j < sizeOfClique - 1; j++) {
+ sb.append(cliques.get(i * sizeOfClique + j));
+ sb.append(",");
+ }
+ sb.append(cliques.get(i * sizeOfClique + sizeOfClique - 1));
+ sb.append(";");
+ }
+ return sb.toString();
+ }
+}
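Note: a small worked example of the flattened layout (hypothetical values; clique i occupies positions i * sizeOfClique through (i + 1) * sizeOfClique - 1 of the list):

    CliquesWritable cw = new CliquesWritable();
    cw.setCliqueSize(3);
    cw.addCliques(Arrays.asList(new VLongWritable(1), new VLongWritable(2), new VLongWritable(3),
            new VLongWritable(1), new VLongWritable(2), new VLongWritable(4)));
    // write() emits: numCliques = 2, sizeOfClique = 3, then the six vertex ids
    // toString() prints: "1,2,3;1,2,4;"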
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueAggregator.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueAggregator.java
new file mode 100644
index 0000000..061e9e0
--- /dev/null
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueAggregator.java
@@ -0,0 +1,65 @@
+package edu.uci.ics.pregelix.example.maximalclique;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * The global aggregator that aggregates the maximal cliques, keeping only the largest ones.
+ */
+public class MaximalCliqueAggregator
+ extends
+ GlobalAggregator<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable, CliquesWritable, CliquesWritable> {
+
+ private CliquesWritable state = new CliquesWritable();
+
+ @Override
+ public void init() {
+ state.reset();
+ }
+
+ @Override
+ public void step(Vertex<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> v)
+ throws HyracksDataException {
+ CliquesWritable cliques = v.getVertexValue();
+ updateAggregateState(cliques);
+ }
+
+ /**
+ * Update the current aggregate state
+ *
+ * @param cliques
+ * the incoming cliques
+ */
+ private void updateAggregateState(CliquesWritable cliques) {
+ if (cliques.getSizeOfClique() > state.getSizeOfClique()) {
+ //reset the vertex state
+ state.reset();
+ state.setCliqueSize(cliques.getSizeOfClique());
+ state.addCliques(cliques);
+ } else if (cliques.getSizeOfClique() == state.getSizeOfClique()) {
+ //add the new cliques
+ state.addCliques(cliques);
+ } else {
+ return;
+ }
+ }
+
+ @Override
+ public void step(CliquesWritable partialResult) {
+ updateAggregateState(partialResult);
+ }
+
+ @Override
+ public CliquesWritable finishPartial() {
+ return state;
+ }
+
+ @Override
+ public CliquesWritable finishFinal() {
+ return state;
+ }
+
+}
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
new file mode 100644
index 0000000..266feb7
--- /dev/null
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
@@ -0,0 +1,347 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.maximalclique;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * The maximal clique example -- find maximal cliques in an undirected graph.
+ * The resulting cliques contain vertexes ordered by vertex id in ascending order. The algorithm takes
+ * advantage of that property to do effective pruning.
+ */
+public class MaximalCliqueVertex extends Vertex<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> {
+
+ private Map<VLongWritable, AdjacencyListWritable> map = new TreeMap<VLongWritable, AdjacencyListWritable>();
+ private List<VLongWritable> vertexList = new ArrayList<VLongWritable>();
+ private Map<VLongWritable, Integer> invertedMap = new TreeMap<VLongWritable, Integer>();
+ private int largestCliqueSizeSoFar = 0;
+ private List<BitSet> currentMaximalCliques = new ArrayList<BitSet>();
+ private CliquesWritable tmpValue = new CliquesWritable();
+ private List<VLongWritable> cliques = new ArrayList<VLongWritable>();
+
+ /**
+ * Update the current maximal cliques
+ *
+ * @param values
+ * the received adjacency lists
+ */
+ private void updateCurrentMaximalCliques(Iterator<AdjacencyListWritable> values) {
+ map.clear();
+ vertexList.clear();
+ invertedMap.clear();
+ currentMaximalCliques.clear();
+ cliques.clear();
+ tmpValue.reset();
+
+ // build the initial sub graph
+ while (values.hasNext()) {
+ AdjacencyListWritable adj = values.next();
+ map.put(adj.getSource(), adj);
+ }
+ VLongWritable srcId = getVertexId();
+ map.put(srcId, new AdjacencyListWritable());
+
+ // build the vertex list (vertex id in ascending order) and the inverted list of vertexes
+ int i = 0;
+ for (VLongWritable v : map.keySet()) {
+ vertexList.add(v);
+ invertedMap.put(v, i++);
+ }
+
+ //clean up the adjacency lists: remove vertexes that are not in the local subgraph
+ for (AdjacencyListWritable adj : map.values()) {
+ adj.cleanNonMatch(vertexList);
+ }
+
+ // get the h-index of the subgraph, which is the maximum depth to explore
+ int[] neighborCounts = new int[map.size()];
+ i = 0;
+ for (AdjacencyListWritable adj : map.values()) {
+ neighborCounts[i++] = adj.numberOfNeighbors();
+ }
+ Arrays.sort(neighborCounts);
+ int h = 0;
+ for (i = neighborCounts.length - 1; i >= 0; i--) {
+ if (h >= neighborCounts[i]) {
+ break;
+ }
+ h++;
+ }
+ if (h < largestCliqueSizeSoFar) {
+ return;
+ }
+
+ //start depth-first search
+ BitSet cliqueSoFar = new BitSet(h);
+ for (VLongWritable v : vertexList) {
+ cliqueSoFar.set(invertedMap.get(v));
+ searchClique(h, cliqueSoFar, 1, v);
+ cliqueSoFar.clear();
+ }
+
+ //output local maximal cliques
+ for (BitSet clique : currentMaximalCliques) {
+ int keyIndex = invertedMap.get(srcId);
+ clique.set(keyIndex);
+ generateClique(clique);
+ tmpValue.addCliques(cliques);
+ tmpValue.setCliqueSize(clique.cardinality());
+ }
+
+ //update the vertex state
+ setVertexValue(tmpValue);
+ }
+
+ /**
+ * Output a clique with vertex ids.
+ *
+ * @param clique
+ * the bitmap representation of a clique
+ */
+ private void generateClique(BitSet clique) {
+ for (int j = 0; j < clique.length();) {
+ j = clique.nextSetBit(j);
+ VLongWritable v = vertexList.get(j);
+ cliques.add(v);
+ j++;
+ }
+ }
+
+ /**
+ * find cliques using the depth-first search
+ *
+ * @param maxDepth
+ * the maximum search depth
+ * @param cliqueSoFar
+ * the clique found so far
+ * @param depthSoFar
+ * the current search depth
+ * @param currentSource
+ * the vertex to be added into the clique
+ */
+ private void searchClique(int maxDepth, BitSet cliqueSoFar, int depthSoFar, VLongWritable currentSource) {
+ if (depthSoFar > maxDepth) {
+ // update maximal clique info
+ updateMaximalClique(cliqueSoFar);
+ return;
+ }
+
+ AdjacencyListWritable adj = map.get(currentSource);
+ Iterator<VLongWritable> neighbors = adj.getNeighbors();
+ ++depthSoFar;
+ while (neighbors.hasNext()) {
+ VLongWritable neighbor = neighbors.next();
+ if (!isTested(neighbor, cliqueSoFar) && isClique(neighbor, cliqueSoFar)) {
+ //snapshot the clique
+ int cliqueLength = cliqueSoFar.length();
+ // expand the clique
+ cliqueSoFar.set(invertedMap.get(neighbor));
+ searchClique(maxDepth, cliqueSoFar, depthSoFar, neighbor);
+ // back to the snapshot clique
+ cliqueSoFar.set(cliqueLength, cliqueSoFar.length(), false);
+ }
+ }
+
+ // update maximal clique info
+ updateMaximalClique(cliqueSoFar);
+ }
+
+ /**
+ * Update the maximal clique to a larger one if it exists
+ *
+ * @param cliqueSoFar
+ * the clique so far, in the bitmap representation
+ */
+ private void updateMaximalClique(BitSet cliqueSoFar) {
+ int cliqueSize = cliqueSoFar.cardinality();
+ if (cliqueSize > largestCliqueSizeSoFar) {
+ currentMaximalCliques.clear();
+ currentMaximalCliques.add((BitSet) cliqueSoFar.clone());
+ largestCliqueSizeSoFar = cliqueSize;
+ } else if (cliqueSize == largestCliqueSizeSoFar) {
+ currentMaximalCliques.add((BitSet) cliqueSoFar.clone());
+ } else {
+ return;
+ }
+ }
+
+ /**
+ * Should we test the vertex newVertex?
+ *
+ * @param newVertex
+ * the vertex to be tested
+ * @param cliqueSoFar
+ * the current clique, in the bitmap representation
+ * @return true if the new vertex has already been tested
+ */
+ private boolean isTested(VLongWritable newVertex, BitSet cliqueSoFar) {
+ int index = invertedMap.get(newVertex);
+ int largestSetIndex = cliqueSoFar.length() - 1;
+ if (index > largestSetIndex) {
+ // we only return cliques with vertexes in ascending order;
+ // hence, the new vertex must be larger than the largest set index in the clique
+ return false;
+ } else {
+ // otherwise, we think the vertex is "tested"
+ return true;
+ }
+ }
+
+ /**
+ * Will adding the newVertex yield a bigger clique?
+ *
+ * @param newVertex
+ * the new vertex id
+ * @param cliqueSoFar
+ * the bitmap representation of the clique
+ * @return true if adding the new vertex yields a bigger clique
+ */
+ private boolean isClique(VLongWritable newVertex, BitSet cliqueSoFar) {
+ AdjacencyListWritable adj = map.get(newVertex);
+ // check whether each existing vertex is in the neighbor set of newVertex
+ for (int i = 0; i < cliqueSoFar.length();) {
+ i = cliqueSoFar.nextSetBit(i);
+ VLongWritable v = vertexList.get(i);
+ if (!adj.isNeighbor(v)) {
+ return false;
+ }
+ i++;
+ }
+ return true;
+ }
+
+ /**
+ * For superstep 1, send outgoing messages.
+ * For superstep 2, calculate maximal cliques.
+ * Otherwise, vote to halt.
+ */
+ @Override
+ public void compute(Iterator<AdjacencyListWritable> msgIterator) {
+ if (getSuperstep() == 1) {
+ sortEdges();
+ sendOutgoingMsgs(getEdges());
+ } else if (getSuperstep() == 2) {
+ updateCurrentMaximalCliques(msgIterator);
+ } else {
+ voteToHalt();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getVertexId() + " " + getVertexValue();
+ }
+
+ private static CliquesWritable readMaximalCliqueResult(Configuration conf) {
+ try {
+ CliquesWritable result = (CliquesWritable) IterationUtils.readGlobalAggregateValue(conf,
+ BspUtils.getJobId(conf));
+ return result;
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(MaximalCliqueVertex.class.getSimpleName());
+ job.setVertexClass(MaximalCliqueVertex.class);
+ job.setGlobalAggregatorClass(MaximalCliqueAggregator.class);
+ job.setDynamicVertexValueSize(true);
+ job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
+ job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
+ Client.run(args, job);
+ System.out.println("maximal cliques: \n" + readMaximalCliqueResult(job.getConfiguration()));
+ }
+
+ /**
+ * Send the adjacency lists
+ *
+ * @param edges
+ * the outgoing edges
+ */
+ private void sendOutgoingMsgs(List<Edge<VLongWritable, NullWritable>> edges) {
+ for (int i = 0; i < edges.size(); i++) {
+ if (edges.get(i).getDestVertexId().get() < getVertexId().get()) {
+ // only emit messages to the vertexes whose ids are smaller than this vertex's id,
+ // to avoid the duplicate removal step,
+ // because all the resulting cliques will have vertexes in ascending order.
+ AdjacencyListWritable msg = new AdjacencyListWritable();
+ msg.setSource(getVertexId());
+ for (int j = i + 1; j < edges.size(); j++) {
+ msg.addNeighbor(edges.get(j).getDestVertexId());
+ }
+ sendMsg(edges.get(i).getDestVertexId(), msg);
+ }
+ }
+ }
+
+ /**
+ * Maximal Clique VertexWriter
+ */
+ public static class MaximalCliqueVertexWriter extends
+ TextVertexWriter<VLongWritable, CliquesWritable, NullWritable> {
+ public MaximalCliqueVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<VLongWritable, CliquesWritable, NullWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+ new Text(vertex.getVertexValue().toString()));
+ }
+ }
+
+ /**
+ * output format for maximal clique
+ */
+ public static class MaximalCliqueVertexOutputFormat extends
+ TextVertexOutputFormat<VLongWritable, CliquesWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<VLongWritable, CliquesWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+ return new MaximalCliqueVertexWriter(recordWriter);
+ }
+
+ }
+}
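Note: the depth bound computed in updateCurrentMaximalCliques is the h-index of the local subgraph, i.e. the largest h such that at least h vertexes have at least h neighbors; the code uses it as the maximum search depth. A standalone restatement of that loop (a sketch over a plain degree array, not the patch's API):

    // h-index of a degree array: the largest h such that at least h entries are >= h
    static int hIndex(int[] degrees) {
        java.util.Arrays.sort(degrees); // ascending
        int h = 0;
        for (int i = degrees.length - 1; i >= 0; i--) {
            if (h >= degrees[i]) {
                break; // fewer than h + 1 entries exceed h
            }
            h++;
        }
        return h;
    }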
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/TextMaximalCliqueInputFormat.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/TextMaximalCliqueInputFormat.java
new file mode 100644
index 0000000..ec7b32c
--- /dev/null
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/TextMaximalCliqueInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.maximalclique;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextMaximalCliqueInputFormat extends
+ TextVertexInputFormat<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> {
+
+ @Override
+ public VertexReader<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new TextMaximalCliqueGraphReader(textInputFormat.createRecordReader(split, context));
+ }
+}
+
+@SuppressWarnings("rawtypes")
+class TextMaximalCliqueGraphReader extends
+ TextVertexReader<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> {
+
+ private final static String separator = " ";
+ private Vertex vertex;
+ private VLongWritable vertexId = new VLongWritable();
+ private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+ private int used = 0;
+
+ public TextMaximalCliqueGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+ super(lineRecordReader);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Vertex<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> getCurrentVertex()
+ throws IOException, InterruptedException {
+ used = 0;
+ if (vertex == null)
+ vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+
+ vertex.reset();
+ Text line = getRecordReader().getCurrentValue();
+ String[] fields = line.toString().split(separator);
+
+ if (fields.length > 0) {
+ /**
+ * set the src vertex id
+ */
+ long src = Long.parseLong(fields[0]);
+ vertexId.set(src);
+ vertex.setVertexId(vertexId);
+ long dest = -1L;
+
+ /**
+ * set up edges
+ */
+ for (int i = 1; i < fields.length; i++) {
+ dest = Long.parseLong(fields[i]);
+ VLongWritable destId = allocate();
+ destId.set(dest);
+ vertex.addEdge(destId, null);
+ }
+ }
+ return vertex;
+ }
+
+ private VLongWritable allocate() {
+ if (used >= pool.size()) {
+ VLongWritable value = new VLongWritable();
+ pool.add(value);
+ used++;
+ return value;
+ } else {
+ VLongWritable value = pool.get(used);
+ used++;
+ return value;
+ }
+ }
+}
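Note: a hedged trace of the parsing above: the line "2 1 3 4 5" yields a vertex with id 2 and edges to 1, 3, 4, and 5. In superstep 1, MaximalCliqueVertex sorts the edges and forwards an adjacency message (source 2, neighbors {3, 4, 5}) only to vertex 1, the single neighbor with a smaller id.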
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TextTriangleCountingInputFormat.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TextTriangleCountingInputFormat.java
new file mode 100644
index 0000000..bb399ff
--- /dev/null
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TextTriangleCountingInputFormat.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.trianglecounting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextTriangleCountingInputFormat extends
+ TextVertexInputFormat<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+
+ @Override
+ public VertexReader<VLongWritable, VLongWritable, VLongWritable, VLongWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new TextTriangleCountingGraphReader(textInputFormat.createRecordReader(split, context));
+ }
+}
+
+@SuppressWarnings("rawtypes")
+class TextTriangleCountingGraphReader extends TextVertexReader<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+
+ private final static String separator = " ";
+ private Vertex vertex;
+ private VLongWritable vertexId = new VLongWritable();
+ private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+ private int used = 0;
+
+ public TextTriangleCountingGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+ super(lineRecordReader);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> getCurrentVertex() throws IOException,
+ InterruptedException {
+ used = 0;
+ if (vertex == null)
+ vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+
+ vertex.reset();
+ Text line = getRecordReader().getCurrentValue();
+ String[] fields = line.toString().split(separator);
+
+ if (fields.length > 0) {
+ /**
+ * set the src vertex id
+ */
+ long src = Long.parseLong(fields[0]);
+ vertexId.set(src);
+ vertex.setVertexId(vertexId);
+ long dest = -1L;
+
+ /**
+ * set up edges
+ */
+ for (int i = 1; i < fields.length; i++) {
+ dest = Long.parseLong(fields[i]);
+ VLongWritable destId = allocate();
+ destId.set(dest);
+ vertex.addEdge(destId, null);
+ }
+ }
+        // edges are left unsorted here; TriangleCountingVertex.compute() sorts them in superstep 1
+ return vertex;
+ }
+
+ private VLongWritable allocate() {
+ if (used >= pool.size()) {
+ VLongWritable value = new VLongWritable();
+ pool.add(value);
+ used++;
+ return value;
+ } else {
+ VLongWritable value = pool.get(used);
+ used++;
+ return value;
+ }
+ }
+}
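
Editor's note: as the parsing logic in getCurrentVertex() shows, each input line is a space-separated adjacency list: the first token is the source vertex id and every remaining token is a destination id, added as an edge with a null edge value. For example (an illustrative line, not taken from the bundled dataset):

    1 2 3 4

would produce vertex 1 with edges to vertices 2, 3, and 4.
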
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingAggregator.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingAggregator.java
new file mode 100644
index 0000000..67b028d
--- /dev/null
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingAggregator.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.pregelix.example.trianglecounting;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * The global aggregator sums the per-vertex triangle counts into a global total
+ */
+public class TriangleCountingAggregator extends
+ GlobalAggregator<VLongWritable, VLongWritable, VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+
+ private VLongWritable state = new VLongWritable(0);
+
+ @Override
+ public void init() {
+ state.set(0);
+ }
+
+ @Override
+ public void step(Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> v) throws HyracksDataException {
+ state.set(state.get() + v.getVertexValue().get());
+ }
+
+ @Override
+ public void step(VLongWritable partialResult) {
+ state.set(state.get() + partialResult.get());
+ }
+
+ @Override
+ public VLongWritable finishPartial() {
+ return state;
+ }
+
+ @Override
+ public VLongWritable finishFinal() {
+ return state;
+ }
+
+}
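
Editor's note: the aggregator is applied in two phases: step(Vertex) folds each vertex's triangle count into a partition-local state, finishPartial() emits that partial sum, step(VLongWritable) folds the partials together, and finishFinal() yields the global total. Because the vertex program below records each triangle exactly once, at the triangle's smallest vertex, the global sum equals the number of triangles in the graph; for the expected test output further down (vertex 1 → 3, vertex 2 → 2, vertex 5 → 1, all others 0), that total is 3 + 2 + 1 = 6.
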
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
new file mode 100644
index 0000000..d3db095
--- /dev/null
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/trianglecounting/TriangleCountingVertex.java
@@ -0,0 +1,153 @@
+package edu.uci.ics.pregelix.example.trianglecounting;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * The triangle counting example -- counting the triangles in an undirected graph.
+ */
+public class TriangleCountingVertex extends Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+
+ private VLongWritable tmpValue = new VLongWritable(0);
+ private long triangleCount = 0;
+ private Edge<VLongWritable, VLongWritable> candidateEdge = new Edge<VLongWritable, VLongWritable>(
+ new VLongWritable(0), new VLongWritable(0));
+ private EdgeComparator edgeComparator = new EdgeComparator();
+
+ @Override
+ public void compute(Iterator<VLongWritable> msgIterator) {
+        // superstep 1: sort the edge list so that membership can be tested by binary search in superstep 2
+ if (getSuperstep() == 1) {
+            // sorting edges could be avoided if the dataset already guarantees sorted adjacency lists
+ sortEdges();
+ List<Edge<VLongWritable, VLongWritable>> edges = this.getEdges();
+ int numEdges = edges.size();
+
+            // decode vertex ids into primitive longs
+ long src = getVertexId().get();
+ long[] dests = new long[numEdges];
+ for (int i = 0; i < numEdges; i++) {
+ dests[i] = edges.get(i).getDestVertexId().get();
+ }
+
+            // send messages -- exploit the fact that each discovered
+            // triangle has its vertices ordered by vertex id
+ for (int i = 0; i < numEdges; i++) {
+ if (dests[i] < src) {
+ for (int j = i + 1; j < numEdges; j++) {
+                        // send message -- v_j.id > v_i.id is guaranteed by sortEdges()
+ if (dests[j] > src) {
+ sendMsg(edges.get(i).getDestVertexId(), edges.get(j).getDestVertexId());
+ }
+ }
+ }
+ }
+ }
+ if (getSuperstep() >= 2) {
+ triangleCount = 0;
+ List<Edge<VLongWritable, VLongWritable>> edges = this.getEdges();
+ while (msgIterator.hasNext()) {
+ VLongWritable msg = msgIterator.next();
+ candidateEdge.setDestVertexId(msg);
+ if (Collections.binarySearch(edges, candidateEdge, edgeComparator) >= 0) {
+                    // the message value is also a destination of this vertex, closing a triangle
+ triangleCount++;
+ }
+ }
+
+ // set vertex value
+ tmpValue.set(triangleCount);
+ setVertexValue(tmpValue);
+ voteToHalt();
+ }
+ }
+
+ /**
+ * Triangle Counting VertexWriter
+ */
+ public static class TriangleCountingVertexWriter extends
+ TextVertexWriter<VLongWritable, VLongWritable, VLongWritable> {
+ public TriangleCountingVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<VLongWritable, VLongWritable, VLongWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+ new Text(vertex.getVertexValue().toString()));
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getVertexId() + " " + getVertexValue();
+ }
+
+ /**
+     * The output format for triangle counting
+ */
+ public static class TriangleCountingVertexOutputFormat extends
+ TextVertexOutputFormat<VLongWritable, VLongWritable, VLongWritable> {
+
+ @Override
+ public VertexWriter<VLongWritable, VLongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+ return new TriangleCountingVertexWriter(recordWriter);
+ }
+
+ }
+
+ private static long readTriangleCountingResult(Configuration conf) {
+ try {
+ VLongWritable count = (VLongWritable) IterationUtils
+ .readGlobalAggregateValue(conf, BspUtils.getJobId(conf));
+ return count.get();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(TriangleCountingVertex.class.getSimpleName());
+ job.setVertexClass(TriangleCountingVertex.class);
+ job.setGlobalAggregatorClass(TriangleCountingAggregator.class);
+ job.setVertexInputFormatClass(TextTriangleCountingInputFormat.class);
+ job.setVertexOutputFormatClass(TriangleCountingVertexOutputFormat.class);
+ Client.run(args, job);
+ System.out.println("triangle count: " + readTriangleCountingResult(job.getConfiguration()));
+ }
+}
+
+/**
+ * The comparator for Edge<VLongWritable, VLongWritable>.
+ */
+class EdgeComparator implements Comparator<Edge<VLongWritable, VLongWritable>> {
+
+ @Override
+ public int compare(Edge<VLongWritable, VLongWritable> left, Edge<VLongWritable, VLongWritable> right) {
+ long leftValue = left.getDestVertexId().get();
+ long rightValue = right.getDestVertexId().get();
+ return leftValue > rightValue ? 1 : (leftValue < rightValue ? -1 : 0);
+ }
+}
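
Editor's note: to make the message flow in compute() concrete, here is a hand trace on a single triangle over vertices {1, 2, 3} (illustrative ids). In superstep 1, only vertex 2 has both a smaller neighbor (1) and a larger neighbor (3), so it sends the message 3 to vertex 1; vertices 1 and 3 send nothing. In superstep 2, vertex 1 binary-searches its sorted edge list for 3 using the EdgeComparator above, finds it, and sets its value to 1, while vertices 2 and 3 report 0. The aggregator then sums 1 + 0 + 0 = 1 triangle, counted exactly once at the smallest vertex.
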
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexAggregator.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexAggregator.java
similarity index 98%
rename from pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexAggregator.java
rename to pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexAggregator.java
index 68b7cca..d8f704e 100644
--- a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexAggregator.java
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexAggregator.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.pregelix.example;
+package edu.uci.ics.pregelix.example.utils;
import java.io.IOException;
import java.util.Iterator;
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexSorter.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexSorter.java
similarity index 98%
rename from pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexSorter.java
rename to pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexSorter.java
index 1dd6922..8421088 100644
--- a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexSorter.java
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/VertexSorter.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.pregelix.example;
+package edu.uci.ics.pregelix.example.utils;
import java.io.IOException;
import java.util.Iterator;
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index a71c7ad..da6d564 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -37,7 +37,7 @@
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.example.PageRankVertex;
-import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexInputFormat;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimulatedPageRankVertexInputFormat;
import edu.uci.ics.pregelix.example.util.TestUtils;
@SuppressWarnings("deprecation")
@@ -65,7 +65,7 @@
public DataLoadTest() throws Exception {
job = new PregelixJob(GIRAPH_JOB_NAME);
job.setVertexClass(PageRankVertex.class);
- job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+ job.setVertexInputFormatClass(SimulatedPageRankVertexInputFormat.class);
job.getConfiguration().setClass(PregelixJob.VERTEX_INDEX_CLASS, LongWritable.class, WritableComparable.class);
job.getConfiguration().setClass(PregelixJob.VERTEX_VALUE_CLASS, DoubleWritable.class, Writable.class);
job.getConfiguration().setClass(PregelixJob.EDGE_VALUE_CLASS, FloatWritable.class, Writable.class);
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index c0b4a10..c353d84 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -26,8 +26,8 @@
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
import edu.uci.ics.pregelix.example.PageRankVertex;
-import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexInputFormat;
import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimulatedPageRankVertexInputFormat;
import edu.uci.ics.pregelix.example.ReachabilityVertex;
import edu.uci.ics.pregelix.example.ReachabilityVertex.SimpleReachibilityVertexOutputFormat;
import edu.uci.ics.pregelix.example.ShortestPathsVertex;
@@ -35,6 +35,14 @@
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
import edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat;
+import edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator;
+import edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex;
+import edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex.MaximalCliqueVertexOutputFormat;
+import edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat;
+import edu.uci.ics.pregelix.example.trianglecounting.TextTriangleCountingInputFormat;
+import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator;
+import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex;
+import edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex.TriangleCountingVertexOutputFormat;
public class JobGenerator {
private static String outputBase = "src/test/resources/jobs/";
@@ -44,6 +52,9 @@
private static String HDFS_INPUTPATH2 = "/webmapcomplex";
private static String HDFS_OUTPUTPAH2 = "/resultcomplex";
+ private static String HDFS_INPUTPATH3 = "/clique";
+ private static String HDFS_OUTPUTPAH3 = "/resultclique";
+
private static void generatePageRankJobReal(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(PageRankVertex.class);
@@ -148,7 +159,7 @@
private static void generatePageRankJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(PageRankVertex.class);
- job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+ job.setVertexInputFormatClass(SimulatedPageRankVertexInputFormat.class);
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
@@ -157,17 +168,10 @@
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
- private static void genPageRank() throws IOException {
- generatePageRankJob("PageRank", outputBase + "PageRank.xml");
- generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
- generatePageRankJobRealComplex("PageRank", outputBase + "PageRankRealComplex.xml");
- generatePageRankJobRealNoCombiner("PageRank", outputBase + "PageRankRealNoCombiner.xml");
- }
-
private static void generateShortestPathJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(ShortestPathsVertex.class);
- job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+ job.setVertexInputFormatClass(SimulatedPageRankVertexInputFormat.class);
job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
@@ -177,6 +181,50 @@
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
+ private static void generatePageRankJobRealDynamic(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(PageRankVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setDynamicVertexValueSize(true);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void generateTriangleCountingJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(TriangleCountingVertex.class);
+ job.setGlobalAggregatorClass(TriangleCountingAggregator.class);
+ job.setVertexInputFormatClass(TextTriangleCountingInputFormat.class);
+ job.setVertexOutputFormatClass(TriangleCountingVertexOutputFormat.class);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void generateMaximalCliqueJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(MaximalCliqueVertex.class);
+ job.setGlobalAggregatorClass(MaximalCliqueAggregator.class);
+ job.setDynamicVertexValueSize(true);
+ job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
+ job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genPageRank() throws IOException {
+ generatePageRankJob("PageRank", outputBase + "PageRank.xml");
+ generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
+ generatePageRankJobRealDynamic("PageRank", outputBase + "PageRankRealDynamic.xml");
+ generatePageRankJobRealComplex("PageRank", outputBase + "PageRankRealComplex.xml");
+ generatePageRankJobRealNoCombiner("PageRank", outputBase + "PageRankRealNoCombiner.xml");
+ }
+
private static void genShortestPath() throws IOException {
generateShortestPathJob("ShortestPaths", outputBase + "ShortestPaths.xml");
generateShortestPathJobReal("ShortestPaths", outputBase + "ShortestPathsReal.xml");
@@ -194,11 +242,20 @@
+ "ReachibilityRealComplexNoConnectivity.xml");
}
+ private static void genTriangleCounting() throws IOException {
+ generateTriangleCountingJob("Triangle Counting", outputBase + "TriangleCounting.xml");
+ }
+
+ private static void genMaximalClique() throws IOException {
+ generateMaximalCliqueJob("Maximal Clique", outputBase + "MaximalClique.xml");
+ }
+
public static void main(String[] args) throws IOException {
genPageRank();
genShortestPath();
genConnectedComponents();
genReachibility();
+ genTriangleCounting();
+ genMaximalClique();
}
-
}
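
Editor's note: each generate*Job() method above builds a PregelixJob in memory and serializes its Hadoop Configuration to an XML file under src/test/resources/jobs/, which the jobrun tests later load and execute. A minimal sketch of reading one of the generated descriptions back (illustrative helper class, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class ReadGeneratedJob {
        public static void main(String[] args) {
            // Load a generated job description back into a Configuration.
            Configuration conf = new Configuration(false);
            conf.addResource(new Path("src/test/resources/jobs/TriangleCounting.xml"));
            // e.g. prints edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex
            System.out.println(conf.get("pregelix.vertexClass"));
        }
    }
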
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index 89bce34..7a5bba6 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -30,6 +30,7 @@
import edu.uci.ics.pregelix.core.jobgen.JobGen;
import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@@ -44,6 +45,9 @@
private static String HDFS_INPUTPATH2 = "/webmapcomplex";
private static String HDFS_OUTPUTPAH2 = "/resultcomplex";
+ private static String HDFS_INPUTPATH3 = "/clique";
+ private static String HDFS_OUTPUTPAH3 = "/resultclique";
+
private final PregelixJob job;
private JobGen[] giraphJobGens;
private final String resultFileName;
@@ -61,21 +65,24 @@
if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH)) {
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
- } else {
+ } else if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH2)) {
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
+ } else {
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
}
job.setJobName(jobName);
this.resultFileName = resultFile;
this.expectedFileName = expectedFile;
- giraphJobGens = new JobGen[3];
+ giraphJobGens = new JobGen[4];
giraphJobGens[0] = new JobGenOuterJoin(job);
waitawhile();
giraphJobGens[1] = new JobGenInnerJoin(job);
waitawhile();
giraphJobGens[2] = new JobGenOuterJoinSort(job);
- //waitawhile();
- // giraphJobGens[3] = new JobGenOuterJoinSingleSort(job);
+ waitawhile();
+ giraphJobGens[3] = new JobGenOuterJoinSingleSort(job);
}
private void waitawhile() throws InterruptedException {
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index 1253bde..ca16c15 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -59,6 +59,10 @@
private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
private static final String HDFS_PATH2 = "/webmapcomplex/";
+ private static final String DATA_PATH3 = "data/clique/clique.txt";
+ private static final String HDFS_PATH3 = "/clique/";
+
+ private static final String HYRACKS_APP_NAME = "giraph";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
private MiniDFSCluster dfsCluster;
@@ -102,6 +106,11 @@
dfs.mkdirs(dest);
dfs.copyFromLocalFile(src, dest);
+ src = new Path(DATA_PATH3);
+ dest = new Path(HDFS_PATH3);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
conf.writeXml(confOutput);
confOutput.flush();
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
index 1b22b47..d89ec46 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
@@ -62,8 +62,8 @@
if (row1.equals(row2))
continue;
- String[] fields1 = row1.split(",");
- String[] fields2 = row2.split(",");
+ String[] fields1 = row1.split(" ");
+ String[] fields2 = row2.split(" ");
for (int j = 0; j < fields1.length; j++) {
if (fields1[j].equals(fields2[j])) {
@@ -71,8 +71,6 @@
} else if (fields1[j].indexOf('.') < 0) {
return false;
} else {
- fields1[j] = fields1[j].split("=")[1];
- fields2[j] = fields2[j].split("=")[1];
Double double1 = Double.parseDouble(fields1[j]);
Double double2 = Double.parseDouble(fields2[j]);
float float1 = (float) double1.doubleValue();
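
Editor's note: once both fields parse as doubles, the comparison above narrows them to float, presumably so that the approximate equality check tolerates plan-to-plan differences in the low-order digits of floating-point results. For instance, 0.08125113985998214 (PageRankReal.result) and 0.08125113985998211 (PageRankRealNoCombiner.result) differ only far below float precision, so both narrow to the same float and compare equal.
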
diff --git a/pregelix-example/src/test/resources/expected/ConnectedComponentsReal.result b/pregelix-example/src/test/resources/expected/ConnectedComponentsReal.result
index b8efedc..45376e2 100644
--- a/pregelix-example/src/test/resources/expected/ConnectedComponentsReal.result
+++ b/pregelix-example/src/test/resources/expected/ConnectedComponentsReal.result
@@ -1,20 +1,20 @@
-0|Vertex(id=0,value=0, edges=(1,))
-1|Vertex(id=1,value=0, edges=(1,2,))
-2|Vertex(id=2,value=0, edges=(1,2,3,))
-3|Vertex(id=3,value=0, edges=(1,2,3,4,))
-4|Vertex(id=4,value=0, edges=(1,2,3,4,5,))
-5|Vertex(id=5,value=0, edges=(1,2,3,4,5,6,))
-6|Vertex(id=6,value=0, edges=(1,2,3,4,5,6,7,))
-7|Vertex(id=7,value=0, edges=(1,2,3,4,5,6,7,8,))
-8|Vertex(id=8,value=0, edges=(1,2,3,4,5,6,7,8,9,))
-9|Vertex(id=9,value=0, edges=(1,2,3,4,5,6,7,8,9,10,))
-10|Vertex(id=10,value=0, edges=(11,))
-11|Vertex(id=11,value=0, edges=(11,12,))
-12|Vertex(id=12,value=0, edges=(11,12,13,))
-13|Vertex(id=13,value=0, edges=(11,12,13,14,))
-14|Vertex(id=14,value=0, edges=(11,12,13,14,15,))
-15|Vertex(id=15,value=0, edges=(11,12,13,14,15,16,))
-16|Vertex(id=16,value=0, edges=(11,12,13,14,15,16,17,))
-17|Vertex(id=17,value=0, edges=(11,12,13,14,15,16,17,18,))
-18|Vertex(id=18,value=0, edges=(11,12,13,14,15,16,17,18,19,))
-19|Vertex(id=19,value=0, edges=(0,11,12,13,14,15,16,17,18,19,))
+0 0
+1 0
+2 0
+3 0
+4 0
+5 0
+6 0
+7 0
+8 0
+9 0
+10 0
+11 0
+12 0
+13 0
+14 0
+15 0
+16 0
+17 0
+18 0
+19 0
diff --git a/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex.result b/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex.result
index ad448b2..dbc30fc 100644
--- a/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex.result
+++ b/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex.result
@@ -1,23 +1,23 @@
-0|Vertex(id=0,value=0, edges=(1,50,))
-1|Vertex(id=1,value=0, edges=(1,2,))
-2|Vertex(id=2,value=0, edges=(1,2,3,))
-3|Vertex(id=3,value=0, edges=(1,2,3,4,))
-4|Vertex(id=4,value=0, edges=(1,2,3,4,5,))
-5|Vertex(id=5,value=0, edges=(1,2,3,4,5,6,))
-6|Vertex(id=6,value=0, edges=(1,2,3,4,5,6,7,))
-7|Vertex(id=7,value=0, edges=(1,2,3,4,5,6,7,8,))
-8|Vertex(id=8,value=0, edges=(1,2,3,4,5,6,7,8,9,))
-9|Vertex(id=9,value=0, edges=(1,2,3,4,5,6,7,8,9,10,))
-10|Vertex(id=10,value=0, edges=(11,99,))
-11|Vertex(id=11,value=0, edges=(11,12,101,))
-12|Vertex(id=12,value=0, edges=(11,12,13,))
-13|Vertex(id=13,value=0, edges=(11,12,13,14,))
-14|Vertex(id=14,value=0, edges=(11,12,13,14,15,))
-15|Vertex(id=15,value=0, edges=(11,12,13,14,15,16,))
-16|Vertex(id=16,value=0, edges=(11,12,13,14,15,16,17,))
-17|Vertex(id=17,value=0, edges=(11,12,13,14,15,16,17,18,))
-18|Vertex(id=18,value=0, edges=(11,12,13,14,15,16,17,18,19,))
-19|Vertex(id=19,value=0, edges=(0,11,12,13,14,15,16,17,18,19,))
-21|Vertex(id=21,value=21, edges=(22,23,24,))
-25|Vertex(id=25,value=25, edges=())
-27|Vertex(id=27,value=27, edges=())
+0 0
+1 0
+2 0
+3 0
+4 0
+5 0
+6 0
+7 0
+8 0
+9 0
+10 0
+11 0
+12 0
+13 0
+14 0
+15 0
+16 0
+17 0
+18 0
+19 0
+21 21
+25 25
+27 27
diff --git a/pregelix-example/src/test/resources/expected/MaximalClique.result b/pregelix-example/src/test/resources/expected/MaximalClique.result
new file mode 100644
index 0000000..d238037
--- /dev/null
+++ b/pregelix-example/src/test/resources/expected/MaximalClique.result
@@ -0,0 +1,7 @@
+1 1,2,3,4;
+2 2,3,4;
+3
+4
+5
+6
+7
diff --git a/pregelix-example/src/test/resources/expected/PageRank.result b/pregelix-example/src/test/resources/expected/PageRank.result
index f38e191..9c4d83a 100644
--- a/pregelix-example/src/test/resources/expected/PageRank.result
+++ b/pregelix-example/src/test/resources/expected/PageRank.result
@@ -1,20 +1,20 @@
-0|Vertex(id=0,value=0.008290140026154316, edges=(1,))
-1|Vertex(id=1,value=0.1535152819247165, edges=(1,2,))
-2|Vertex(id=2,value=0.14646839195826475, edges=(1,2,3,))
-3|Vertex(id=3,value=0.08125113985998214, edges=(1,2,3,4,))
-4|Vertex(id=4,value=0.03976979906329426, edges=(1,2,3,4,5,))
-5|Vertex(id=5,value=0.0225041581462058, edges=(1,2,3,4,5,6,))
-6|Vertex(id=6,value=0.015736276824953852, edges=(1,2,3,4,5,6,7,))
-7|Vertex(id=7,value=0.012542224114863661, edges=(1,2,3,4,5,6,7,8,))
-8|Vertex(id=8,value=0.010628239626209894, edges=(1,2,3,4,5,6,7,8,9,))
-9|Vertex(id=9,value=0.009294348455354817, edges=(1,2,3,4,5,6,7,8,9,10,))
-10|Vertex(id=10,value=0.008290140026154316, edges=(11,))
-11|Vertex(id=11,value=0.15351528192471647, edges=(11,12,))
-12|Vertex(id=12,value=0.14646839195826472, edges=(11,12,13,))
-13|Vertex(id=13,value=0.08125113985998214, edges=(11,12,13,14,))
-14|Vertex(id=14,value=0.03976979906329425, edges=(11,12,13,14,15,))
-15|Vertex(id=15,value=0.0225041581462058, edges=(11,12,13,14,15,16,))
-16|Vertex(id=16,value=0.015736276824953852, edges=(11,12,13,14,15,16,17,))
-17|Vertex(id=17,value=0.012542224114863661, edges=(11,12,13,14,15,16,17,18,))
-18|Vertex(id=18,value=0.010628239626209894, edges=(11,12,13,14,15,16,17,18,19,))
-19|Vertex(id=19,value=0.009294348455354817, edges=(0,11,12,13,14,15,16,17,18,19,))
+0 0.008290140026154316
+1 0.1535152819247165
+2 0.14646839195826475
+3 0.08125113985998214
+4 0.03976979906329426
+5 0.0225041581462058
+6 0.015736276824953852
+7 0.012542224114863661
+8 0.010628239626209894
+9 0.009294348455354817
+10 0.008290140026154316
+11 0.15351528192471647
+12 0.14646839195826472
+13 0.08125113985998214
+14 0.03976979906329425
+15 0.0225041581462058
+16 0.015736276824953852
+17 0.012542224114863661
+18 0.010628239626209894
+19 0.009294348455354817
diff --git a/pregelix-example/src/test/resources/expected/PageRankReal.result b/pregelix-example/src/test/resources/expected/PageRankReal.result
index ab05d38..6432eda 100644
--- a/pregelix-example/src/test/resources/expected/PageRankReal.result
+++ b/pregelix-example/src/test/resources/expected/PageRankReal.result
@@ -1,20 +1,20 @@
-0|Vertex(id=0,value=0.008290140026154316, edges=(1,))
-1|Vertex(id=1,value=0.1535152819247165, edges=(1,2,))
-2|Vertex(id=2,value=0.14646839195826475, edges=(1,2,3,))
-3|Vertex(id=3,value=0.08125113985998214, edges=(1,2,3,4,))
-4|Vertex(id=4,value=0.03976979906329426, edges=(1,2,3,4,5,))
-5|Vertex(id=5,value=0.0225041581462058, edges=(1,2,3,4,5,6,))
-6|Vertex(id=6,value=0.015736276824953852, edges=(1,2,3,4,5,6,7,))
-7|Vertex(id=7,value=0.012542224114863661, edges=(1,2,3,4,5,6,7,8,))
-8|Vertex(id=8,value=0.010628239626209894, edges=(1,2,3,4,5,6,7,8,9,))
-9|Vertex(id=9,value=0.009294348455354817, edges=(1,2,3,4,5,6,7,8,9,10,))
-10|Vertex(id=10,value=0.008290140026154316, edges=(11,))
-11|Vertex(id=11,value=0.15351528192471647, edges=(11,12,))
-12|Vertex(id=12,value=0.14646839195826472, edges=(11,12,13,))
-13|Vertex(id=13,value=0.08125113985998214, edges=(11,12,13,14,))
-14|Vertex(id=14,value=0.03976979906329426, edges=(11,12,13,14,15,))
-15|Vertex(id=15,value=0.0225041581462058, edges=(11,12,13,14,15,16,))
-16|Vertex(id=16,value=0.015736276824953852, edges=(11,12,13,14,15,16,17,))
-17|Vertex(id=17,value=0.012542224114863661, edges=(11,12,13,14,15,16,17,18,))
-18|Vertex(id=18,value=0.010628239626209894, edges=(11,12,13,14,15,16,17,18,19,))
-19|Vertex(id=19,value=0.009294348455354817, edges=(0,11,12,13,14,15,16,17,18,19,))
+0 0.008290140026154316
+1 0.1535152819247165
+2 0.14646839195826475
+3 0.08125113985998214
+4 0.03976979906329426
+5 0.0225041581462058
+6 0.015736276824953852
+7 0.012542224114863661
+8 0.010628239626209894
+9 0.009294348455354817
+10 0.008290140026154316
+11 0.15351528192471647
+12 0.14646839195826472
+13 0.08125113985998214
+14 0.03976979906329426
+15 0.0225041581462058
+16 0.015736276824953852
+17 0.012542224114863661
+18 0.010628239626209894
+19 0.009294348455354817
diff --git a/pregelix-example/src/test/resources/expected/PageRankRealComplex.result b/pregelix-example/src/test/resources/expected/PageRankRealComplex.result
index 1fc108a..2bd09e1 100644
--- a/pregelix-example/src/test/resources/expected/PageRankRealComplex.result
+++ b/pregelix-example/src/test/resources/expected/PageRankRealComplex.result
@@ -1,23 +1,23 @@
-0|Vertex(id=0,value=0.0072088164890121405, edges=(1,50,))
-1|Vertex(id=1,value=0.12352056961948686, edges=(1,2,))
-2|Vertex(id=2,value=0.12045670441668178, edges=(1,2,3,))
-3|Vertex(id=3,value=0.06798545786459467, edges=(1,2,3,4,))
-4|Vertex(id=4,value=0.03387281259892814, edges=(1,2,3,4,5,))
-5|Vertex(id=5,value=0.01942600635480669, edges=(1,2,3,4,5,6,))
-6|Vertex(id=6,value=0.013661020012182747, edges=(1,2,3,4,5,6,7,))
-7|Vertex(id=7,value=0.0109034351563503, edges=(1,2,3,4,5,6,7,8,))
-8|Vertex(id=8,value=0.009241684574402657, edges=(1,2,3,4,5,6,7,8,9,))
-9|Vertex(id=9,value=0.008082028259564783, edges=(1,2,3,4,5,6,7,8,9,10,))
-10|Vertex(id=10,value=0.007208817414047232, edges=(11,99,))
-11|Vertex(id=11,value=0.07555839219845861, edges=(11,12,101,))
-12|Vertex(id=12,value=0.07249452699565352, edges=(11,12,13,))
-13|Vertex(id=13,value=0.05063539695954156, edges=(11,12,13,14,))
-14|Vertex(id=14,value=0.029644452692487822, edges=(11,12,13,14,15,))
-15|Vertex(id=15,value=0.018670183493927354, edges=(11,12,13,14,15,16,))
-16|Vertex(id=16,value=0.013558283213067561, edges=(11,12,13,14,15,16,17,))
-17|Vertex(id=17,value=0.010892790899883237, edges=(11,12,13,14,15,16,17,18,))
-18|Vertex(id=18,value=0.009240874593661061, edges=(11,12,13,14,15,16,17,18,19,))
-19|Vertex(id=19,value=0.008081987856433137, edges=(0,11,12,13,14,15,16,17,18,19,))
-21|Vertex(id=21,value=0.006521739130434782, edges=(22,23,24,))
-25|Vertex(id=25,value=0.006521739130434782, edges=())
-27|Vertex(id=27,value=0.006521739130434782, edges=())
+0 0.0072088164890121405
+1 0.12352056961948686
+2 0.12045670441668178
+3 0.06798545786459467
+4 0.03387281259892814
+5 0.01942600635480669
+6 0.013661020012182747
+7 0.0109034351563503
+8 0.009241684574402657
+9 0.008082028259564783
+10 0.007208817414047232
+11 0.07555839219845861
+12 0.07249452699565352
+13 0.05063539695954156
+14 0.029644452692487822
+15 0.018670183493927354
+16 0.013558283213067561
+17 0.010892790899883237
+18 0.009240874593661061
+19 0.008081987856433137
+21 0.006521739130434782
+25 0.006521739130434782
+27 0.006521739130434782
diff --git a/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result b/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result
new file mode 100644
index 0000000..6432eda
--- /dev/null
+++ b/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result
@@ -0,0 +1,20 @@
+0 0.008290140026154316
+1 0.1535152819247165
+2 0.14646839195826475
+3 0.08125113985998214
+4 0.03976979906329426
+5 0.0225041581462058
+6 0.015736276824953852
+7 0.012542224114863661
+8 0.010628239626209894
+9 0.009294348455354817
+10 0.008290140026154316
+11 0.15351528192471647
+12 0.14646839195826472
+13 0.08125113985998214
+14 0.03976979906329426
+15 0.0225041581462058
+16 0.015736276824953852
+17 0.012542224114863661
+18 0.010628239626209894
+19 0.009294348455354817
diff --git a/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner.result b/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner.result
index ab05d38..9a747a6 100755
--- a/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner.result
+++ b/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner.result
@@ -1,20 +1,20 @@
-0|Vertex(id=0,value=0.008290140026154316, edges=(1,))
-1|Vertex(id=1,value=0.1535152819247165, edges=(1,2,))
-2|Vertex(id=2,value=0.14646839195826475, edges=(1,2,3,))
-3|Vertex(id=3,value=0.08125113985998214, edges=(1,2,3,4,))
-4|Vertex(id=4,value=0.03976979906329426, edges=(1,2,3,4,5,))
-5|Vertex(id=5,value=0.0225041581462058, edges=(1,2,3,4,5,6,))
-6|Vertex(id=6,value=0.015736276824953852, edges=(1,2,3,4,5,6,7,))
-7|Vertex(id=7,value=0.012542224114863661, edges=(1,2,3,4,5,6,7,8,))
-8|Vertex(id=8,value=0.010628239626209894, edges=(1,2,3,4,5,6,7,8,9,))
-9|Vertex(id=9,value=0.009294348455354817, edges=(1,2,3,4,5,6,7,8,9,10,))
-10|Vertex(id=10,value=0.008290140026154316, edges=(11,))
-11|Vertex(id=11,value=0.15351528192471647, edges=(11,12,))
-12|Vertex(id=12,value=0.14646839195826472, edges=(11,12,13,))
-13|Vertex(id=13,value=0.08125113985998214, edges=(11,12,13,14,))
-14|Vertex(id=14,value=0.03976979906329426, edges=(11,12,13,14,15,))
-15|Vertex(id=15,value=0.0225041581462058, edges=(11,12,13,14,15,16,))
-16|Vertex(id=16,value=0.015736276824953852, edges=(11,12,13,14,15,16,17,))
-17|Vertex(id=17,value=0.012542224114863661, edges=(11,12,13,14,15,16,17,18,))
-18|Vertex(id=18,value=0.010628239626209894, edges=(11,12,13,14,15,16,17,18,19,))
-19|Vertex(id=19,value=0.009294348455354817, edges=(0,11,12,13,14,15,16,17,18,19,))
+0 0.008290140026154316
+1 0.15351528192471647
+2 0.14646839195826475
+3 0.08125113985998211
+4 0.03976979906329425
+5 0.0225041581462058
+6 0.01573627682495385
+7 0.012542224114863661
+8 0.010628239626209894
+9 0.009294348455354817
+10 0.008290140026154316
+11 0.1535152819247165
+12 0.14646839195826475
+13 0.08125113985998214
+14 0.03976979906329426
+15 0.0225041581462058
+16 0.015736276824953852
+17 0.012542224114863661
+18 0.010628239626209894
+19 0.009294348455354817
diff --git a/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result b/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result
index 74113a8..a1dfc0f 100644
--- a/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result
+++ b/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result
@@ -1,23 +1,23 @@
-0|Vertex(id=0,value=2, edges=(1,50,))
-1|Vertex(id=1,value=3, edges=(1,2,))
-2|Vertex(id=2,value=1, edges=(1,2,3,))
-3|Vertex(id=3,value=1, edges=(1,2,3,4,))
-4|Vertex(id=4,value=1, edges=(1,2,3,4,5,))
-5|Vertex(id=5,value=1, edges=(1,2,3,4,5,6,))
-6|Vertex(id=6,value=1, edges=(1,2,3,4,5,6,7,))
-7|Vertex(id=7,value=1, edges=(1,2,3,4,5,6,7,8,))
-8|Vertex(id=8,value=1, edges=(1,2,3,4,5,6,7,8,9,))
-9|Vertex(id=9,value=1, edges=(1,2,3,4,5,6,7,8,9,10,))
-10|Vertex(id=10,value=3, edges=(11,99,))
-11|Vertex(id=11,value=2, edges=(11,12,101,))
-12|Vertex(id=12,value=2, edges=(11,12,13,))
-13|Vertex(id=13,value=2, edges=(11,12,13,14,))
-14|Vertex(id=14,value=2, edges=(11,12,13,14,15,))
-15|Vertex(id=15,value=2, edges=(11,12,13,14,15,16,))
-16|Vertex(id=16,value=2, edges=(11,12,13,14,15,16,17,))
-17|Vertex(id=17,value=2, edges=(11,12,13,14,15,16,17,18,))
-18|Vertex(id=18,value=2, edges=(11,12,13,14,15,16,17,18,19,))
-19|Vertex(id=19,value=2, edges=(0,11,12,13,14,15,16,17,18,19,))
-21|Vertex(id=21,value=0, edges=(22,23,24,))
-25|Vertex(id=25,value=0, edges=())
-27|Vertex(id=27,value=0, edges=())
+0 2
+1 3
+2 1
+3 1
+4 1
+5 1
+6 1
+7 1
+8 1
+9 1
+10 3
+11 2
+12 2
+13 2
+14 2
+15 2
+16 2
+17 2
+18 2
+19 2
+21 0
+25 0
+27 0
diff --git a/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity.result b/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity.result
index ea0edc2..1693fb2 100644
--- a/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity.result
+++ b/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity.result
@@ -1,23 +1,23 @@
-0|Vertex(id=0,value=1, edges=(1,50,))
-1|Vertex(id=1,value=1, edges=(1,2,))
-2|Vertex(id=2,value=1, edges=(1,2,3,))
-3|Vertex(id=3,value=1, edges=(1,2,3,4,))
-4|Vertex(id=4,value=1, edges=(1,2,3,4,5,))
-5|Vertex(id=5,value=1, edges=(1,2,3,4,5,6,))
-6|Vertex(id=6,value=1, edges=(1,2,3,4,5,6,7,))
-7|Vertex(id=7,value=1, edges=(1,2,3,4,5,6,7,8,))
-8|Vertex(id=8,value=1, edges=(1,2,3,4,5,6,7,8,9,))
-9|Vertex(id=9,value=1, edges=(1,2,3,4,5,6,7,8,9,10,))
-10|Vertex(id=10,value=1, edges=(11,99,))
-11|Vertex(id=11,value=1, edges=(11,12,101,))
-12|Vertex(id=12,value=1, edges=(11,12,13,))
-13|Vertex(id=13,value=1, edges=(11,12,13,14,))
-14|Vertex(id=14,value=1, edges=(11,12,13,14,15,))
-15|Vertex(id=15,value=1, edges=(11,12,13,14,15,16,))
-16|Vertex(id=16,value=1, edges=(11,12,13,14,15,16,17,))
-17|Vertex(id=17,value=1, edges=(11,12,13,14,15,16,17,18,))
-18|Vertex(id=18,value=1, edges=(11,12,13,14,15,16,17,18,19,))
-19|Vertex(id=19,value=1, edges=(0,11,12,13,14,15,16,17,18,19,))
-21|Vertex(id=21,value=0, edges=(22,23,24,))
-25|Vertex(id=25,value=2, edges=())
-27|Vertex(id=27,value=0, edges=())
+0 1
+1 1
+2 1
+3 1
+4 1
+5 1
+6 1
+7 1
+8 1
+9 1
+10 1
+11 1
+12 1
+13 1
+14 1
+15 1
+16 1
+17 1
+18 1
+19 1
+21 0
+25 2
+27 0
diff --git a/pregelix-example/src/test/resources/expected/ShortestPaths.result b/pregelix-example/src/test/resources/expected/ShortestPaths.result
index 7bb0ca3..46d1c73 100644
--- a/pregelix-example/src/test/resources/expected/ShortestPaths.result
+++ b/pregelix-example/src/test/resources/expected/ShortestPaths.result
@@ -1,20 +1,20 @@
-0|Vertex(id=0,value=0.0, edges=(1,))
-1|Vertex(id=1,value=0.0, edges=(1,2,))
-2|Vertex(id=2,value=100.0, edges=(1,2,3,))
-3|Vertex(id=3,value=300.0, edges=(1,2,3,4,))
-4|Vertex(id=4,value=600.0, edges=(1,2,3,4,5,))
-5|Vertex(id=5,value=1000.0, edges=(1,2,3,4,5,6,))
-6|Vertex(id=6,value=1500.0, edges=(1,2,3,4,5,6,7,))
-7|Vertex(id=7,value=2100.0, edges=(1,2,3,4,5,6,7,8,))
-8|Vertex(id=8,value=2800.0, edges=(1,2,3,4,5,6,7,8,9,))
-9|Vertex(id=9,value=3600.0, edges=(1,2,3,4,5,6,7,8,9,10,))
-10|Vertex(id=10,value=4500.0, edges=(11,))
-11|Vertex(id=11,value=5500.0, edges=(11,12,))
-12|Vertex(id=12,value=6600.0, edges=(11,12,13,))
-13|Vertex(id=13,value=7800.0, edges=(11,12,13,14,))
-14|Vertex(id=14,value=9100.0, edges=(11,12,13,14,15,))
-15|Vertex(id=15,value=10500.0, edges=(11,12,13,14,15,16,))
-16|Vertex(id=16,value=12000.0, edges=(11,12,13,14,15,16,17,))
-17|Vertex(id=17,value=13600.0, edges=(11,12,13,14,15,16,17,18,))
-18|Vertex(id=18,value=15300.0, edges=(11,12,13,14,15,16,17,18,19,))
-19|Vertex(id=19,value=17100.0, edges=(0,11,12,13,14,15,16,17,18,19,))
+0 0.0
+1 0.0
+2 100.0
+3 300.0
+4 600.0
+5 1000.0
+6 1500.0
+7 2100.0
+8 2800.0
+9 3600.0
+10 4500.0
+11 5500.0
+12 6600.0
+13 7800.0
+14 9100.0
+15 10500.0
+16 12000.0
+17 13600.0
+18 15300.0
+19 17100.0
diff --git a/pregelix-example/src/test/resources/expected/ShortestPathsReal.result b/pregelix-example/src/test/resources/expected/ShortestPathsReal.result
index f2c31a6..b42462f 100644
--- a/pregelix-example/src/test/resources/expected/ShortestPathsReal.result
+++ b/pregelix-example/src/test/resources/expected/ShortestPathsReal.result
@@ -1,20 +1,20 @@
-0|Vertex(id=0,value=0.0, edges=(1,))
-1|Vertex(id=1,value=1.0, edges=(1,2,))
-2|Vertex(id=2,value=2.0, edges=(1,2,3,))
-3|Vertex(id=3,value=3.0, edges=(1,2,3,4,))
-4|Vertex(id=4,value=4.0, edges=(1,2,3,4,5,))
-5|Vertex(id=5,value=5.0, edges=(1,2,3,4,5,6,))
-6|Vertex(id=6,value=6.0, edges=(1,2,3,4,5,6,7,))
-7|Vertex(id=7,value=7.0, edges=(1,2,3,4,5,6,7,8,))
-8|Vertex(id=8,value=8.0, edges=(1,2,3,4,5,6,7,8,9,))
-9|Vertex(id=9,value=9.0, edges=(1,2,3,4,5,6,7,8,9,10,))
-10|Vertex(id=10,value=10.0, edges=(11,))
-11|Vertex(id=11,value=11.0, edges=(11,12,))
-12|Vertex(id=12,value=12.0, edges=(11,12,13,))
-13|Vertex(id=13,value=13.0, edges=(11,12,13,14,))
-14|Vertex(id=14,value=14.0, edges=(11,12,13,14,15,))
-15|Vertex(id=15,value=15.0, edges=(11,12,13,14,15,16,))
-16|Vertex(id=16,value=16.0, edges=(11,12,13,14,15,16,17,))
-17|Vertex(id=17,value=17.0, edges=(11,12,13,14,15,16,17,18,))
-18|Vertex(id=18,value=18.0, edges=(11,12,13,14,15,16,17,18,19,))
-19|Vertex(id=19,value=19.0, edges=(0,11,12,13,14,15,16,17,18,19,))
+0 0.0
+1 1.0
+2 2.0
+3 3.0
+4 4.0
+5 5.0
+6 6.0
+7 7.0
+8 8.0
+9 9.0
+10 10.0
+11 11.0
+12 12.0
+13 13.0
+14 14.0
+15 15.0
+16 16.0
+17 17.0
+18 18.0
+19 19.0
diff --git a/pregelix-example/src/test/resources/expected/TriangleCounting.result b/pregelix-example/src/test/resources/expected/TriangleCounting.result
new file mode 100644
index 0000000..4818e13
--- /dev/null
+++ b/pregelix-example/src/test/resources/expected/TriangleCounting.result
@@ -0,0 +1,7 @@
+1 3
+2 2
+3 0
+4 0
+5 1
+6 0
+7 0
diff --git a/pregelix-example/src/test/resources/jobs/MaximalClique.xml b/pregelix-example/src/test/resources/jobs/MaximalClique.xml
new file mode 100644
index 0000000..616c647
--- /dev/null
+++ b/pregelix-example/src/test/resources/jobs/MaximalClique.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/clique</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultclique</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Maximal Clique</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex$MaximalCliqueVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix-example/src/test/resources/jobs/PageRank.xml
index e425b38..744e5b0 100644
--- a/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -123,7 +123,7 @@
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml b/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
new file mode 100644
index 0000000..c1a04ae
--- /dev/null
+++ b/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>PageRank</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index 3719247..9e791e2 100644
--- a/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -124,7 +124,7 @@
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimulatedPageRankVertexInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix-example/src/test/resources/jobs/TriangleCounting.xml b/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
new file mode 100644
index 0000000..ee2acc1
--- /dev/null
+++ b/pregelix-example/src/test/resources/jobs/TriangleCounting.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/clique</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultclique</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Triangle Counting</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingVertex$TriangleCountingVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.trianglecounting.TriangleCountingAggregator</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 105d3e2..1b8fce4 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -58,6 +59,8 @@
private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+ private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
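+ // the insert channel carries two-field tuples and the delete channel one-field tuples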
// for writing out to message channel
private IFrameWriter writerMsg;
@@ -82,6 +85,16 @@
private ByteBuffer bufferGlobalAggregate;
private GlobalAggregator aggregator;
+ // for writing out to insert vertex channel
+ private IFrameWriter writerInsert;
+ private FrameTupleAppender appenderInsert;
+ private ByteBuffer bufferInsert;
+
+ // for writing out to delete vertex channel
+ private IFrameWriter writerDelete;
+ private FrameTupleAppender appenderDelete;
+ private ByteBuffer bufferDelete;
+
private Vertex vertex;
private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
private DataOutput output = new DataOutputStream(bbos);
@@ -90,11 +103,13 @@
private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+ private Configuration conf;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
- this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+ this.conf = confFactory.createConfiguration();
+ this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
this.writerMsg = writers[0];
@@ -114,8 +129,22 @@
this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
- if (writers.length > 3) {
- this.writerAlive = writers[3];
+ this.writerInsert = writers[3];
+ this.bufferInsert = ctx.allocateFrame();
+ this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderInsert.reset(bufferInsert, true);
+ this.writers.add(writerInsert);
+ this.appenders.add(appenderInsert);
+
+ this.writerDelete = writers[4];
+ this.bufferDelete = ctx.allocateFrame();
+ this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderDelete.reset(bufferDelete, true);
+ this.writers.add(writerDelete);
+ this.appenders.add(appenderDelete);
+
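+ // the alive channel remains optional: it is wired only when more than five writers are supplied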
+ if (writers.length > 5) {
+ this.writerAlive = writers[5];
this.bufferAlive = ctx.allocateFrame();
this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
this.appenderAlive.reset(bufferAlive, true);
@@ -125,6 +154,8 @@
}
tbs.add(tbMsg);
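+ // builders are registered in channel order: msg, insert, delete, alive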
+ tbs.add(tbInsert);
+ tbs.add(tbDelete);
tbs.add(tbAlive);
}
@@ -164,6 +195,9 @@
@Override
public void close() throws HyracksDataException {
FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
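+ // flush any tuples still buffered on the new insert and delete channels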
+ FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+ FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+
if (pushAlive)
FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
if (!terminate) {
@@ -177,7 +211,8 @@
private void writeOutGlobalAggregate() throws HyracksDataException {
try {
/**
- * get partial aggregate result and flush to the final aggregator
+ * get partial aggregate result and flush to the final
+ * aggregator
*/
Writable agg = aggregator.finishPartial();
agg.write(tbGlobalAggregate.getDataOutput());
@@ -203,15 +238,27 @@
}
@Override
- public void update(ITupleReference tupleRef) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
+ if (!BspUtils.getDynamicVertexValueSize(conf)) {
+ // in-place update: the vertex value size is static, so overwrite the existing bytes
+ int fieldCount = tupleRef.getFieldCount();
+ for (int i = 1; i < fieldCount; i++) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
+ }
+ } else {
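+ // dynamic-size value: rebuild the (vertex id, vertex) tuple in cloneUpdateTb instead of updating in place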
+ // write the vertex id as the first field of the clone tuple
+ DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+ vertex.getVertexId().write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
+
+ // write the updated vertex as the second field
+ vertex.write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
}
}
} catch (IOException e) {
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index f72b059..a4d54c8 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -58,6 +59,8 @@
private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+ private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
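+ // as in ComputeUpdateFunctionFactory: two-field insert tuples, one-field delete tuples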
// for writing out to message channel
private IFrameWriter writerMsg;
@@ -82,6 +85,16 @@
private ByteBuffer bufferTerminate;
private boolean terminate = true;
+ // for writing out to insert vertex channel
+ private IFrameWriter writerInsert;
+ private FrameTupleAppender appenderInsert;
+ private ByteBuffer bufferInsert;
+
+ // for writing out to delete vertex channel
+ private IFrameWriter writerDelete;
+ private FrameTupleAppender appenderDelete;
+ private ByteBuffer bufferDelete;
+
// dummy empty msgList
private MsgList msgList = new MsgList();
private ArrayIterator msgIterator = new ArrayIterator();
@@ -93,11 +106,13 @@
private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+ private Configuration conf;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
- this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+ this.conf = confFactory.createConfiguration();
+ this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
this.writerMsg = writers[0];
@@ -117,8 +132,22 @@
this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
- if (writers.length > 3) {
- this.writerAlive = writers[3];
+ this.writerInsert = writers[3];
+ this.bufferInsert = ctx.allocateFrame();
+ this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderInsert.reset(bufferInsert, true);
+ this.writers.add(writerInsert);
+ this.appenders.add(appenderInsert);
+
+ this.writerDelete = writers[4];
+ this.bufferDelete = ctx.allocateFrame();
+ this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderDelete.reset(bufferDelete, true);
+ this.writers.add(writerDelete);
+ this.appenders.add(appenderDelete);
+
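+ // the alive channel is still optional and only wired when a sixth writer is supplied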
+ if (writers.length > 5) {
+ this.writerAlive = writers[5];
this.bufferAlive = ctx.allocateFrame();
this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
this.appenderAlive.reset(bufferAlive, true);
@@ -129,6 +158,8 @@
msgList.reset(msgIterator);
tbs.add(tbMsg);
+ tbs.add(tbInsert);
+ tbs.add(tbDelete);
tbs.add(tbAlive);
}
@@ -168,13 +199,16 @@
@Override
public void close() throws HyracksDataException {
FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
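+ // flush the insert and delete channels before the optional alive channel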
+ FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+ FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+
if (pushAlive)
FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
if (!terminate) {
writeOutTerminationState();
}
-
- /**write out global aggregate value*/
+
+ /** write out global aggregate value */
writeOutGlobalAggregate();
}
@@ -207,15 +241,27 @@
}
@Override
- public void update(ITupleReference tupleRef) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
+ if (!BspUtils.getDynamicVertexValueSize(conf)) {
+ // in-place update: the vertex value size is static, so overwrite the existing bytes
+ int fieldCount = tupleRef.getFieldCount();
+ for (int i = 1; i < fieldCount; i++) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
+ }
+ } else {
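+ // dynamic-size value: emit the rebuilt (vertex id, vertex) tuple through cloneUpdateTb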
+ // write the vertex id as the first field of the clone tuple
+ DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+ vertex.getVertexId().write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
+
+ // write the updated vertex as the second field
+ vertex.write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
}
}
} catch (IOException e) {
@@ -224,5 +270,4 @@
}
};
}
-
}