Support big vertices in Pregelix.
--Vertices larger than the page size are stored on HDFS as immutable files.
--Updates to such big vertices create new immutable files.
Change-Id: I6b6f0528b6b5360c96dcdace1fa360d42c517f22
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/72
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Pouria Pirzadeh <pouria.pirzadeh@gmail.com>
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index bc4adc6..8ba4472 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,8 +54,8 @@
public class PageRankVertex extends Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
- private DoubleWritable outputValue = new DoubleWritable();
- private DoubleWritable tmpVertexValue = new DoubleWritable();
+ private final DoubleWritable outputValue = new DoubleWritable();
+ private final DoubleWritable tmpVertexValue = new DoubleWritable();
private int maxIteration = -1;
/**
@@ -63,7 +63,7 @@
*/
public static class SimpleSumCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
private double sum = 0.0;
- private DoubleWritable agg = new DoubleWritable();
+ private final DoubleWritable agg = new DoubleWritable();
private MsgList<DoubleWritable> msgList;
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -114,7 +114,7 @@
return agg;
}
}
-
+
@Override
public void configure(Configuration conf){
maxIteration = conf.getInt(ITERATIONS, 10);
@@ -131,7 +131,7 @@
while (msgIterator.hasNext()) {
sum += msgIterator.next().get();
}
- tmpVertexValue.set((0.15 / getNumVertices()) + 0.85 * sum);
+ tmpVertexValue.set(0.15 / getNumVertices() + 0.85 * sum);
setVertexValue(tmpVertexValue);
}
@@ -151,7 +151,7 @@
GeneratedVertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(SimulatedPageRankVertexReader.class.getName());
- private Map<VLongWritable, FloatWritable> edges = Maps.newHashMap();
+ private final Map<VLongWritable, FloatWritable> edges = Maps.newHashMap();
public SimulatedPageRankVertexReader() {
super();
@@ -168,7 +168,7 @@
Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> vertex = BspUtils
.createVertex(configuration);
- VLongWritable vertexId = new VLongWritable((inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+ VLongWritable vertexId = new VLongWritable(inputSplit.getSplitIndex() * totalRecords + recordsRead);
DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
long destVertexId = (vertexId.get() + 1) % (inputSplit.getNumSplits() * totalRecords);
float edgeValue = vertexId.get() * 100f;
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/SkewTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/SkewTest.java
new file mode 100644
index 0000000..4024ec6
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/SkewTest.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * Tests big-vertex support: runs PageRankVertex on the skewed graph in data/skew and checks the output.
+ *
+ * @author yingyib
+ */
+public class SkewTest {
+    private static final String INPUT_PATH = "data/skew";
+    private static final String OUTPUT_PATH = "actual/result";
+    private static final String EXPECTED_PATH = "src/test/resources/expected/skew";
+    private static final long NUM_VERTICES = 21; // passed to the job as PregelixJob.NUM_VERTICE
+    // NOTE(review): presumably below the cluster's 65536-byte FRAME_SIZE so skewed vertices overflow a frame.
+    private static final int FRAME_SIZE = 16384;
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+        try {
+            PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+            job.setVertexClass(PageRankVertex.class);
+            job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+            job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+            job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            FileInputFormat.setInputPaths(job, INPUT_PATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, NUM_VERTICES);
+            job.setCheckpointHook(ConservativeCheckpointHook.class);
+            job.setFixedVertexValueSize(true);
+            job.setFrameSize(FRAME_SIZE);
+            testCluster.setUp();
+            Driver driver = new Driver(PageRankVertex.class);
+            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+            TestUtils.compareWithResultDir(new File(EXPECTED_PATH), new File(OUTPUT_PATH));
+        } finally {
+            testCluster.tearDown();
+        }
+    }
+}
diff --git a/pregelix/pregelix-example/src/test/resources/cluster/cluster.properties b/pregelix/pregelix-example/src/test/resources/cluster/cluster.properties
index baa8a24..c75e65c 100644
--- a/pregelix/pregelix-example/src/test/resources/cluster/cluster.properties
+++ b/pregelix/pregelix-example/src/test/resources/cluster/cluster.properties
@@ -42,6 +42,9 @@
#The frame size of the internal dataflow engine
FRAME_SIZE=65536
+#The frame size of the internal vertex storage
+VFRAME_SIZE=65536
+
#CC JAVA_OPTS
CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx3g -Djava.util.logging.config.file=logging.properties"
# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
diff --git a/pregelix/pregelix-example/src/test/resources/expected/skew/part-0 b/pregelix/pregelix-example/src/test/resources/expected/skew/part-0
new file mode 100755
index 0000000..c2c0cc0
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/skew/part-0
@@ -0,0 +1,6 @@
+1 0.1538057869748262
+5 0.021540394083972653
+9 0.008851770565007962
+13 0.08308297649373161
+17 0.012504121497877515
+21 0.009166666666666667
diff --git a/pregelix/pregelix-example/src/test/resources/expected/skew/part-1 b/pregelix/pregelix-example/src/test/resources/expected/skew/part-1
new file mode 100755
index 0000000..5a792e7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/skew/part-1
@@ -0,0 +1,6 @@
+2 0.14475932277139475
+6 0.015003751200304748
+10 0.0078953714534803
+14 0.040205598595470717
+18 0.010592752407987198
+22 0.007142857142857143
diff --git a/pregelix/pregelix-example/src/test/resources/expected/skew/part-2 b/pregelix/pregelix-example/src/test/resources/expected/skew/part-2
new file mode 100755
index 0000000..c21220f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/skew/part-2
@@ -0,0 +1,5 @@
+3 0.07941505736441216
+7 0.011947009713572225
+11 0.1561028584551254
+15 0.022550744820502584
+19 0.013158324980956872
diff --git a/pregelix/pregelix-example/src/test/resources/expected/skew/part-3 b/pregelix/pregelix-example/src/test/resources/expected/skew/part-3
new file mode 100755
index 0000000..39a8af1
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/skew/part-3
@@ -0,0 +1,6 @@
+0 0.008261482871440377
+4 0.038415704685818675
+8 0.010122315837328184
+12 0.15141534420136182
+16 0.015706612612729605
+412454 0.007142857142857143