Support big vertex in Pregelix.
--For those vertice beyond page size, we store them on HDFS as immutable files.
--Updates on those big vertice will trigger creations of new immutable files.

Change-Id: I6b6f0528b6b5360c96dcdace1fa360d42c517f22
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/72
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Pouria Pirzadeh <pouria.pirzadeh@gmail.com>
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index bc4adc6..8ba4472 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,8 +54,8 @@
 public class PageRankVertex extends Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
 
     public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
-    private DoubleWritable outputValue = new DoubleWritable();
-    private DoubleWritable tmpVertexValue = new DoubleWritable();
+    private final DoubleWritable outputValue = new DoubleWritable();
+    private final DoubleWritable tmpVertexValue = new DoubleWritable();
     private int maxIteration = -1;
 
     /**
@@ -63,7 +63,7 @@
      */
     public static class SimpleSumCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
         private double sum = 0.0;
-        private DoubleWritable agg = new DoubleWritable();
+        private final DoubleWritable agg = new DoubleWritable();
         private MsgList<DoubleWritable> msgList;
 
         @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -114,7 +114,7 @@
             return agg;
         }
     }
-    
+
     @Override
     public void configure(Configuration conf){
         maxIteration = conf.getInt(ITERATIONS, 10);
@@ -131,7 +131,7 @@
             while (msgIterator.hasNext()) {
                 sum += msgIterator.next().get();
             }
-            tmpVertexValue.set((0.15 / getNumVertices()) + 0.85 * sum);
+            tmpVertexValue.set(0.15 / getNumVertices() + 0.85 * sum);
             setVertexValue(tmpVertexValue);
         }
 
@@ -151,7 +151,7 @@
             GeneratedVertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
         /** Class logger */
         private static final Logger LOG = Logger.getLogger(SimulatedPageRankVertexReader.class.getName());
-        private Map<VLongWritable, FloatWritable> edges = Maps.newHashMap();
+        private final Map<VLongWritable, FloatWritable> edges = Maps.newHashMap();
 
         public SimulatedPageRankVertexReader() {
             super();
@@ -168,7 +168,7 @@
             Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> vertex = BspUtils
                     .createVertex(configuration);
 
-            VLongWritable vertexId = new VLongWritable((inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+            VLongWritable vertexId = new VLongWritable(inputSplit.getSplitIndex() * totalRecords + recordsRead);
             DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
             long destVertexId = (vertexId.get() + 1) % (inputSplit.getNumSplits() * totalRecords);
             float edgeValue = vertexId.get() * 100f;
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/SkewTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/SkewTest.java
new file mode 100644
index 0000000..4024ec6
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/SkewTest.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * This test case tests big vertex, using PageRankVertex.
+ *
+ * @author yingyib
+ */
+public class SkewTest {
+    private static String INPUTPATH = "data/skew";
+    private static String OUTPUTPAH = "actual/result";
+    private static String EXPECTEDPATH = "src/test/resources/expected/skew";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+        try {
+            PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+            job.setVertexClass(PageRankVertex.class);
+            job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+            job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+            job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            FileInputFormat.setInputPaths(job, INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 21);
+            job.setCheckpointHook(ConservativeCheckpointHook.class);
+            job.setFixedVertexValueSize(true);
+            job.setFrameSize(16384);
+
+            testCluster.setUp();
+            Driver driver = new Driver(PageRankVertex.class);
+            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            testCluster.tearDown();
+        }
+    }
+}
diff --git a/pregelix/pregelix-example/src/test/resources/cluster/cluster.properties b/pregelix/pregelix-example/src/test/resources/cluster/cluster.properties
index baa8a24..c75e65c 100644
--- a/pregelix/pregelix-example/src/test/resources/cluster/cluster.properties
+++ b/pregelix/pregelix-example/src/test/resources/cluster/cluster.properties
@@ -42,6 +42,9 @@
 #The frame size of the internal dataflow engine
 FRAME_SIZE=65536
 
+#The frame size of the internal vertex storage
+VFRAME_SIZE=65536
+
 #CC JAVA_OPTS
 CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx3g -Djava.util.logging.config.file=logging.properties"
 # Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
diff --git a/pregelix/pregelix-example/src/test/resources/expected/skew/part-0 b/pregelix/pregelix-example/src/test/resources/expected/skew/part-0
new file mode 100755
index 0000000..c2c0cc0
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/skew/part-0
@@ -0,0 +1,6 @@
+1	0.1538057869748262
+5	0.021540394083972653
+9	0.008851770565007962
+13	0.08308297649373161
+17	0.012504121497877515
+21	0.009166666666666667
diff --git a/pregelix/pregelix-example/src/test/resources/expected/skew/part-1 b/pregelix/pregelix-example/src/test/resources/expected/skew/part-1
new file mode 100755
index 0000000..5a792e7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/skew/part-1
@@ -0,0 +1,6 @@
+2	0.14475932277139475
+6	0.015003751200304748
+10	0.0078953714534803
+14	0.040205598595470717
+18	0.010592752407987198
+22	0.007142857142857143
diff --git a/pregelix/pregelix-example/src/test/resources/expected/skew/part-2 b/pregelix/pregelix-example/src/test/resources/expected/skew/part-2
new file mode 100755
index 0000000..c21220f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/skew/part-2
@@ -0,0 +1,5 @@
+3	0.07941505736441216
+7	0.011947009713572225
+11	0.1561028584551254
+15	0.022550744820502584
+19	0.013158324980956872
diff --git a/pregelix/pregelix-example/src/test/resources/expected/skew/part-3 b/pregelix/pregelix-example/src/test/resources/expected/skew/part-3
new file mode 100755
index 0000000..39a8af1
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/skew/part-3
@@ -0,0 +1,6 @@
+0	0.008261482871440377
+4	0.038415704685818675
+8	0.010122315837328184
+12	0.15141534420136182
+16	0.015706612612729605
+412454	0.007142857142857143