cross merge fullstack_release_candidate into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-runtime/pom.xml b/fullstack/pregelix/pregelix-runtime/pom.xml
index d12cb36..e75f98b 100644
--- a/fullstack/pregelix/pregelix-runtime/pom.xml
+++ b/fullstack/pregelix/pregelix-runtime/pom.xml
@@ -22,8 +22,9 @@
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.7</source>
+					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
@@ -42,6 +43,7 @@
 			</plugin>
 			<plugin>
 				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
 				<configuration>
 					<filesets>
 						<fileset>
@@ -116,13 +118,6 @@
 			<version>0.2.3-SNAPSHOT</version>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>0.20.2</version>
-			<type>jar</type>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-storage-am-common</artifactId>
 			<version>0.2.3-SNAPSHOT</version>
diff --git a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
index e7caeaf..76c725e 100644
--- a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
+++ b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
@@ -34,7 +34,7 @@
 
     @Override
     public void stop() throws Exception {
-        LOGGER.info("Stopping Giraph NC Bootstrap");
+        LOGGER.info("Stopping NC Bootstrap");
         RuntimeContext rCtx = (RuntimeContext) appCtx.getApplicationObject();
         rCtx.close();
     }
diff --git a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 105d3e2..f7958d9 100644
--- a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -58,6 +59,8 @@
             private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
             private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
             private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+            private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
 
             // for writing out to message channel
             private IFrameWriter writerMsg;
@@ -82,6 +85,16 @@
             private ByteBuffer bufferGlobalAggregate;
             private GlobalAggregator aggregator;
 
+            // for writing out to insert vertex channel
+            private IFrameWriter writerInsert;
+            private FrameTupleAppender appenderInsert;
+            private ByteBuffer bufferInsert;
+
+            // for writing out to delete vertex channel
+            private IFrameWriter writerDelete;
+            private FrameTupleAppender appenderDelete;
+            private ByteBuffer bufferDelete;
+
             private Vertex vertex;
             private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
             private DataOutput output = new DataOutputStream(bbos);
@@ -90,11 +103,15 @@
             private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
             private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
             private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+            private Configuration conf;
+            private boolean dynamicStateLength;
 
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
-                this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+                this.conf = confFactory.createConfiguration();
+                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+                this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
 
                 this.writerMsg = writers[0];
@@ -114,8 +131,22 @@
                 this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
                 this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
 
-                if (writers.length > 3) {
-                    this.writerAlive = writers[3];
+                this.writerInsert = writers[3];
+                this.bufferInsert = ctx.allocateFrame();
+                this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderInsert.reset(bufferInsert, true);
+                this.writers.add(writerInsert);
+                this.appenders.add(appenderInsert);
+
+                this.writerDelete = writers[4];
+                this.bufferDelete = ctx.allocateFrame();
+                this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderDelete.reset(bufferDelete, true);
+                this.writers.add(writerDelete);
+                this.appenders.add(appenderDelete);
+
+                if (writers.length > 5) {
+                    this.writerAlive = writers[5];
                     this.bufferAlive = ctx.allocateFrame();
                     this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
                     this.appenderAlive.reset(bufferAlive, true);
@@ -125,6 +156,8 @@
                 }
 
                 tbs.add(tbMsg);
+                tbs.add(tbInsert);
+                tbs.add(tbDelete);
                 tbs.add(tbAlive);
             }
 
@@ -155,7 +188,7 @@
                 /**
                  * this partition should not terminate
                  */
-                if (terminate && (!vertex.isHalted() || vertex.hasMessage()))
+                if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
                     terminate = false;
 
                 aggregator.step(vertex);
@@ -164,6 +197,9 @@
             @Override
             public void close() throws HyracksDataException {
                 FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+                FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+                FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+
                 if (pushAlive)
                     FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
                 if (!terminate) {
@@ -177,7 +213,8 @@
             private void writeOutGlobalAggregate() throws HyracksDataException {
                 try {
                     /**
-                     * get partial aggregate result and flush to the final aggregator
+                     * get partial aggregate result and flush to the final
+                     * aggregator
                      */
                     Writable agg = aggregator.finishPartial();
                     agg.write(tbGlobalAggregate.getDataOutput());
@@ -203,15 +240,27 @@
             }
 
             @Override
-            public void update(ITupleReference tupleRef) throws HyracksDataException {
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
                 try {
                     if (vertex != null && vertex.hasUpdate()) {
-                        int fieldCount = tupleRef.getFieldCount();
-                        for (int i = 1; i < fieldCount; i++) {
-                            byte[] data = tupleRef.getFieldData(i);
-                            int offset = tupleRef.getFieldStart(i);
-                            bbos.setByteArray(data, offset);
-                            vertex.write(output);
+                        if (!dynamicStateLength) {
+                            // in-place update
+                            int fieldCount = tupleRef.getFieldCount();
+                            for (int i = 1; i < fieldCount; i++) {
+                                byte[] data = tupleRef.getFieldData(i);
+                                int offset = tupleRef.getFieldStart(i);
+                                bbos.setByteArray(data, offset);
+                                vertex.write(output);
+                            }
+                        } else {
+                            // write the vertex id
+                            DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+                            vertex.getVertexId().write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
+
+                            // write the vertex value
+                            vertex.write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
                         }
                     }
                 } catch (IOException e) {
diff --git a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index f72b059..0cf64a0 100644
--- a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -58,6 +59,8 @@
             private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
             private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
             private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+            private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
 
             // for writing out to message channel
             private IFrameWriter writerMsg;
@@ -76,12 +79,22 @@
             private ByteBuffer bufferGlobalAggregate;
             private GlobalAggregator aggregator;
 
-            //for writing out the global aggregate
+            // for writing out the global aggregate
             private IFrameWriter writerTerminate;
             private FrameTupleAppender appenderTerminate;
             private ByteBuffer bufferTerminate;
             private boolean terminate = true;
 
+            // for writing out to insert vertex channel
+            private IFrameWriter writerInsert;
+            private FrameTupleAppender appenderInsert;
+            private ByteBuffer bufferInsert;
+
+            // for writing out to delete vertex channel
+            private IFrameWriter writerDelete;
+            private FrameTupleAppender appenderDelete;
+            private ByteBuffer bufferDelete;
+
             // dummy empty msgList
             private MsgList msgList = new MsgList();
             private ArrayIterator msgIterator = new ArrayIterator();
@@ -93,11 +106,15 @@
             private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
             private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
             private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+            private Configuration conf;
+            private boolean dynamicStateLength;
 
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
-                this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+                this.conf = confFactory.createConfiguration();
+                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+                this.aggregator = BspUtils.createGlobalAggregator(conf);
                 this.aggregator.init();
 
                 this.writerMsg = writers[0];
@@ -117,8 +134,22 @@
                 this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
                 this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
 
-                if (writers.length > 3) {
-                    this.writerAlive = writers[3];
+                this.writerInsert = writers[3];
+                this.bufferInsert = ctx.allocateFrame();
+                this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderInsert.reset(bufferInsert, true);
+                this.writers.add(writerInsert);
+                this.appenders.add(appenderInsert);
+
+                this.writerDelete = writers[4];
+                this.bufferDelete = ctx.allocateFrame();
+                this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderDelete.reset(bufferDelete, true);
+                this.writers.add(writerDelete);
+                this.appenders.add(appenderDelete);
+
+                if (writers.length > 5) {
+                    this.writerAlive = writers[5];
                     this.bufferAlive = ctx.allocateFrame();
                     this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
                     this.appenderAlive.reset(bufferAlive, true);
@@ -129,6 +160,8 @@
                 msgList.reset(msgIterator);
 
                 tbs.add(tbMsg);
+                tbs.add(tbInsert);
+                tbs.add(tbDelete);
                 tbs.add(tbAlive);
             }
 
@@ -156,7 +189,7 @@
                 /**
                  * this partition should not terminate
                  */
-                if (terminate && (!vertex.isHalted() || vertex.hasMessage()))
+                if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
                     terminate = false;
 
                 /**
@@ -168,20 +201,24 @@
             @Override
             public void close() throws HyracksDataException {
                 FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+                FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+                FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+
                 if (pushAlive)
                     FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
                 if (!terminate) {
                     writeOutTerminationState();
                 }
-                
-                /**write out global aggregate value*/
+
+                /** write out global aggregate value */
                 writeOutGlobalAggregate();
             }
 
             private void writeOutGlobalAggregate() throws HyracksDataException {
                 try {
                     /**
-                     * get partial aggregate result and flush to the final aggregator
+                     * get partial aggregate result and flush to the final
+                     * aggregator
                      */
                     Writable agg = aggregator.finishPartial();
                     agg.write(tbGlobalAggregate.getDataOutput());
@@ -207,15 +244,27 @@
             }
 
             @Override
-            public void update(ITupleReference tupleRef) throws HyracksDataException {
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
                 try {
                     if (vertex != null && vertex.hasUpdate()) {
-                        int fieldCount = tupleRef.getFieldCount();
-                        for (int i = 1; i < fieldCount; i++) {
-                            byte[] data = tupleRef.getFieldData(i);
-                            int offset = tupleRef.getFieldStart(i);
-                            bbos.setByteArray(data, offset);
-                            vertex.write(output);
+                        if (!dynamicStateLength) {
+                            // in-place update
+                            int fieldCount = tupleRef.getFieldCount();
+                            for (int i = 1; i < fieldCount; i++) {
+                                byte[] data = tupleRef.getFieldData(i);
+                                int offset = tupleRef.getFieldStart(i);
+                                bbos.setByteArray(data, offset);
+                                vertex.write(output);
+                            }
+                        } else {
+                            // write the vertex id
+                            DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+                            vertex.getVertexId().write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
+
+                            // write the vertex value
+                            vertex.write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
                         }
                     }
                 } catch (IOException e) {
@@ -224,5 +273,4 @@
             }
         };
     }
-
 }
diff --git a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index f7d0018..c025f85 100644
--- a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -15,12 +15,12 @@
 package edu.uci.ics.pregelix.runtime.touchpoint;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -40,14 +40,13 @@
     public IRuntimeHook createRuntimeHook() {
 
         return new IRuntimeHook() {
+            private ContextFactory ctxFactory = new ContextFactory();
 
-            @SuppressWarnings({ "rawtypes", "unchecked" })
             @Override
             public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
                 Configuration conf = confFactory.createConfiguration();
                 try {
-                    Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
-                            null);
+                    TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
                     Vertex.setContext(mapperContext);
                     BspUtils.setDefaultConfiguration(conf);
                 } catch (Exception e) {