Cross-merge fullstack_release_candidate into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-runtime/pom.xml b/fullstack/pregelix/pregelix-runtime/pom.xml
index d12cb36..e75f98b 100644
--- a/fullstack/pregelix/pregelix-runtime/pom.xml
+++ b/fullstack/pregelix/pregelix-runtime/pom.xml
@@ -22,8 +22,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
<plugin>
@@ -42,6 +43,7 @@
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
<configuration>
<filesets>
<fileset>
@@ -116,13 +118,6 @@
<version>0.2.3-SNAPSHOT</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
<version>0.2.3-SNAPSHOT</version>
diff --git a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
index e7caeaf..76c725e 100644
--- a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
+++ b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
@@ -34,7 +34,7 @@
@Override
public void stop() throws Exception {
- LOGGER.info("Stopping Giraph NC Bootstrap");
+ LOGGER.info("Stopping NC Bootstrap");
RuntimeContext rCtx = (RuntimeContext) appCtx.getApplicationObject();
rCtx.close();
}
diff --git a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 105d3e2..f7958d9 100644
--- a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -58,6 +59,8 @@
private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+ private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
// for writing out to message channel
private IFrameWriter writerMsg;
@@ -82,6 +85,16 @@
private ByteBuffer bufferGlobalAggregate;
private GlobalAggregator aggregator;
+ // for writing out to insert vertex channel
+ private IFrameWriter writerInsert;
+ private FrameTupleAppender appenderInsert;
+ private ByteBuffer bufferInsert;
+
+ // for writing out to delete vertex channel
+ private IFrameWriter writerDelete;
+ private FrameTupleAppender appenderDelete;
+ private ByteBuffer bufferDelete;
+
private Vertex vertex;
private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
private DataOutput output = new DataOutputStream(bbos);
@@ -90,11 +103,15 @@
private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+ private Configuration conf;
+ private boolean dynamicStateLength;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
- this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+ this.conf = confFactory.createConfiguration();
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+ this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
this.writerMsg = writers[0];
@@ -114,8 +131,22 @@
this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
- if (writers.length > 3) {
- this.writerAlive = writers[3];
+ this.writerInsert = writers[3];
+ this.bufferInsert = ctx.allocateFrame();
+ this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderInsert.reset(bufferInsert, true);
+ this.writers.add(writerInsert);
+ this.appenders.add(appenderInsert);
+
+ this.writerDelete = writers[4];
+ this.bufferDelete = ctx.allocateFrame();
+ this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderDelete.reset(bufferDelete, true);
+ this.writers.add(writerDelete);
+ this.appenders.add(appenderDelete);
+
+ if (writers.length > 5) {
+ this.writerAlive = writers[5];
this.bufferAlive = ctx.allocateFrame();
this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
this.appenderAlive.reset(bufferAlive, true);
@@ -125,6 +156,8 @@
}
tbs.add(tbMsg);
+ tbs.add(tbInsert);
+ tbs.add(tbDelete);
tbs.add(tbAlive);
}
@@ -155,7 +188,7 @@
/**
* this partition should not terminate
*/
- if (terminate && (!vertex.isHalted() || vertex.hasMessage()))
+ if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
terminate = false;
aggregator.step(vertex);
@@ -164,6 +197,9 @@
@Override
public void close() throws HyracksDataException {
FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+ FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+ FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+
if (pushAlive)
FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
if (!terminate) {
@@ -177,7 +213,8 @@
private void writeOutGlobalAggregate() throws HyracksDataException {
try {
/**
- * get partial aggregate result and flush to the final aggregator
+ * get partial aggregate result and flush to the final
+ * aggregator
*/
Writable agg = aggregator.finishPartial();
agg.write(tbGlobalAggregate.getDataOutput());
@@ -203,15 +240,27 @@
}
@Override
- public void update(ITupleReference tupleRef) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
+ if (!dynamicStateLength) {
+ // in-place update
+ int fieldCount = tupleRef.getFieldCount();
+ for (int i = 1; i < fieldCount; i++) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
+ }
+ } else {
+ // write the vertex id
+ DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+ vertex.getVertexId().write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
+
+ // write the vertex value
+ vertex.write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
}
}
} catch (IOException e) {
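For reference, the net effect of the update() hunk above is sketched below. This is a reconstruction, not the verbatim committed method: it assumes the fields declared earlier in this class (vertex, dynamicStateLength, bbos, output) and shows how a fixed-size vertex value is rewritten in place, while a variable-size value is cloned into the new ArrayTupleBuilder argument so the calling operator can replace the old entry.

    @Override
    public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
        try {
            if (vertex == null || !vertex.hasUpdate()) {
                return;
            }
            if (!dynamicStateLength) {
                // Fixed-size state: overwrite the vertex bytes directly in the existing tuple fields.
                for (int i = 1; i < tupleRef.getFieldCount(); i++) {
                    bbos.setByteArray(tupleRef.getFieldData(i), tupleRef.getFieldStart(i));
                    vertex.write(output);
                }
            } else {
                // Variable-size state: build a fresh (vertexId, vertexValue) tuple in cloneUpdateTb;
                // the calling operator presumably replaces the old index entry with this clone.
                DataOutput tbOutput = cloneUpdateTb.getDataOutput();
                vertex.getVertexId().write(tbOutput);
                cloneUpdateTb.addFieldEndOffset();
                vertex.write(tbOutput);
                cloneUpdateTb.addFieldEndOffset();
            }
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
    }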
diff --git a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index f72b059..0cf64a0 100644
--- a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -58,6 +59,8 @@
private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+ private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
// for writing out to message channel
private IFrameWriter writerMsg;
@@ -76,12 +79,22 @@
private ByteBuffer bufferGlobalAggregate;
private GlobalAggregator aggregator;
- //for writing out the global aggregate
+ // for writing out the global aggregate
private IFrameWriter writerTerminate;
private FrameTupleAppender appenderTerminate;
private ByteBuffer bufferTerminate;
private boolean terminate = true;
+ // for writing out to insert vertex channel
+ private IFrameWriter writerInsert;
+ private FrameTupleAppender appenderInsert;
+ private ByteBuffer bufferInsert;
+
+ // for writing out to delete vertex channel
+ private IFrameWriter writerDelete;
+ private FrameTupleAppender appenderDelete;
+ private ByteBuffer bufferDelete;
+
// dummy empty msgList
private MsgList msgList = new MsgList();
private ArrayIterator msgIterator = new ArrayIterator();
@@ -93,11 +106,15 @@
private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+ private Configuration conf;
+ private boolean dynamicStateLength;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
- this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+ this.conf = confFactory.createConfiguration();
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+ this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
this.writerMsg = writers[0];
@@ -117,8 +134,22 @@
this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
- if (writers.length > 3) {
- this.writerAlive = writers[3];
+ this.writerInsert = writers[3];
+ this.bufferInsert = ctx.allocateFrame();
+ this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderInsert.reset(bufferInsert, true);
+ this.writers.add(writerInsert);
+ this.appenders.add(appenderInsert);
+
+ this.writerDelete = writers[4];
+ this.bufferDelete = ctx.allocateFrame();
+ this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderDelete.reset(bufferDelete, true);
+ this.writers.add(writerDelete);
+ this.appenders.add(appenderDelete);
+
+ if (writers.length > 5) {
+ this.writerAlive = writers[5];
this.bufferAlive = ctx.allocateFrame();
this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
this.appenderAlive.reset(bufferAlive, true);
@@ -129,6 +160,8 @@
msgList.reset(msgIterator);
tbs.add(tbMsg);
+ tbs.add(tbInsert);
+ tbs.add(tbDelete);
tbs.add(tbAlive);
}
@@ -156,7 +189,7 @@
/**
* this partition should not terminate
*/
- if (terminate && (!vertex.isHalted() || vertex.hasMessage()))
+ if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
terminate = false;
/**
@@ -168,20 +201,24 @@
@Override
public void close() throws HyracksDataException {
FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+ FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+ FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+
if (pushAlive)
FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
if (!terminate) {
writeOutTerminationState();
}
-
- /**write out global aggregate value*/
+
+ /** write out global aggregate value */
writeOutGlobalAggregate();
}
private void writeOutGlobalAggregate() throws HyracksDataException {
try {
/**
- * get partial aggregate result and flush to the final aggregator
+ * get partial aggregate result and flush to the final
+ * aggregator
*/
Writable agg = aggregator.finishPartial();
agg.write(tbGlobalAggregate.getDataOutput());
@@ -207,15 +244,27 @@
}
@Override
- public void update(ITupleReference tupleRef) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
+ if (!dynamicStateLength) {
+ // in-place update
+ int fieldCount = tupleRef.getFieldCount();
+ for (int i = 1; i < fieldCount; i++) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
+ }
+ } else {
+ // write the vertex id
+ DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+ vertex.getVertexId().write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
+
+ // write the vertex value
+ vertex.write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
}
}
} catch (IOException e) {
@@ -224,5 +273,4 @@
}
};
}
-
}
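Both update-function factories now wire the same, wider set of output channels in open(). The sketch below summarizes the layout: slots 0, 3, 4, and 5 are taken directly from the hunks above, while slots 1 and 2 are assumptions based on the writerTerminate and writerGlobalAggregate fields (their assignments are not in the visible context). The helper name wireInsertDeleteChannels is hypothetical, used only to keep the illustration compact.

    // Assumed layout of the IFrameWriter[] handed to open():
    //   writers[0] -> message channel           (shown in the hunks)
    //   writers[1] -> termination state         (assumption: writerTerminate)
    //   writers[2] -> global aggregate          (assumption: writerGlobalAggregate)
    //   writers[3] -> insert-vertex channel     (new in this commit)
    //   writers[4] -> delete-vertex channel     (new in this commit)
    //   writers[5] -> alive-vertex channel      (optional: only when writers.length > 5)
    private void wireInsertDeleteChannels(IHyracksTaskContext ctx, IFrameWriter[] writers)
            throws HyracksDataException {
        // Hypothetical helper for illustration; in the commit this code lives inline in open().
        writerInsert = writers[3];
        bufferInsert = ctx.allocateFrame();
        appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
        appenderInsert.reset(bufferInsert, true);

        writerDelete = writers[4];
        bufferDelete = ctx.allocateFrame();
        appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
        appenderDelete.reset(bufferDelete, true);

        if (writers.length > 5) {
            writerAlive = writers[5];
            bufferAlive = ctx.allocateFrame();
            appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
            appenderAlive.reset(bufferAlive, true);
        }
    }

The tuple builders are registered in the matching order (tbMsg, tbInsert, tbDelete, tbAlive), as shown in the hunks.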
diff --git a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index f7d0018..c025f85 100644
--- a/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/fullstack/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -15,12 +15,12 @@
package edu.uci.ics.pregelix.runtime.touchpoint;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -40,14 +40,13 @@
public IRuntimeHook createRuntimeHook() {
return new IRuntimeHook() {
+ private ContextFactory ctxFactory = new ContextFactory();
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
Configuration conf = confFactory.createConfiguration();
try {
- Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- null);
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
Vertex.setContext(mapperContext);
BspUtils.setDefaultConfiguration(conf);
} catch (Exception e) {
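The RuntimeHookFactory hunk removes the direct construction of Mapper.Context (which tied the code to one Hadoop API and motivated the now-deleted @SuppressWarnings) and goes through the hyracks-hdfs ContextFactory instead, presumably so the same code can build a TaskAttemptContext against different Hadoop releases. A minimal sketch of the resulting hook body, using only the calls visible in the hunk:

    @Override
    public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
        Configuration conf = confFactory.createConfiguration();
        try {
            // ContextFactory hides Hadoop-version differences when constructing a
            // TaskAttemptContext, replacing the old "new Mapper().new Context(...)" call.
            TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
            Vertex.setContext(mapperContext);
            BspUtils.setDefaultConfiguration(conf);
        } catch (Exception e) {
            throw new HyracksDataException(e);
        }
    }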