Several major changes in hyracks:
-- reduced CC/NC communications for reporting partition request and availability; partition request/availability are only reported for the case of send-side materialized (without pipelining) policies in case of task re-attempt.
-- changed buffer cache to dynamically allocate memory based on needs instead of pre-allocating
-- changed each network channel to lazily allocate memory based on needs, and changed materialized connectors to lazily allocate files based on needs
-- changed several major CCNCCFunctions to use non-java serde
-- added a sort-based group-by operator which pushes group-by aggregations into an external sort
-- make external sort a stable sort
1,3,and 4 is to reduce the job overhead.
2 is to reduce the unecessary NC resource consumptions such as memory and files.
5 and 6 are improvements to runtime operators.
One change in algebricks:
-- implemented a rule to push group-by aggregation into sort, i.e., using the sort-based gby operator
Several important changes in pregelix:
-- remove static states in vertex
-- direct check halt bit without deserialization
-- optimize the sort algorithm by packing yet-another 2-byte normalized key into the tPointers array
Change-Id: Id696f9a9f1647b4a025b8b33d20b3a89127c60d6
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/35
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index a280c45..4bfa343 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -17,7 +17,6 @@
import java.io.IOException;
import java.util.Iterator;
-import java.util.List;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
@@ -35,7 +34,7 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.example.client.Client;
import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
-import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat;
import edu.uci.ics.pregelix.example.io.VLongWritable;
/**
@@ -53,8 +52,9 @@
@Override
public void stepPartial(VLongWritable vertexIndex, VLongWritable msg) throws HyracksDataException {
long value = msg.get();
- if (min > value)
+ if (min > value) {
min = value;
+ }
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -66,8 +66,10 @@
@Override
public void stepFinal(VLongWritable vertexIndex, VLongWritable partialAggregate) throws HyracksDataException {
- if (min > partialAggregate.get())
- min = partialAggregate.get();
+ long value = partialAggregate.get();
+ if (min > value) {
+ min = value;
+ }
}
@Override
@@ -83,50 +85,68 @@
msgList.add(agg);
return msgList;
}
+
+ @Override
+ public void stepPartial2(VLongWritable vertexIndex, VLongWritable partialAggregate) throws HyracksDataException {
+ long value = partialAggregate.get();
+ if (min > value) {
+ min = value;
+ }
+ }
+
+ @Override
+ public VLongWritable finishPartial2() {
+ agg.set(min);
+ return agg;
+ }
}
- private VLongWritable outputValue = new VLongWritable();
private VLongWritable tmpVertexValue = new VLongWritable();
- private long minID;
@Override
public void compute(Iterator<VLongWritable> msgIterator) {
+ long currentComponent = getVertexValue().get();
+ // First superstep is special, because we can simply look at the neighbors
if (getSuperstep() == 1) {
- minID = getVertexId().get();
- List<Edge<VLongWritable, FloatWritable>> edges = this.getEdges();
- for (int i = 0; i < edges.size(); i++) {
- Edge<VLongWritable, FloatWritable> edge = edges.get(i);
+ for (Edge<VLongWritable, FloatWritable> edge : getEdges()) {
long neighbor = edge.getDestVertexId().get();
- if (minID > neighbor) {
- minID = neighbor;
+ if (neighbor < currentComponent) {
+ currentComponent = neighbor;
}
}
- tmpVertexValue.set(minID);
- setVertexValue(tmpVertexValue);
- sendOutMsgs();
- } else {
- minID = getVertexId().get();
- while (msgIterator.hasNext()) {
- minID = Math.min(minID, msgIterator.next().get());
- }
- if (minID < getVertexValue().get()) {
- tmpVertexValue.set(minID);
+ // Only need to send value if it is not the own id
+ if (currentComponent != getVertexValue().get()) {
+ tmpVertexValue.set(currentComponent);
setVertexValue(tmpVertexValue);
- sendOutMsgs();
+ for (Edge<VLongWritable, FloatWritable> edge : getEdges()) {
+ VLongWritable neighbor = edge.getDestVertexId();
+ if (neighbor.get() > currentComponent) {
+ sendMsg(neighbor, tmpVertexValue);
+ }
+ }
+ }
+ } else {
+ boolean changed = false;
+ // did we get a smaller id ?
+ while (msgIterator.hasNext()) {
+ VLongWritable message = msgIterator.next();
+ long candidateComponent = message.get();
+ if (candidateComponent < currentComponent) {
+ currentComponent = candidateComponent;
+ changed = true;
+ }
+ }
+
+ // propagate new component id to the neighbors
+ if (changed) {
+ tmpVertexValue.set(currentComponent);
+ setVertexValue(tmpVertexValue);
+ sendMsgToAllEdges(tmpVertexValue);
}
}
voteToHalt();
}
- private void sendOutMsgs() {
- List<Edge<VLongWritable, FloatWritable>> edges = this.getEdges();
- outputValue.set(minID);
- for (int i = 0; i < edges.size(); i++) {
- Edge<VLongWritable, FloatWritable> edge = edges.get(i);
- sendMsg(edge.getDestVertexId(), outputValue);
- }
- }
-
@Override
public String toString() {
return getVertexId() + " " + getVertexValue();
@@ -135,11 +155,12 @@
public static void main(String[] args) throws Exception {
PregelixJob job = new PregelixJob(ConnectedComponentsVertex.class.getSimpleName());
job.setVertexClass(ConnectedComponentsVertex.class);
- job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.setDynamicVertexValueSize(true);
+ job.setSkipCombinerKey(true);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
index 7fae776..bdf81c7 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
@@ -45,7 +45,7 @@
@Override
public void compute(Iterator<DoubleWritable> msgIterator) {
- if (Vertex.getSuperstep() == 1) {
+ if (getSuperstep() == 1) {
if (newVertex == null) {
newVertex = new GraphMutationVertex();
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphSampleUndirectedVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphSampleUndirectedVertex.java
new file mode 100644
index 0000000..7e02036
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphSampleUndirectedVertex.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.GlobalVertexCountAggregator;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextGraphSampleVertexInputFormat;
+import edu.uci.ics.pregelix.example.io.BooleanWritable;
+import edu.uci.ics.pregelix.example.io.NullWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class GraphSampleUndirectedVertex extends Vertex<VLongWritable, BooleanWritable, BooleanWritable, VLongWritable> {
+
+ public static class GlobalSamplingAggregator
+ extends
+ GlobalAggregator<VLongWritable, BooleanWritable, BooleanWritable, BooleanWritable, LongWritable, LongWritable> {
+
+ private LongWritable state = new LongWritable(0);
+
+ @Override
+ public void init() {
+ state.set(0);
+ }
+
+ @Override
+ public void step(Vertex<VLongWritable, BooleanWritable, BooleanWritable, BooleanWritable> v)
+ throws HyracksDataException {
+ if (v.getVertexValue().get() == true) {
+ state.set(state.get() + 1);
+ }
+ }
+
+ @Override
+ public void step(LongWritable partialResult) {
+ state.set(state.get() + partialResult.get());
+ }
+
+ @Override
+ public LongWritable finishPartial() {
+ return state;
+ }
+
+ @Override
+ public LongWritable finishFinal() {
+ return state;
+ }
+
+ }
+
+ public static final String GLOBAL_RATE = "pregelix.globalrate";
+ private int seedInterval = 0;
+ private int samplingInterval = 2;
+ private float globalRate = 0f;
+
+ private Random random = new Random(System.currentTimeMillis());
+ private BooleanWritable selectedFlag = new BooleanWritable(true);
+ private float fillingRate = 0f;
+
+ @Override
+ public void configure(Configuration conf) {
+ try {
+ globalRate = conf.getFloat(GLOBAL_RATE, 0);
+ seedInterval = (int) (1.0 / (globalRate / 100));
+ if (getSuperstep() > 1) {
+ LongWritable totalSelectedVertex = (LongWritable) IterationUtils.readGlobalAggregateValue(conf,
+ BspUtils.getJobId(conf), GlobalSamplingAggregator.class.getName());
+ LongWritable totalVertex = (LongWritable) IterationUtils.readGlobalAggregateValue(conf,
+ BspUtils.getJobId(conf), GlobalVertexCountAggregator.class.getName());
+ fillingRate = (float) totalSelectedVertex.get() / (float) totalVertex.get();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void compute(Iterator<VLongWritable> msgIterator) throws Exception {
+ if (getSuperstep() == 1) {
+ initSeeds();
+ } else {
+ if (fillingRate >= globalRate) {
+ if (msgIterator.hasNext()) {
+ setVertexValue(selectedFlag);
+
+ //keep the giraph undirected
+ while (msgIterator.hasNext()) {
+ //mark the reverse edge
+ VLongWritable dest = msgIterator.next();
+ markEdge(dest);
+ }
+ }
+ voteToHalt();
+ } else {
+ initSeeds();
+ if (msgIterator.hasNext()) {
+ markAsSelected();
+ }
+
+ //keep the graph undirected
+ while (msgIterator.hasNext()) {
+ //mark the reverse edge
+ VLongWritable dest = msgIterator.next();
+ markEdge(dest);
+ }
+ }
+ }
+ }
+
+ private void initSeeds() {
+ int randVal = random.nextInt(seedInterval);
+ if (randVal == 0) {
+ markAsSelected();
+ }
+ }
+
+ private void markAsSelected() {
+ setVertexValue(selectedFlag);
+ for (Edge<VLongWritable, BooleanWritable> edge : getEdges()) {
+ int randVal = random.nextInt(samplingInterval);
+ if (randVal == 0) {
+ if (edge.getEdgeValue().get() == false) {
+ edge.getEdgeValue().set(true);
+ sendMsg(edge.getDestVertexId(), getVertexId());
+ }
+ }
+ }
+ }
+
+ private void markEdge(VLongWritable destId) {
+ for (Edge<VLongWritable, BooleanWritable> edge : getEdges()) {
+ if (edge.getDestVertexId().equals(destId)) {
+ if (edge.getEdgeValue().get() == false) {
+ edge.getEdgeValue().set(true);
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer strBuffer = new StringBuffer();
+ strBuffer.append(getVertexId().toString());
+ strBuffer.append(" ");
+ for (Edge<VLongWritable, BooleanWritable> edge : getEdges()) {
+ if (edge.getEdgeValue().get() == true) {
+ strBuffer.append(edge.getDestVertexId());
+ strBuffer.append(" ");
+ }
+ }
+ return strBuffer.toString().trim();
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(GraphSampleUndirectedVertex.class.getSimpleName());
+ job.setVertexClass(GraphSampleUndirectedVertex.class);
+ job.setVertexInputFormatClass(TextGraphSampleVertexInputFormat.class);
+ job.setVertexOutputFormatClass(GraphSampleVertexOutputFormat.class);
+ job.addGlobalAggregatorClass(GraphSampleUndirectedVertex.GlobalSamplingAggregator.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setFixedVertexValueSize(true);
+ job.setSkipCombinerKey(true);
+ Client.run(args, job);
+ }
+
+ /**
+ * write sampled vertices
+ */
+ public static class GraphSampleVertexWriter extends TextVertexWriter<VLongWritable, BooleanWritable, NullWritable> {
+ public GraphSampleVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<VLongWritable, BooleanWritable, NullWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ if (vertex.getVertexValue().get() == true) {
+ getRecordWriter().write(new Text(vertex.toString()), new Text());
+ }
+ }
+ }
+
+ /**
+ * output format for sampled vertices
+ */
+ public static class GraphSampleVertexOutputFormat extends
+ TextVertexOutputFormat<VLongWritable, BooleanWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<VLongWritable, BooleanWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+ return new GraphSampleVertexWriter(recordWriter);
+ }
+
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphSampleVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphSampleVertex.java
new file mode 100644
index 0000000..bc6a9e4
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphSampleVertex.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.GlobalVertexCountAggregator;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextGraphSampleVertexInputFormat;
+import edu.uci.ics.pregelix.example.io.BooleanWritable;
+import edu.uci.ics.pregelix.example.io.NullWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class GraphSampleVertex extends Vertex<VLongWritable, BooleanWritable, BooleanWritable, BooleanWritable> {
+
+ public static class SimpleSampleCombiner extends MessageCombiner<VLongWritable, BooleanWritable, BooleanWritable> {
+ private BooleanWritable agg = new BooleanWritable();
+ private MsgList<BooleanWritable> msgList;
+
+ @Override
+ public void stepPartial(VLongWritable vertexIndex, BooleanWritable msg) throws HyracksDataException {
+ agg.set(msg.get());
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void init(MsgList msgList) {
+ this.msgList = msgList;
+ }
+
+ @Override
+ public void stepFinal(VLongWritable vertexIndex, BooleanWritable partialAggregate) throws HyracksDataException {
+ agg.set(partialAggregate.get());
+ }
+
+ @Override
+ public BooleanWritable finishPartial() {
+ return agg;
+ }
+
+ @Override
+ public MsgList<BooleanWritable> finishFinal() {
+ msgList.clear();
+ msgList.add(agg);
+ return msgList;
+ }
+
+ @Override
+ public void stepPartial2(VLongWritable vertexIndex, BooleanWritable partialAggregate)
+ throws HyracksDataException {
+ agg.set(partialAggregate.get());
+ }
+
+ @Override
+ public BooleanWritable finishPartial2() {
+ return agg;
+ }
+ }
+
+ public static class GlobalSamplingAggregator
+ extends
+ GlobalAggregator<VLongWritable, BooleanWritable, BooleanWritable, BooleanWritable, LongWritable, LongWritable> {
+
+ private LongWritable state = new LongWritable(0);
+
+ @Override
+ public void init() {
+ state.set(0);
+ }
+
+ @Override
+ public void step(Vertex<VLongWritable, BooleanWritable, BooleanWritable, BooleanWritable> v)
+ throws HyracksDataException {
+ if (v.getVertexValue().get() == true) {
+ state.set(state.get() + 1);
+ }
+ }
+
+ @Override
+ public void step(LongWritable partialResult) {
+ state.set(state.get() + partialResult.get());
+ }
+
+ @Override
+ public LongWritable finishPartial() {
+ return state;
+ }
+
+ @Override
+ public LongWritable finishFinal() {
+ return state;
+ }
+
+ }
+
+ public static final String GLOBAL_RATE = "pregelix.globalrate";
+ private int seedInterval = 0;
+ private int samplingInterval = 2;
+ private float globalRate = 0f;
+
+ private Random random = new Random(System.currentTimeMillis());
+ private BooleanWritable selectedFlag = new BooleanWritable(true);
+ private float fillingRate = 0f;
+
+ @Override
+ public void configure(Configuration conf) {
+ try {
+ globalRate = conf.getFloat(GLOBAL_RATE, 0);
+ seedInterval = (int) (1.0 / (globalRate / 100));
+ if (getSuperstep() > 1) {
+ LongWritable totalSelectedVertex = (LongWritable) IterationUtils.readGlobalAggregateValue(conf,
+ BspUtils.getJobId(conf), GlobalSamplingAggregator.class.getName());
+ LongWritable totalVertex = (LongWritable) IterationUtils.readGlobalAggregateValue(conf,
+ BspUtils.getJobId(conf), GlobalVertexCountAggregator.class.getName());
+ fillingRate = (float) totalSelectedVertex.get() / (float) totalVertex.get();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void compute(Iterator<BooleanWritable> msgIterator) throws Exception {
+ if (getSuperstep() == 1) {
+ initSeeds();
+ } else {
+ if (fillingRate >= globalRate) {
+ if (msgIterator.hasNext()) {
+ setVertexValue(selectedFlag);
+ }
+ voteToHalt();
+ } else {
+ initSeeds();
+ if (msgIterator.hasNext()) {
+ markAsSelected();
+ }
+ }
+ }
+ }
+
+ private void initSeeds() {
+ int randVal = random.nextInt(seedInterval);
+ if (randVal == 0) {
+ markAsSelected();
+ }
+ }
+
+ private void markAsSelected() {
+ setVertexValue(selectedFlag);
+ for (Edge<VLongWritable, BooleanWritable> edge : getEdges()) {
+ int randVal = random.nextInt(samplingInterval);
+ if (randVal == 0) {
+ if (edge.getEdgeValue().get() == false) {
+ edge.getEdgeValue().set(true);
+ sendMsg(edge.getDestVertexId(), selectedFlag);
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer strBuffer = new StringBuffer();
+ strBuffer.append(getVertexId().toString());
+ strBuffer.append(" ");
+ for (Edge<VLongWritable, BooleanWritable> edge : getEdges()) {
+ if (edge.getEdgeValue().get() == true) {
+ strBuffer.append(edge.getDestVertexId());
+ strBuffer.append(" ");
+ }
+ }
+ return strBuffer.toString().trim();
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(GraphSampleVertex.class.getSimpleName());
+ job.setVertexClass(GraphSampleVertex.class);
+ job.setVertexInputFormatClass(TextGraphSampleVertexInputFormat.class);
+ job.setVertexOutputFormatClass(GraphSampleVertexOutputFormat.class);
+ job.setMessageCombinerClass(GraphSampleVertex.SimpleSampleCombiner.class);
+ job.addGlobalAggregatorClass(GraphSampleVertex.GlobalSamplingAggregator.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setFixedVertexValueSize(true);
+ job.setSkipCombinerKey(true);
+ Client.run(args, job);
+ }
+
+ /**
+ * write sampled vertices
+ */
+ public static class GraphSampleVertexWriter extends TextVertexWriter<VLongWritable, BooleanWritable, NullWritable> {
+ public GraphSampleVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<VLongWritable, BooleanWritable, NullWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ if (vertex.getVertexValue().get() == true) {
+ getRecordWriter().write(new Text(vertex.toString()), new Text());
+ }
+ }
+ }
+
+ /**
+ * output format for sampled vertices
+ */
+ public static class GraphSampleVertexOutputFormat extends
+ TextVertexOutputFormat<VLongWritable, BooleanWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<VLongWritable, BooleanWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+ return new GraphSampleVertexWriter(recordWriter);
+ }
+
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 2508a1e..bc4adc6 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -21,6 +21,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -95,13 +96,32 @@
msgList.add(agg);
return msgList;
}
+
+ @Override
+ public void setPartialCombineState(DoubleWritable combineState) {
+ sum = combineState.get();
+ }
+
+ @Override
+ public void stepPartial2(VLongWritable vertexIndex, DoubleWritable partialAggregate)
+ throws HyracksDataException {
+ sum += partialAggregate.get();
+ }
+
+ @Override
+ public DoubleWritable finishPartial2() {
+ agg.set(sum);
+ return agg;
+ }
+ }
+
+ @Override
+ public void configure(Configuration conf){
+ maxIteration = conf.getInt(ITERATIONS, 10);
}
@Override
public void compute(Iterator<DoubleWritable> msgIterator) {
- if (maxIteration < 0) {
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
- }
if (getSuperstep() == 1) {
tmpVertexValue.set(1.0 / getNumVertices());
setVertexValue(tmpVertexValue);
@@ -219,6 +239,7 @@
job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.setFixedVertexValueSize(true);
+ job.setSkipCombinerKey(true);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index fa16ce5..eecb7de 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -83,6 +83,17 @@
msgList.add(agg);
return msgList;
}
+
+ @Override
+ public void stepPartial2(VLongWritable vertexIndex, ByteWritable partialAggregate) throws HyracksDataException {
+ int newState = agg.get() | partialAggregate.get();
+ agg.set((byte) newState);
+ }
+
+ @Override
+ public ByteWritable finishPartial2() {
+ return agg;
+ }
}
private ByteWritable tmpVertexValue = new ByteWritable();
@@ -115,12 +126,14 @@
private boolean isDest(VLongWritable v) {
return (v.get() == destId);
}
+
+ @Override
+ public void configure(Configuration conf){
+ sourceId = conf.getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+ }
@Override
public void compute(Iterator<ByteWritable> msgIterator) throws Exception {
- if (sourceId < 0) {
- sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
- }
if (destId < 0) {
destId = getContext().getConfiguration().getLong(DEST_ID, DEST_ID_DEFAULT);
}
@@ -220,6 +233,8 @@
job.setVertexOutputFormatClass(SimpleReachibilityVertexOutputFormat.class);
job.setMessageCombinerClass(ReachabilityVertex.SimpleReachibilityCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setSkipCombinerKey(true);
+ job.setFixedVertexValueSize(true);
Client.run(args, job);
System.out.println("reachable? " + readReachibilityResult(job.getConfiguration()));
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index 2fea813..80a5c61 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -17,6 +17,7 @@
import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.FloatWritable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -47,8 +48,9 @@
@Override
public void stepPartial(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
double value = msg.get();
- if (min > value)
+ if (min > value) {
min = value;
+ }
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -67,8 +69,9 @@
@Override
public void stepFinal(VLongWritable vertexIndex, DoubleWritable partialAggregate) throws HyracksDataException {
double value = partialAggregate.get();
- if (min > value)
+ if (min > value) {
min = value;
+ }
}
@Override
@@ -78,6 +81,21 @@
msgList.add(agg);
return msgList;
}
+
+ @Override
+ public void stepPartial2(VLongWritable vertexIndex, DoubleWritable partialAggregate)
+ throws HyracksDataException {
+ double value = partialAggregate.get();
+ if (min > value) {
+ min = value;
+ }
+ }
+
+ @Override
+ public DoubleWritable finishPartial2() {
+ agg.set(min);
+ return agg;
+ }
}
private DoubleWritable outputValue = new DoubleWritable();
@@ -99,10 +117,12 @@
}
@Override
+ public void configure(Configuration conf) {
+ sourceId = conf.getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+ }
+
+ @Override
public void compute(Iterator<DoubleWritable> msgIterator) {
- if (sourceId < 0) {
- sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
- }
if (getSuperstep() == 1) {
tmpVertexValue.set(Double.MAX_VALUE);
setVertexValue(tmpVertexValue);
@@ -134,7 +154,8 @@
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
- job.getConfiguration().setLong(SOURCE_ID, 0);
+ job.setSkipCombinerKey(true);
+ job.setFixedVertexValueSize(true);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index 9fb0958..3928414 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -73,6 +73,9 @@
@Option(name = "-dyn-opt", usage = "whether to enable dynamic optimization -- for better performance", required = false)
public String dynamicOptimization = "false";
+
+ @Option(name = "-cust-prop", usage = "comma separated customized properties, for example: pregelix.xyz=abc,pregelix.efg=hij", required = false)
+ public String customizedProperties = "";
}
public static void run(String[] args, PregelixJob job) throws Exception {
@@ -133,6 +136,23 @@
if (options.numIteration > 0)
job.getConfiguration().setLong(PageRankVertex.ITERATIONS, options.numIteration);
job.setCheckpointingInterval(options.ckpInterval);
+
+ /**
+ * set customized key value pairs
+ */
+ String customizedProperties = options.customizedProperties;
+ if (customizedProperties.length() > 0) {
+ String[] properties = customizedProperties.split(",");
+ for (String property : properties) {
+ String[] keyValue = property.split("=");
+ if (keyValue.length != 2) {
+ throw new IllegalStateException(property + " is not a valid key value pair!");
+ }
+ String key = keyValue[0];
+ String value = keyValue[1];
+ job.getConfiguration().set(key, value);
+ }
+ }
}
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
index 4062c74..53c9df4 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.StringTokenizer;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
@@ -38,7 +39,7 @@
@Override
public VertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
- return new TextReachibilityGraphReader(textInputFormat.createRecordReader(split, context));
+ return new TextConnectedComponentsGraphReader(textInputFormat.createRecordReader(split, context));
}
}
@@ -46,7 +47,6 @@
class TextConnectedComponentsGraphReader extends
TextVertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
- private final static String separator = " ";
private Vertex vertex;
private VLongWritable vertexId = new VLongWritable();
private List<VLongWritable> pool = new ArrayList<VLongWritable>();
@@ -73,13 +73,14 @@
vertex.reset();
Text line = getRecordReader().getCurrentValue();
- String[] fields = line.toString().split(separator);
+ String lineStr = line.toString();
+ StringTokenizer tokenizer = new StringTokenizer(lineStr);
- if (fields.length > 0) {
+ if (tokenizer.hasMoreTokens()) {
/**
* set the src vertex id
*/
- long src = Long.parseLong(fields[0]);
+ long src = Long.parseLong(tokenizer.nextToken());
vertexId.set(src);
vertex.setVertexId(vertexId);
long dest = -1L;
@@ -87,12 +88,17 @@
/**
* set up edges
*/
- for (int i = 1; i < fields.length; i++) {
- dest = Long.parseLong(fields[i]);
+ while (tokenizer.hasMoreTokens()) {
+ dest = Long.parseLong(tokenizer.nextToken());
VLongWritable destId = allocate();
destId.set(dest);
vertex.addEdge(destId, null);
}
+
+ /**
+ * set the vertex value
+ */
+ vertex.setVertexValue(vertexId);
}
// vertex.sortEdges();
return vertex;
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextGraphSampleVertexInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextGraphSampleVertexInputFormat.java
new file mode 100644
index 0000000..fc676fc
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextGraphSampleVertexInputFormat.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.inputformat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.BooleanWritable;
+import edu.uci.ics.pregelix.example.io.NullWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextGraphSampleVertexInputFormat extends
+ TextVertexInputFormat<VLongWritable, BooleanWritable, NullWritable, BooleanWritable> {
+
+ @Override
+ public VertexReader<VLongWritable, BooleanWritable, NullWritable, BooleanWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new TextSampleGraphReader(textInputFormat.createRecordReader(split, context));
+ }
+}
+
+@SuppressWarnings("rawtypes")
+class TextSampleGraphReader extends TextVertexReader<VLongWritable, BooleanWritable, NullWritable, BooleanWritable> {
+
+ private Vertex vertex;
+ private VLongWritable vertexId = new VLongWritable();
+ private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+ private int used = 0;
+ private BooleanWritable value = new BooleanWritable(false);
+
+ public TextSampleGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+ super(lineRecordReader);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Vertex<VLongWritable, BooleanWritable, NullWritable, BooleanWritable> getCurrentVertex() throws IOException,
+ InterruptedException {
+ used = 0;
+ if (vertex == null)
+ vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+
+ vertex.reset();
+ Text line = getRecordReader().getCurrentValue();
+ String lineStr = line.toString();
+ StringTokenizer tokenizer = new StringTokenizer(lineStr);
+
+ if (tokenizer.hasMoreTokens()) {
+ /**
+ * set the src vertex id
+ */
+ long src = Long.parseLong(tokenizer.nextToken());
+ vertexId.set(src);
+ vertex.setVertexId(vertexId);
+ long dest = -1L;
+
+ /**
+ * set up edges
+ */
+ while (tokenizer.hasMoreTokens()) {
+ dest = Long.parseLong(tokenizer.nextToken());
+ VLongWritable destId = allocate();
+ destId.set(dest);
+ vertex.addEdge(destId, value);
+ }
+ }
+ vertex.setVertexValue(value);
+ return vertex;
+ }
+
+ private VLongWritable allocate() {
+ if (used >= pool.size()) {
+ VLongWritable value = new VLongWritable();
+ pool.add(value);
+ used++;
+ return value;
+ } else {
+ VLongWritable value = pool.get(used);
+ used++;
+ return value;
+ }
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
index 67681d3..35207b5 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.StringTokenizer;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
@@ -46,7 +47,6 @@
@SuppressWarnings("rawtypes")
class TextPageRankGraphReader extends TextVertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
- private final static String separator = " ";
private Vertex vertex;
private VLongWritable vertexId = new VLongWritable();
private List<VLongWritable> pool = new ArrayList<VLongWritable>();
@@ -73,13 +73,14 @@
vertex.reset();
Text line = getRecordReader().getCurrentValue();
- String[] fields = line.toString().split(separator);
+ String lineStr = line.toString();
+ StringTokenizer tokenizer = new StringTokenizer(lineStr);
- if (fields.length > 0) {
+ if (tokenizer.hasMoreTokens()) {
/**
* set the src vertex id
*/
- long src = Long.parseLong(fields[0]);
+ long src = Long.parseLong(tokenizer.nextToken());
vertexId.set(src);
vertex.setVertexId(vertexId);
long dest = -1L;
@@ -87,8 +88,8 @@
/**
* set up edges
*/
- for (int i = 1; i < fields.length; i++) {
- dest = Long.parseLong(fields[i]);
+ while (tokenizer.hasMoreTokens()) {
+ dest = Long.parseLong(tokenizer.nextToken());
VLongWritable destId = allocate();
destId.set(dest);
vertex.addEdge(destId, null);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
index 5cf6c1c..56de328 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.StringTokenizer;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
@@ -43,10 +44,8 @@
}
@SuppressWarnings("rawtypes")
-class TextReachibilityGraphReader extends
- TextVertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+class TextReachibilityGraphReader extends TextVertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
- private final static String separator = " ";
private Vertex vertex;
private VLongWritable vertexId = new VLongWritable();
private List<VLongWritable> pool = new ArrayList<VLongWritable>();
@@ -73,13 +72,14 @@
vertex.reset();
Text line = getRecordReader().getCurrentValue();
- String[] fields = line.toString().split(separator);
+ String lineStr = line.toString();
+ StringTokenizer tokenizer = new StringTokenizer(lineStr);
- if (fields.length > 0) {
+ if (tokenizer.hasMoreTokens()) {
/**
* set the src vertex id
*/
- long src = Long.parseLong(fields[0]);
+ long src = Long.parseLong(tokenizer.nextToken());
vertexId.set(src);
vertex.setVertexId(vertexId);
long dest = -1L;
@@ -87,8 +87,8 @@
/**
* set up edges
*/
- for (int i = 1; i < fields.length; i++) {
- dest = Long.parseLong(fields[i]);
+ while (tokenizer.hasMoreTokens()) {
+ dest = Long.parseLong(tokenizer.nextToken());
VLongWritable destId = allocate();
destId.set(dest);
vertex.addEdge(destId, null);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
index 8987393..caa85bf 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.StringTokenizer;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
@@ -47,7 +48,6 @@
class TextShortestPathsGraphReader extends
TextVertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
- private final static String separator = " ";
private Vertex vertex;
private FloatWritable initValue = new FloatWritable(1.0f);
private VLongWritable vertexId = new VLongWritable();
@@ -75,13 +75,14 @@
vertex.getEdges().clear();
vertex.reset();
Text line = getRecordReader().getCurrentValue();
- String[] fields = line.toString().split(separator);
+ String lineStr = line.toString();
+ StringTokenizer tokenizer = new StringTokenizer(lineStr);
- if (fields.length > 0) {
+ if (tokenizer.hasMoreTokens()) {
/**
* set the src vertex id
*/
- long src = Long.parseLong(fields[0]);
+ long src = Long.parseLong(tokenizer.nextToken());
vertexId.set(src);
vertex.setVertexId(vertexId);
long dest = -1L;
@@ -89,8 +90,8 @@
/**
* set up edges
*/
- for (int i = 1; i < fields.length; i++) {
- dest = Long.parseLong(fields[i]);
+ while (tokenizer.hasMoreTokens()) {
+ dest = Long.parseLong(tokenizer.nextToken());
VLongWritable destId = allocate();
destId.set(dest);
vertex.addEdge(destId, initValue);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java
index ebc7fe4..8c85e3d 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/DoubleWritable.java
@@ -15,23 +15,69 @@
package edu.uci.ics.pregelix.example.io;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.pregelix.api.io.Pointable;
import edu.uci.ics.pregelix.api.io.WritableSizable;
+import edu.uci.ics.pregelix.example.utils.SerDeUtils;
/**
* Writable for Double values.
*/
-public class DoubleWritable extends org.apache.hadoop.io.DoubleWritable implements WritableSizable {
+public class DoubleWritable extends org.apache.hadoop.io.DoubleWritable implements WritableSizable, Pointable {
+
+ private byte[] data = new byte[8];
public DoubleWritable(double value) {
- super(value);
+ set(value);
}
public DoubleWritable() {
- super();
+ set(0.0);
+ }
+
+ public void set(double v) {
+ super.set(v);
+ SerDeUtils.writeLong(Double.doubleToLongBits(v), data, 0);
}
public int sizeInBytes() {
return 8;
}
+ @Override
+ public byte[] getByteArray() {
+ return data;
+ }
+
+ @Override
+ public int getStartOffset() {
+ return 0;
+ }
+
+ @Override
+ public int getLength() {
+ return 8;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ super.readFields(input);
+ SerDeUtils.writeLong(Double.doubleToLongBits(get()), data, 0);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.write(data);
+ }
+
+ @Override
+ public int set(byte[] bytes, int offset) {
+ super.set(Double.longBitsToDouble(SerDeUtils.readLong(bytes, offset)));
+ System.arraycopy(bytes, offset, data, 0, 8);
+ return 8;
+ }
+
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
index ffbbff4..d9688bc 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
@@ -16,10 +16,14 @@
package edu.uci.ics.pregelix.example.io;
import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import edu.uci.ics.pregelix.api.io.Pointable;
import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.example.utils.SerDeUtils;
@@ -30,35 +34,121 @@
* @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
*/
@SuppressWarnings("rawtypes")
-public class VLongWritable extends org.apache.hadoop.io.VLongWritable implements WritableSizable {
+public class VLongWritable extends org.apache.hadoop.io.VLongWritable implements WritableComparable, WritableSizable,
+ Pointable {
+
+ private byte[] data = new byte[10];
+ private int numBytes = -1;
public VLongWritable() {
+ set(0);
}
public VLongWritable(long value) {
set(value);
}
+ @Override
+ public void set(long value) {
+ super.set(value);
+ reset();
+ }
+
public int sizeInBytes() {
- long i = get();
- if (i >= -112 && i <= 127) {
- return 1;
+ return numBytes;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ numBytes = 0;
+ byte firstByte = input.readByte();
+ data[numBytes++] = firstByte;
+ int len = WritableUtils.decodeVIntSize(firstByte);
+ if (len == 1) {
+ super.set(firstByte);
+ return;
+ }
+ long i = 0;
+ input.readFully(data, numBytes, len - 1);
+ numBytes += len - 1;
+ for (int idx = 1; idx < len; idx++) {
+ byte b = data[idx];
+ i = i << 8;
+ i = i | (b & 0xFF);
+ }
+ super.set((WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i));
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.write(data, 0, numBytes);
+ }
+
+ @Override
+ public byte[] getByteArray() {
+ return data;
+ }
+
+ @Override
+ public int getStartOffset() {
+ return 0;
+ }
+
+ @Override
+ public int getLength() {
+ return numBytes;
+ }
+
+ @Override
+ public int set(byte[] bytes, int offset) {
+ int position = offset;
+ numBytes = 0;
+ byte firstByte = bytes[position++];
+ data[numBytes++] = firstByte;
+ int len = WritableUtils.decodeVIntSize(firstByte);
+ if (len == 1) {
+ super.set(firstByte);
+ return numBytes;
+ }
+ long i = 0;
+ System.arraycopy(bytes, position, data, numBytes, len - 1);
+ numBytes += len - 1;
+ for (int idx = 1; idx < len; idx++) {
+ byte b = data[idx];
+ i = i << 8;
+ i = i | (b & 0xFF);
+ }
+ super.set((WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i));
+ return numBytes;
+ }
+
+ private void reset() {
+ numBytes = 0;
+ long value = get();
+ if (value >= -112 && value <= 127) {
+ data[numBytes++] = (byte) value;
+ return;
}
int len = -112;
- if (i < 0) {
- i ^= -1L; // take one's complement'
+ if (value < 0) {
+ value ^= -1L; // take one's complement'
len = -120;
}
- long tmp = i;
+ long tmp = value;
while (tmp != 0) {
tmp = tmp >> 8;
len--;
}
+ data[numBytes++] = (byte) len;
len = (len < -120) ? -(len + 120) : -(len + 112);
- return len + 1;
+ for (int idx = len; idx != 0; idx--) {
+ int shiftbits = (idx - 1) * 8;
+ long mask = 0xFFL << shiftbits;
+ data[numBytes++] = (byte) ((value & mask) >> shiftbits);
+ }
}
/** A Comparator optimized for LongWritable. */
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/CommonSource.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/CommonSource.java
new file mode 100644
index 0000000..60602ee
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/CommonSource.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+@SuppressWarnings("deprecation")
+public class CommonSource {
+ public static class MapRecordOnly extends MapReduceBase implements
+ Mapper<LongWritable, Text, LongWritable, NullWritable> {
+
+ public void map(LongWritable id, Text inputValue, OutputCollector<LongWritable, NullWritable> output,
+ Reporter reporter) throws IOException {
+ StringTokenizer tokenizer = new StringTokenizer(inputValue.toString());
+ String key = tokenizer.nextToken();
+ output.collect(new LongWritable(Long.parseLong(key)), NullWritable.get());
+ }
+ }
+
+ public static class ReduceRecordOnly extends MapReduceBase implements
+ Reducer<LongWritable, NullWritable, NullWritable, Text> {
+
+ NullWritable key = NullWritable.get();
+
+ public void reduce(LongWritable inputKey, Iterator<NullWritable> inputValue,
+ OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+ int counter = 0;
+ while (inputValue.hasNext()) {
+ inputValue.next();
+ counter++;
+ }
+ if (counter >= 5) {
+ output.collect(key, new Text(inputKey.toString()));
+ }
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ JobConf job = new JobConf(GraphPreProcessor.class);
+
+ job.setJobName(GraphPreProcessor.class.getSimpleName());
+ job.setMapperClass(MapRecordOnly.class);
+ job.setReducerClass(ReduceRecordOnly.class);
+ job.setMapOutputKeyClass(LongWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+
+ job.setInputFormat(TextInputFormat.class);
+ for (int i = 0; i < args.length - 2; i++) {
+ FileInputFormat.addInputPath(job, new Path(args[i]));
+ }
+ FileOutputFormat.setOutputPath(job, new Path(args[args.length - 2]));
+ job.setNumReduceTasks(Integer.parseInt(args[args.length - 1]));
+ JobClient.runJob(job);
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/DuplicateGraph.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/DuplicateGraph.java
new file mode 100644
index 0000000..5d30143
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/DuplicateGraph.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.utils;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+@SuppressWarnings("deprecation")
+public class DuplicateGraph {
+ public static class MapRecordOnly extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
+
+ static long largestId = 172655479;
+ static long largestId2 = 172655479 * 2;
+ static long largestId3 = 172655479 * 3;
+
+ public void map(LongWritable id, Text inputValue, OutputCollector<Text, Text> output, Reporter reporter)
+ throws IOException {
+ StringTokenizer tokenizer = new StringTokenizer(inputValue.toString());
+ String key = tokenizer.nextToken();
+ long keyLong = Long.parseLong(key);
+ String key2 = Long.toString(keyLong + largestId);
+ String key3 = Long.toString(keyLong + largestId2);
+ String key4 = Long.toString(keyLong + largestId3);
+
+ StringBuilder value = new StringBuilder();
+ StringBuilder value2 = new StringBuilder();
+ StringBuilder value3 = new StringBuilder();
+ StringBuilder value4 = new StringBuilder();
+ while (tokenizer.hasMoreTokens()) {
+ String neighbor = tokenizer.nextToken();
+ long neighborLong = Long.parseLong(neighbor);
+ value.append(neighbor + " ");
+ value2.append((neighborLong + largestId) + " ");
+ value3.append((neighborLong + largestId2) + " ");
+ value4.append((neighborLong + largestId3) + " ");
+ }
+ output.collect(new Text(key), new Text(value.toString().trim()));
+ output.collect(new Text(key2), new Text(value2.toString().trim()));
+ output.collect(new Text(key3), new Text(value3.toString().trim()));
+ output.collect(new Text(key4), new Text(value4.toString().trim()));
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ JobConf job = new JobConf(DuplicateGraph.class);
+
+ job.setJobName(DuplicateGraph.class.getSimpleName());
+ job.setMapperClass(MapRecordOnly.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setInputFormat(TextInputFormat.class);
+ job.setOutputFormat(TextOutputFormat.class);
+
+ job.setInputFormat(TextInputFormat.class);
+ FileInputFormat.setInputPaths(job, args[0]);
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+ job.setNumReduceTasks(0);
+ JobClient.runJob(job);
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/FilterCount.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/FilterCount.java
new file mode 100644
index 0000000..06114ac
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/FilterCount.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.utils;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+@SuppressWarnings("deprecation")
+public class FilterCount {
+ public static class MapRecordOnly extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
+
+ public void map(LongWritable id, Text inputValue, OutputCollector<Text, Text> output, Reporter reporter)
+ throws IOException {
+ StringTokenizer tokenizer = new StringTokenizer(inputValue.toString());
+ String key = tokenizer.nextToken();
+ //skip count
+ tokenizer.nextToken();
+ StringBuilder sb = new StringBuilder();
+ while (tokenizer.hasMoreTokens()) {
+ sb.append(tokenizer.nextToken() + " ");
+ }
+ output.collect(new Text(key), new Text(sb.toString()));
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ JobConf job = new JobConf(GraphPreProcessor.class);
+
+ job.setJobName(FilterCount.class.getSimpleName());
+ job.setMapperClass(MapRecordOnly.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setInputFormat(TextInputFormat.class);
+
+ job.setInputFormat(TextInputFormat.class);
+ for (int i = 0; i < args.length - 1; i++) {
+ FileInputFormat.addInputPath(job, new Path(args[i]));
+ }
+ FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
+ job.setNumReduceTasks(0);
+ JobClient.runJob(job);
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/FindLargest.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/FindLargest.java
new file mode 100644
index 0000000..2217380
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/FindLargest.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+@SuppressWarnings("deprecation")
+public class FindLargest {
+ public static class MapRecordOnly extends MapReduceBase implements
+ Mapper<LongWritable, Text, LongWritable, NullWritable> {
+
+ public void map(LongWritable id, Text inputValue, OutputCollector<LongWritable, NullWritable> output,
+ Reporter reporter) throws IOException {
+ StringTokenizer tokenizer = new StringTokenizer(inputValue.toString());
+ String key = tokenizer.nextToken();
+ output.collect(new LongWritable(Long.parseLong(key)), NullWritable.get());
+ }
+ }
+
+ public static class ReduceRecordOnly extends MapReduceBase implements
+ Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
+
+ NullWritable value = NullWritable.get();
+ long currentMax = Long.MIN_VALUE;
+ OutputCollector<LongWritable, NullWritable> output;
+
+ public void reduce(LongWritable inputKey, Iterator<NullWritable> inputValue,
+ OutputCollector<LongWritable, NullWritable> output, Reporter reporter) throws IOException {
+ if (this.output == null) {
+ this.output = output;
+ }
+ if (inputKey.get() > currentMax) {
+ currentMax = inputKey.get();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ output.collect(new LongWritable(currentMax), value);
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ JobConf job = new JobConf(GraphPreProcessor.class);
+
+ job.setJobName(GraphPreProcessor.class.getSimpleName());
+ job.setMapperClass(MapRecordOnly.class);
+ job.setReducerClass(ReduceRecordOnly.class);
+ job.setCombinerClass(ReduceRecordOnly.class);
+ job.setMapOutputKeyClass(LongWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+
+ job.setInputFormat(TextInputFormat.class);
+ for (int i = 0; i < args.length - 2; i++) {
+ FileInputFormat.addInputPath(job, new Path(args[i]));
+ }
+ FileOutputFormat.setOutputPath(job, new Path(args[args.length - 2]));
+ job.setNumReduceTasks(Integer.parseInt(args[args.length - 1]));
+ JobClient.runJob(job);
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/GraphPreProcessor.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/GraphPreProcessor.java
new file mode 100644
index 0000000..02477b1
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/GraphPreProcessor.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.utils;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+@SuppressWarnings("deprecation")
+public class GraphPreProcessor {
+ public static class MapRecordOnly extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
+
+ public void map(LongWritable id, Text inputValue, OutputCollector<Text, Text> output, Reporter reporter)
+ throws IOException {
+ StringTokenizer tokenizer = new StringTokenizer(inputValue.toString());
+ String key = tokenizer.nextToken();
+ //skip the old key
+ tokenizer.nextToken();
+
+ StringBuilder value = new StringBuilder();
+ while (tokenizer.hasMoreTokens()) {
+ value.append(tokenizer.nextToken() + " ");
+ }
+ output.collect(new Text(key), new Text(value.toString().trim()));
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ JobConf job = new JobConf(GraphPreProcessor.class);
+
+ job.setJobName(GraphPreProcessor.class.getSimpleName());
+ job.setMapperClass(MapRecordOnly.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setInputFormat(TextInputFormat.class);
+ FileInputFormat.setInputPaths(job, args[0]);
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+ job.setNumReduceTasks(0);
+ JobClient.runJob(job);
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/SerDeUtils.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/SerDeUtils.java
index 2800187..897861e 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/SerDeUtils.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/utils/SerDeUtils.java
@@ -53,4 +53,35 @@
return value < -120 || (value >= -112 && value < 0);
}
+ /**
+ * read a long value from an offset
+ *
+ * @param data
+ * @param offset
+ * @return the long value
+ */
+ public static long readLong(byte[] data, int offset) {
+ return (((long) data[0] << 56) + ((long) (data[1] & 255) << 48)
+ + ((long) (data[2] & 255) << 40) + ((long) (data[3] & 255) << 32)
+ + ((long) (data[4] & 255) << 24) + ((data[5] & 255) << 16) + ((data[6] & 255) << 8) + ((data[7] & 255) << 0));
+ }
+
+ /**
+ * write a long value to a byte region
+ *
+ * @param v
+ * @param data
+ * @param offset
+ */
+ public static void writeLong(long v, byte[] data, int offset) {
+ data[0] = (byte) (v >>> 56);
+ data[1] = (byte) (v >>> 48);
+ data[2] = (byte) (v >>> 40);
+ data[3] = (byte) (v >>> 32);
+ data[4] = (byte) (v >>> 24);
+ data[5] = (byte) (v >>> 16);
+ data[6] = (byte) (v >>> 8);
+ data[7] = (byte) (v >>> 0);
+ }
+
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureInjectionIterationCompleteHook.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureInjectionIterationCompleteHook.java
new file mode 100644
index 0000000..c59e3ed
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureInjectionIterationCompleteHook.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+/**
+ * @author yingyib
+ */
+public class FailureInjectionIterationCompleteHook implements IIterationCompleteReporterHook {
+
+ @Override
+ public void completeIteration(int superstep, PregelixJob job) throws HyracksDataException {
+ try {
+ if (superstep == 3) {
+ PregelixHyracksIntegrationUtil.shutdownNC1();
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java
index efc7bcc..7c4ccce 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryConnectedComponentsTest.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
-import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
@@ -58,26 +57,10 @@
FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
job.setCheckpointHook(ConservativeCheckpointHook.class);
+ job.setIterationCompleteReporterHook(FailureInjectionIterationCompleteHook.class);
testCluster.setUp();
Driver driver = new Driver(PageRankVertex.class);
- Thread thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- synchronized (this) {
- while (Vertex.getSuperstep() <= 5) {
- this.wait(200);
- }
- PregelixHyracksIntegrationUtil.shutdownNC1();
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- });
- thread.start();
driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
index 421f2f5..886fb58 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryInnerJoinTest.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
-import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
@@ -56,27 +55,11 @@
FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.setCheckpointHook(ConservativeCheckpointHook.class);
- job.setFixedVertexValueSize(true);
+ job.setIterationCompleteReporterHook(FailureInjectionIterationCompleteHook.class);
testCluster.setUp();
Driver driver = new Driver(PageRankVertex.class);
- Thread thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- synchronized (this) {
- while (Vertex.getSuperstep() <= 5) {
- this.wait(200);
- }
- PregelixHyracksIntegrationUtil.shutdownNC1();
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- });
- thread.start();
+
driver.runJob(job, Plan.INNER_JOIN, "127.0.0.1",
PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
index b3ad112..c6e85cb 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
-import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.core.driver.Driver;
@@ -56,26 +55,10 @@
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.setCheckpointHook(ConservativeCheckpointHook.class);
job.setFixedVertexValueSize(true);
+ job.setIterationCompleteReporterHook(FailureInjectionIterationCompleteHook.class);
testCluster.setUp();
Driver driver = new Driver(PageRankVertex.class);
- Thread thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- synchronized (this) {
- while (Vertex.getSuperstep() <= 5) {
- this.wait(200);
- }
- PregelixHyracksIntegrationUtil.shutdownNC1();
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- });
- thread.start();
driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
index 9a2ef2c..83b896c 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryWithoutCheckpointTest.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
-import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
@@ -54,26 +53,10 @@
FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.setFixedVertexValueSize(true);
+ job.setIterationCompleteReporterHook(FailureInjectionIterationCompleteHook.class);
testCluster.setUp();
Driver driver = new Driver(PageRankVertex.class);
- Thread thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- synchronized (this) {
- while (Vertex.getSuperstep() <= 5) {
- this.wait(200);
- }
- PregelixHyracksIntegrationUtil.shutdownNC1();
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- });
- thread.start();
driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/GraphSampleVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/GraphSampleVertexTest.java
new file mode 100644
index 0000000..3afb417
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/GraphSampleVertexTest.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.GraphSampleVertex.GraphSampleVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextGraphSampleVertexInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+
+/**
+ * @author yingyib
+ */
+public class GraphSampleVertexTest {
+ private static String INPUTPATH = "data/webmapcomplex";
+ private static String OUTPUTPAH = "actual/result";
+
+ @Test
+ public void test() throws Exception {
+ TestCluster testCluster = new TestCluster();
+ try {
+ PregelixJob job = new PregelixJob(GraphSampleVertex.class.getName());
+ job.setVertexClass(GraphSampleVertex.class);
+ job.setVertexInputFormatClass(TextGraphSampleVertexInputFormat.class);
+ job.setVertexOutputFormatClass(GraphSampleVertexOutputFormat.class);
+ job.setMessageCombinerClass(GraphSampleVertex.SimpleSampleCombiner.class);
+ job.addGlobalAggregatorClass(GraphSampleVertex.GlobalSamplingAggregator.class);
+ job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setFixedVertexValueSize(true);
+ job.getConfiguration().set(GraphSampleVertex.GLOBAL_RATE, "0.5f");
+ FileInputFormat.setInputPaths(job, INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+
+ testCluster.setUp();
+ Driver driver = new Driver(GraphSampleVertex.class);
+ driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+ int sampledVertexNum = countVertex(OUTPUTPAH);
+ int totalVertexNum = countVertex(INPUTPATH);
+ float ratio = (float) sampledVertexNum / (float) totalVertexNum;
+ Assert.assertEquals(true, ratio >= 0.5f);
+ } finally {
+ PregelixHyracksIntegrationUtil.deinit();
+ testCluster.cleanupHDFS();
+ }
+ }
+
+ private int countVertex(String filePath) throws Exception {
+ File dir = new File(filePath);
+ int count = 0;
+ if (dir.isDirectory()) {
+ File[] files = dir.listFiles();
+ for (File file : files) {
+ if (file.isFile() && !file.getName().contains(".crc")) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+ while (reader.readLine() != null) {
+ count++;
+ }
+ reader.close();
+ }
+ }
+ return count;
+ } else {
+ return count;
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobConnectedComponentsTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobConnectedComponentsTest.java
index 65b9845..a5f793f 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobConnectedComponentsTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobConnectedComponentsTest.java
@@ -68,6 +68,10 @@
@Override
public void run() {
try {
+ synchronized (this) {
+ this.wait(2000);
+ this.notifyAll();
+ }
Driver driver = new Driver(PageRankVertex.class);
PregelixJob job2 = new PregelixJob(ConnectedComponentsVertex.class.getName());
job2.setVertexClass(ConnectedComponentsVertex.class);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobPageRankTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobPageRankTest.java
index cfd1b27..414fab7 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobPageRankTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobPageRankTest.java
@@ -65,6 +65,10 @@
@Override
public void run() {
try {
+ synchronized (this) {
+ this.wait(5000);
+ this.notifyAll();
+ }
Driver driver = new Driver(PageRankVertex.class);
PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
job2.setVertexClass(PageRankVertex.class);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index 5855fd3..9191fad 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -72,6 +72,7 @@
job.getConfiguration().setClass(PregelixJob.VERTEX_VALUE_CLASS, DoubleWritable.class, Writable.class);
job.getConfiguration().setClass(PregelixJob.EDGE_VALUE_CLASS, FloatWritable.class, Writable.class);
job.getConfiguration().setClass(PregelixJob.MESSAGE_VALUE_CLASS, DoubleWritable.class, Writable.class);
+ job.getConfiguration().set(PregelixJob.JOB_ID, "test_job");
}
public void setUp() throws Exception {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index c7eff1e..3bedb49 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -79,7 +79,9 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.setCheckpointHook(ConservativeCheckpointHook.class);
- job.setEnableDynamicOptimization(true);
+ job.setGroupByAlgorithm(false);
+ job.setGroupByMemoryLimit(3);
+ job.setFrameSize(1024);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -92,11 +94,11 @@
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
job.setFixedVertexValueSize(true);
+ job.setSkipCombinerKey(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
job.setCheckpointHook(ConservativeCheckpointHook.class);
- job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -112,6 +114,7 @@
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, 0);
job.setDynamicVertexValueSize(true);
+ job.setSkipCombinerKey(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -122,11 +125,11 @@
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.setFixedVertexValueSize(true);
+ job.setSkipCombinerKey(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
job.setCheckpointHook(ConservativeCheckpointHook.class);
- job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -137,11 +140,10 @@
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
- job.setDynamicVertexValueSize(true);
+ job.setSkipCombinerKey(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -153,11 +155,10 @@
job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
- job.setDynamicVertexValueSize(true);
+ job.setSkipCombinerKey(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
- job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -168,6 +169,8 @@
job.setVertexOutputFormatClass(SimpleReachibilityVertexOutputFormat.class);
job.setMessageCombinerClass(ReachabilityVertex.SimpleReachibilityCombiner.class);
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+ job.setSkipCombinerKey(true);
+ job.setFixedVertexValueSize(true);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -232,7 +235,6 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- job.setEnableDynamicOptimization(true);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index f077053..12195e6 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -94,11 +94,16 @@
@Test
public void test() throws Exception {
setUp();
- Plan[] plans = new Plan[] { Plan.INNER_JOIN, Plan.OUTER_JOIN, Plan.OUTER_JOIN_SINGLE_SORT, Plan.OUTER_JOIN_SORT };
+ Plan[] plans = new Plan[] { Plan.OUTER_JOIN, Plan.INNER_JOIN };
for (Plan plan : plans) {
+ job.setMergeConnector(true);
driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
compareResults();
+ //job.setMergeConnector(false);
+ //driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
+ // PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
+ //compareResults();
}
tearDown();
waitawhile();
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-0
index f1f1d9b..0c89090 100755
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-0
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-0
@@ -1,5 +1,5 @@
-0 0
-4 0
-8 0
-12 0
-16 0
+1 1
+5 1
+9 1
+13 0
+17 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-1
index 0fa02c1..6d2b709 100755
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-1
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-1
@@ -1,5 +1,5 @@
-1 0
-5 0
-9 0
-13 0
-17 0
+2 1
+6 1
+10 1
+14 0
+18 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-2 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-2
index 542ccae..f90bfe0 100755
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-2
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-2
@@ -1,5 +1,5 @@
-2 0
-6 0
-10 0
-14 0
-18 0
+3 1
+7 1
+11 0
+15 0
+19 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-3 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-3
index 1d5d6d9..503200b 100755
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-3
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-3
@@ -1,5 +1,5 @@
-3 0
-7 0
-11 0
-15 0
-19 0
+0 0
+4 1
+8 1
+12 0
+16 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-0
index f1f1d9b..503200b 100755
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-0
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-0
@@ -1,5 +1,5 @@
0 0
-4 0
-8 0
+4 1
+8 1
12 0
16 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-1
index 4e7d87a..4d86486 100755
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-1
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-1
@@ -1,6 +1,6 @@
-1 0
-5 0
-9 0
+1 1
+5 1
+9 1
13 0
17 0
21 21
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-2 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-2
index 542ccae..6d2b709 100755
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-2
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-2
@@ -1,5 +1,5 @@
-2 0
-6 0
-10 0
+2 1
+6 1
+10 1
14 0
18 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-3 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-3
index 513f3ff..af3a604 100755
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-3
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-3
@@ -1,5 +1,5 @@
-3 0
-7 0
+3 1
+7 1
11 0
15 0
19 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0
index 2c975de..ca71d2e 100755
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-0
@@ -1,9 +1,9 @@
0 0
-2 0
-4 0
-6 0
-8 0
-10 0
+2 1
+4 1
+6 1
+8 1
+10 1
12 0
14 0
16 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1
index 6976bc1..fae4a35 100755
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex2/part-1
@@ -1,8 +1,8 @@
-1 0
-3 0
-5 0
-7 0
-9 0
+1 1
+3 1
+5 1
+7 1
+9 1
11 0
13 0
15 0
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index 3091c83..1cef17a0 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -80,7 +80,6 @@
<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
@@ -122,13 +121,13 @@
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleConnectedComponentsVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>pregelix.skipCombinerKey</name><value>true</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
<property><name>topology.script.number.args</name><value>100</value></property>
<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index b6af65c..7b043b8 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -1,146 +1,145 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleMinCombiner</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.job.name</name><value>ConnectedComponents</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>pregelix.dynamicopt</name><value>true</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleConnectedComponentsVertexOutputFormat</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>ConnectedComponents</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleMinCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleConnectedComponentsVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>pregelix.skipCombinerKey</name><value>true</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>pregelix.partitionerClass</name><value>edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index 6fe04fb..857dc48 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -23,6 +23,7 @@
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>pregelix.framesize</name><value>1024</value></property>
<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
<property><name>tasktracker.http.threads</name><value>40</value></property>
<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
@@ -45,6 +46,7 @@
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>pregelix.groupmem</name><value>3</value></property>
<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
<property><name>pregelix.numVertices</name><value>20</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
@@ -85,7 +87,6 @@
<property><name>fs.checkpoint.period</name><value>3600</value></property>
<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
@@ -115,6 +116,7 @@
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>fs.s3.block.size</name><value>67108864</value></property>
<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>pregelix.groupalg</name><value>false</value></property>
<property><name>mapred.acls.enabled</name><value>false</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index d0f9759..5e1fb161 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -81,12 +81,12 @@
<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
<property><name>io.map.index.skip</name><value>0</value></property>
<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>pregelix.skipCombinerKey</name><value>true</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
<property><name>fs.checkpoint.period</name><value>3600</value></property>
<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
index 0173390..c05a4da 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
@@ -80,7 +80,6 @@
<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index a7a38e0..cd8ee02 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -80,7 +80,6 @@
<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>pregelix.dynamicopt</name><value>true</value></property>
<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
@@ -122,6 +121,7 @@
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>pregelix.skipCombinerKey</name><value>true</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
index 225429a..8aa6a23 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
@@ -1,145 +1,147 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>Reachibility</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>pregelix.numVertices</name><value>23</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>ReachibilityVertex.destId</name><value>10</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityCombiner</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>pregelix.incStateLength</name><value>false</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>pregelix.skipCombinerKey</name><value>true</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
<property><name>mapred.acls.enabled</name><value>false</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>ReachibilityVertex.destId</name><value>10</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachabilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index b757514..41f7588 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -1,145 +1,146 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>mapred.output.dir</name><value>/result</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>ShortestPaths</value></property>
-<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>pregelix.numVertices</name><value>20</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex$SimpleMinCombiner</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>SimpleShortestPathsVertex.sourceId</name><value>0</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex$SimpleMinCombiner</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.job.name</name><value>ShortestPaths</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>pregelix.skipCombinerKey</name><value>true</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>SimpleShortestPathsVertex.sourceId</name><value>0</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
<property><name>mapred.acls.enabled</name><value>false</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
</configuration>
\ No newline at end of file