reformat the code in in r3269
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3270 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index af95064..0c09757 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -44,249 +44,233 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public class ComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
- private static final long serialVersionUID = 1L;
- private final IConfigurationFactory confFactory;
+ private static final long serialVersionUID = 1L;
+ private final IConfigurationFactory confFactory;
- public ComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
- this.confFactory = confFactory;
- }
+ public ComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+ this.confFactory = confFactory;
+ }
- @Override
- public IUpdateFunction createFunction() {
- return new IUpdateFunction() {
- // for writing intermediate data
- private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(
- 1);
- private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(
- 1);
- private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
+ @Override
+ public IUpdateFunction createFunction() {
+ return new IUpdateFunction() {
+ // for writing intermediate data
+ private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+ private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+ private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
- // for writing out to message channel
- private IFrameWriter writerMsg;
- private FrameTupleAppender appenderMsg;
- private ByteBuffer bufferMsg;
+ // for writing out to message channel
+ private IFrameWriter writerMsg;
+ private FrameTupleAppender appenderMsg;
+ private ByteBuffer bufferMsg;
- // for writing out to alive message channel
- private IFrameWriter writerAlive;
- private FrameTupleAppender appenderAlive;
- private ByteBuffer bufferAlive;
- private boolean pushAlive;
+ // for writing out to alive message channel
+ private IFrameWriter writerAlive;
+ private FrameTupleAppender appenderAlive;
+ private ByteBuffer bufferAlive;
+ private boolean pushAlive;
- // for writing out termination detection control channel
- private IFrameWriter writerTerminate;
- private FrameTupleAppender appenderTerminate;
- private ByteBuffer bufferTerminate;
- private boolean terminate = true;
+ // for writing out termination detection control channel
+ private IFrameWriter writerTerminate;
+ private FrameTupleAppender appenderTerminate;
+ private ByteBuffer bufferTerminate;
+ private boolean terminate = true;
- // for writing out termination detection control channel
- private IFrameWriter writerGlobalAggregate;
- private FrameTupleAppender appenderGlobalAggregate;
- private ByteBuffer bufferGlobalAggregate;
- private GlobalAggregator aggregator;
+ // for writing out termination detection control channel
+ private IFrameWriter writerGlobalAggregate;
+ private FrameTupleAppender appenderGlobalAggregate;
+ private ByteBuffer bufferGlobalAggregate;
+ private GlobalAggregator aggregator;
- // for writing out to insert vertex channel
- private IFrameWriter writerInsert;
- private FrameTupleAppender appenderInsert;
- private ByteBuffer bufferInsert;
+ // for writing out to insert vertex channel
+ private IFrameWriter writerInsert;
+ private FrameTupleAppender appenderInsert;
+ private ByteBuffer bufferInsert;
- // for writing out to delete vertex channel
- private IFrameWriter writerDelete;
- private FrameTupleAppender appenderDelete;
- private ByteBuffer bufferDelete;
+ // for writing out to delete vertex channel
+ private IFrameWriter writerDelete;
+ private FrameTupleAppender appenderDelete;
+ private ByteBuffer bufferDelete;
- private Vertex vertex;
- private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
- private DataOutput output = new DataOutputStream(bbos);
+ private Vertex vertex;
+ private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+ private DataOutput output = new DataOutputStream(bbos);
- private ArrayIterator msgIterator = new ArrayIterator();
- private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
- private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
- private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
- private Configuration conf;
- private boolean dynamicStateLength;
+ private ArrayIterator msgIterator = new ArrayIterator();
+ private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+ private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+ private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+ private Configuration conf;
+ private boolean dynamicStateLength;
- @Override
- public void open(IHyracksTaskContext ctx, RecordDescriptor rd,
- IFrameWriter... writers) throws HyracksDataException {
- this.conf = confFactory.createConfiguration();
- this.dynamicStateLength = BspUtils
- .getDynamicVertexValueSize(conf);
- this.aggregator = BspUtils.createGlobalAggregator(conf);
- this.aggregator.init();
+ @Override
+ public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+ throws HyracksDataException {
+ this.conf = confFactory.createConfiguration();
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+ this.aggregator = BspUtils.createGlobalAggregator(conf);
+ this.aggregator.init();
- this.writerMsg = writers[0];
- this.bufferMsg = ctx.allocateFrame();
- this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderMsg.reset(bufferMsg, true);
- this.writers.add(writerMsg);
- this.appenders.add(appenderMsg);
+ this.writerMsg = writers[0];
+ this.bufferMsg = ctx.allocateFrame();
+ this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderMsg.reset(bufferMsg, true);
+ this.writers.add(writerMsg);
+ this.appenders.add(appenderMsg);
- this.writerTerminate = writers[1];
- this.bufferTerminate = ctx.allocateFrame();
- this.appenderTerminate = new FrameTupleAppender(
- ctx.getFrameSize());
- this.appenderTerminate.reset(bufferTerminate, true);
+ this.writerTerminate = writers[1];
+ this.bufferTerminate = ctx.allocateFrame();
+ this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderTerminate.reset(bufferTerminate, true);
- this.writerGlobalAggregate = writers[2];
- this.bufferGlobalAggregate = ctx.allocateFrame();
- this.appenderGlobalAggregate = new FrameTupleAppender(
- ctx.getFrameSize());
- this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+ this.writerGlobalAggregate = writers[2];
+ this.bufferGlobalAggregate = ctx.allocateFrame();
+ this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
- this.writerInsert = writers[3];
- this.bufferInsert = ctx.allocateFrame();
- this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderInsert.reset(bufferInsert, true);
- this.writers.add(writerInsert);
- this.appenders.add(appenderInsert);
+ this.writerInsert = writers[3];
+ this.bufferInsert = ctx.allocateFrame();
+ this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderInsert.reset(bufferInsert, true);
+ this.writers.add(writerInsert);
+ this.appenders.add(appenderInsert);
- this.writerDelete = writers[4];
- this.bufferDelete = ctx.allocateFrame();
- this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderDelete.reset(bufferDelete, true);
- this.writers.add(writerDelete);
- this.appenders.add(appenderDelete);
+ this.writerDelete = writers[4];
+ this.bufferDelete = ctx.allocateFrame();
+ this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderDelete.reset(bufferDelete, true);
+ this.writers.add(writerDelete);
+ this.appenders.add(appenderDelete);
- if (writers.length > 5) {
- this.writerAlive = writers[5];
- this.bufferAlive = ctx.allocateFrame();
- this.appenderAlive = new FrameTupleAppender(
- ctx.getFrameSize());
- this.appenderAlive.reset(bufferAlive, true);
- this.pushAlive = true;
- this.writers.add(writerAlive);
- this.appenders.add(appenderAlive);
- }
+ if (writers.length > 5) {
+ this.writerAlive = writers[5];
+ this.bufferAlive = ctx.allocateFrame();
+ this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderAlive.reset(bufferAlive, true);
+ this.pushAlive = true;
+ this.writers.add(writerAlive);
+ this.appenders.add(appenderAlive);
+ }
- tbs.add(tbMsg);
- tbs.add(tbInsert);
- tbs.add(tbDelete);
- tbs.add(tbAlive);
- }
+ tbs.add(tbMsg);
+ tbs.add(tbInsert);
+ tbs.add(tbDelete);
+ tbs.add(tbAlive);
+ }
- @Override
- public void process(Object[] tuple) throws HyracksDataException {
- // vertex Id, msg content List, vertex Id, vertex
- tbMsg.reset();
- tbAlive.reset();
+ @Override
+ public void process(Object[] tuple) throws HyracksDataException {
+ // vertex Id, msg content List, vertex Id, vertex
+ tbMsg.reset();
+ tbAlive.reset();
- vertex = (Vertex) tuple[3];
- vertex.setOutputWriters(writers);
- vertex.setOutputAppenders(appenders);
- vertex.setOutputTupleBuilders(tbs);
+ vertex = (Vertex) tuple[3];
+ vertex.setOutputWriters(writers);
+ vertex.setOutputAppenders(appenders);
+ vertex.setOutputTupleBuilders(tbs);
- ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
- msgContentList.reset(msgIterator);
+ ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+ msgContentList.reset(msgIterator);
- if (!msgIterator.hasNext() && vertex.isHalted()) {
- return;
- }
- if (vertex.isHalted()) {
- vertex.activate();
- }
+ if (!msgIterator.hasNext() && vertex.isHalted()) {
+ return;
+ }
+ if (vertex.isHalted()) {
+ vertex.activate();
+ }
- try {
- vertex.compute(msgIterator);
- vertex.finishCompute();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ try {
+ vertex.compute(msgIterator);
+ vertex.finishCompute();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
- /**
- * this partition should not terminate
- */
- if (terminate
- && (!vertex.isHalted() || vertex.hasMessage() || vertex
- .createdNewLiveVertex()))
- terminate = false;
+ /**
+ * this partition should not terminate
+ */
+ if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
+ terminate = false;
- aggregator.step(vertex);
- }
+ aggregator.step(vertex);
+ }
- @Override
- public void close() throws HyracksDataException {
- FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
- FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
- FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+ @Override
+ public void close() throws HyracksDataException {
+ FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+ FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+ FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
- if (pushAlive)
- FrameTupleUtils
- .flushTuplesFinal(appenderAlive, writerAlive);
- if (!terminate) {
- writeOutTerminationState();
- }
+ if (pushAlive)
+ FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+ if (!terminate) {
+ writeOutTerminationState();
+ }
- /** write out global aggregate value */
- writeOutGlobalAggregate();
- }
+ /** write out global aggregate value */
+ writeOutGlobalAggregate();
+ }
- private void writeOutGlobalAggregate() throws HyracksDataException {
- try {
- /**
- * get partial aggregate result and flush to the final
- * aggregator
- */
- Writable agg = aggregator.finishPartial();
- agg.write(tbGlobalAggregate.getDataOutput());
- tbGlobalAggregate.addFieldEndOffset();
- appenderGlobalAggregate.append(
- tbGlobalAggregate.getFieldEndOffsets(),
- tbGlobalAggregate.getByteArray(), 0,
- tbGlobalAggregate.getSize());
- FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate,
- writerGlobalAggregate);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ private void writeOutGlobalAggregate() throws HyracksDataException {
+ try {
+ /**
+ * get partial aggregate result and flush to the final
+ * aggregator
+ */
+ Writable agg = aggregator.finishPartial();
+ agg.write(tbGlobalAggregate.getDataOutput());
+ tbGlobalAggregate.addFieldEndOffset();
+ appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+ tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+ FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- private void writeOutTerminationState() throws HyracksDataException {
- try {
- tbTerminate.getDataOutput().writeLong(0);
- tbTerminate.addFieldEndOffset();
- appenderTerminate.append(tbTerminate.getFieldEndOffsets(),
- tbTerminate.getByteArray(), 0,
- tbTerminate.getSize());
- FrameTupleUtils.flushTuplesFinal(appenderTerminate,
- writerTerminate);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ private void writeOutTerminationState() throws HyracksDataException {
+ try {
+ tbTerminate.getDataOutput().writeLong(0);
+ tbTerminate.addFieldEndOffset();
+ appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
+ tbTerminate.getSize());
+ FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void update(ITupleReference tupleRef,
- ArrayTupleBuilder cloneUpdateTb)
- throws HyracksDataException {
- try {
- if (vertex != null && vertex.hasUpdate()) {
- if (!dynamicStateLength) {
- // in-place update
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
- }
- } else {
- // write the vertex id
- DataOutput tbOutput = cloneUpdateTb.getDataOutput();
- vertex.getVertexId().write(tbOutput);
- cloneUpdateTb.addFieldEndOffset();
+ @Override
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ try {
+ if (vertex != null && vertex.hasUpdate()) {
+ if (!dynamicStateLength) {
+ // in-place update
+ int fieldCount = tupleRef.getFieldCount();
+ for (int i = 1; i < fieldCount; i++) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
+ }
+ } else {
+ // write the vertex id
+ DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+ vertex.getVertexId().write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
- // write the vertex value
- vertex.write(tbOutput);
- cloneUpdateTb.addFieldEndOffset();
- }
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- };
- }
+ // write the vertex value
+ vertex.write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
+ }
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index a241c9c..1bf6a2b 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -43,255 +43,238 @@
import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
@SuppressWarnings({ "rawtypes", "unchecked" })
-public class StartComputeUpdateFunctionFactory implements
- IUpdateFunctionFactory {
- private static final long serialVersionUID = 1L;
- private final IConfigurationFactory confFactory;
+public class StartComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
+ private static final long serialVersionUID = 1L;
+ private final IConfigurationFactory confFactory;
- public StartComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
- this.confFactory = confFactory;
- }
+ public StartComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+ this.confFactory = confFactory;
+ }
- @Override
- public IUpdateFunction createFunction() {
- return new IUpdateFunction() {
- // for writing intermediate data
- private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(
- 1);
- private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(
- 1);
- private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
+ @Override
+ public IUpdateFunction createFunction() {
+ return new IUpdateFunction() {
+ // for writing intermediate data
+ private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+ private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+ private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
- // for writing out to message channel
- private IFrameWriter writerMsg;
- private FrameTupleAppender appenderMsg;
- private ByteBuffer bufferMsg;
+ // for writing out to message channel
+ private IFrameWriter writerMsg;
+ private FrameTupleAppender appenderMsg;
+ private ByteBuffer bufferMsg;
- // for writing out to alive message channel
- private IFrameWriter writerAlive;
- private FrameTupleAppender appenderAlive;
- private ByteBuffer bufferAlive;
- private boolean pushAlive;
+ // for writing out to alive message channel
+ private IFrameWriter writerAlive;
+ private FrameTupleAppender appenderAlive;
+ private ByteBuffer bufferAlive;
+ private boolean pushAlive;
- // for writing out termination detection control channel
- private IFrameWriter writerGlobalAggregate;
- private FrameTupleAppender appenderGlobalAggregate;
- private ByteBuffer bufferGlobalAggregate;
- private GlobalAggregator aggregator;
+ // for writing out termination detection control channel
+ private IFrameWriter writerGlobalAggregate;
+ private FrameTupleAppender appenderGlobalAggregate;
+ private ByteBuffer bufferGlobalAggregate;
+ private GlobalAggregator aggregator;
- // for writing out the global aggregate
- private IFrameWriter writerTerminate;
- private FrameTupleAppender appenderTerminate;
- private ByteBuffer bufferTerminate;
- private boolean terminate = true;
+ // for writing out the global aggregate
+ private IFrameWriter writerTerminate;
+ private FrameTupleAppender appenderTerminate;
+ private ByteBuffer bufferTerminate;
+ private boolean terminate = true;
- // for writing out to insert vertex channel
- private IFrameWriter writerInsert;
- private FrameTupleAppender appenderInsert;
- private ByteBuffer bufferInsert;
+ // for writing out to insert vertex channel
+ private IFrameWriter writerInsert;
+ private FrameTupleAppender appenderInsert;
+ private ByteBuffer bufferInsert;
- // for writing out to delete vertex channel
- private IFrameWriter writerDelete;
- private FrameTupleAppender appenderDelete;
- private ByteBuffer bufferDelete;
+ // for writing out to delete vertex channel
+ private IFrameWriter writerDelete;
+ private FrameTupleAppender appenderDelete;
+ private ByteBuffer bufferDelete;
- // dummy empty msgList
- private MsgList msgList = new MsgList();
- private ArrayIterator msgIterator = new ArrayIterator();
+ // dummy empty msgList
+ private MsgList msgList = new MsgList();
+ private ArrayIterator msgIterator = new ArrayIterator();
- private Vertex vertex;
- private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
- private DataOutput output = new DataOutputStream(bbos);
+ private Vertex vertex;
+ private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+ private DataOutput output = new DataOutputStream(bbos);
- private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
- private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
- private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
- private Configuration conf;
- private boolean dynamicStateLength;
+ private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+ private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+ private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+ private Configuration conf;
+ private boolean dynamicStateLength;
- @Override
- public void open(IHyracksTaskContext ctx, RecordDescriptor rd,
- IFrameWriter... writers) throws HyracksDataException {
- this.conf = confFactory.createConfiguration();
- this.dynamicStateLength = BspUtils
- .getDynamicVertexValueSize(conf);
- this.aggregator = BspUtils.createGlobalAggregator(conf);
- this.aggregator.init();
+ @Override
+ public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+ throws HyracksDataException {
+ this.conf = confFactory.createConfiguration();
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+ this.aggregator = BspUtils.createGlobalAggregator(conf);
+ this.aggregator.init();
- this.writerMsg = writers[0];
- this.bufferMsg = ctx.allocateFrame();
- this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderMsg.reset(bufferMsg, true);
- this.writers.add(writerMsg);
- this.appenders.add(appenderMsg);
+ this.writerMsg = writers[0];
+ this.bufferMsg = ctx.allocateFrame();
+ this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderMsg.reset(bufferMsg, true);
+ this.writers.add(writerMsg);
+ this.appenders.add(appenderMsg);
- this.writerTerminate = writers[1];
- this.bufferTerminate = ctx.allocateFrame();
- this.appenderTerminate = new FrameTupleAppender(
- ctx.getFrameSize());
- this.appenderTerminate.reset(bufferTerminate, true);
+ this.writerTerminate = writers[1];
+ this.bufferTerminate = ctx.allocateFrame();
+ this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderTerminate.reset(bufferTerminate, true);
- this.writerGlobalAggregate = writers[2];
- this.bufferGlobalAggregate = ctx.allocateFrame();
- this.appenderGlobalAggregate = new FrameTupleAppender(
- ctx.getFrameSize());
- this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+ this.writerGlobalAggregate = writers[2];
+ this.bufferGlobalAggregate = ctx.allocateFrame();
+ this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
- this.writerInsert = writers[3];
- this.bufferInsert = ctx.allocateFrame();
- this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderInsert.reset(bufferInsert, true);
- this.writers.add(writerInsert);
- this.appenders.add(appenderInsert);
+ this.writerInsert = writers[3];
+ this.bufferInsert = ctx.allocateFrame();
+ this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderInsert.reset(bufferInsert, true);
+ this.writers.add(writerInsert);
+ this.appenders.add(appenderInsert);
- this.writerDelete = writers[4];
- this.bufferDelete = ctx.allocateFrame();
- this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderDelete.reset(bufferDelete, true);
- this.writers.add(writerDelete);
- this.appenders.add(appenderDelete);
+ this.writerDelete = writers[4];
+ this.bufferDelete = ctx.allocateFrame();
+ this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderDelete.reset(bufferDelete, true);
+ this.writers.add(writerDelete);
+ this.appenders.add(appenderDelete);
- if (writers.length > 5) {
- this.writerAlive = writers[5];
- this.bufferAlive = ctx.allocateFrame();
- this.appenderAlive = new FrameTupleAppender(
- ctx.getFrameSize());
- this.appenderAlive.reset(bufferAlive, true);
- this.pushAlive = true;
- this.writers.add(writerAlive);
- this.appenders.add(appenderAlive);
- }
- msgList.reset(msgIterator);
+ if (writers.length > 5) {
+ this.writerAlive = writers[5];
+ this.bufferAlive = ctx.allocateFrame();
+ this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderAlive.reset(bufferAlive, true);
+ this.pushAlive = true;
+ this.writers.add(writerAlive);
+ this.appenders.add(appenderAlive);
+ }
+ msgList.reset(msgIterator);
- tbs.add(tbMsg);
- tbs.add(tbInsert);
- tbs.add(tbDelete);
- tbs.add(tbAlive);
- }
+ tbs.add(tbMsg);
+ tbs.add(tbInsert);
+ tbs.add(tbDelete);
+ tbs.add(tbAlive);
+ }
- @Override
- public void process(Object[] tuple) throws HyracksDataException {
- // vertex Id, vertex
- tbMsg.reset();
- tbAlive.reset();
+ @Override
+ public void process(Object[] tuple) throws HyracksDataException {
+ // vertex Id, vertex
+ tbMsg.reset();
+ tbAlive.reset();
- vertex = (Vertex) tuple[1];
- vertex.setOutputWriters(writers);
- vertex.setOutputAppenders(appenders);
- vertex.setOutputTupleBuilders(tbs);
+ vertex = (Vertex) tuple[1];
+ vertex.setOutputWriters(writers);
+ vertex.setOutputAppenders(appenders);
+ vertex.setOutputTupleBuilders(tbs);
- if (!msgIterator.hasNext() && vertex.isHalted()) {
- return;
- }
- if (vertex.isHalted()) {
- vertex.activate();
- }
+ if (!msgIterator.hasNext() && vertex.isHalted()) {
+ return;
+ }
+ if (vertex.isHalted()) {
+ vertex.activate();
+ }
- try {
- vertex.compute(msgIterator);
- vertex.finishCompute();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ try {
+ vertex.compute(msgIterator);
+ vertex.finishCompute();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
- /**
- * this partition should not terminate
- */
- if (terminate
- && (!vertex.isHalted() || vertex.hasMessage() || vertex
- .createdNewLiveVertex()))
- terminate = false;
+ /**
+ * this partition should not terminate
+ */
+ if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
+ terminate = false;
- /**
- * call the global aggregator
- */
- aggregator.step(vertex);
- }
+ /**
+ * call the global aggregator
+ */
+ aggregator.step(vertex);
+ }
- @Override
- public void close() throws HyracksDataException {
- FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
- FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
- FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+ @Override
+ public void close() throws HyracksDataException {
+ FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+ FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+ FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
- if (pushAlive)
- FrameTupleUtils
- .flushTuplesFinal(appenderAlive, writerAlive);
- if (!terminate) {
- writeOutTerminationState();
- }
+ if (pushAlive)
+ FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+ if (!terminate) {
+ writeOutTerminationState();
+ }
- /** write out global aggregate value */
- writeOutGlobalAggregate();
- }
+ /** write out global aggregate value */
+ writeOutGlobalAggregate();
+ }
- private void writeOutGlobalAggregate() throws HyracksDataException {
- try {
- /**
- * get partial aggregate result and flush to the final
- * aggregator
- */
- Writable agg = aggregator.finishPartial();
- agg.write(tbGlobalAggregate.getDataOutput());
- tbGlobalAggregate.addFieldEndOffset();
- appenderGlobalAggregate.append(
- tbGlobalAggregate.getFieldEndOffsets(),
- tbGlobalAggregate.getByteArray(), 0,
- tbGlobalAggregate.getSize());
- FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate,
- writerGlobalAggregate);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ private void writeOutGlobalAggregate() throws HyracksDataException {
+ try {
+ /**
+ * get partial aggregate result and flush to the final
+ * aggregator
+ */
+ Writable agg = aggregator.finishPartial();
+ agg.write(tbGlobalAggregate.getDataOutput());
+ tbGlobalAggregate.addFieldEndOffset();
+ appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+ tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+ FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- private void writeOutTerminationState() throws HyracksDataException {
- try {
- tbTerminate.getDataOutput().writeLong(0);
- tbTerminate.addFieldEndOffset();
- appenderTerminate.append(tbTerminate.getFieldEndOffsets(),
- tbTerminate.getByteArray(), 0,
- tbTerminate.getSize());
- FrameTupleUtils.flushTuplesFinal(appenderTerminate,
- writerTerminate);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ private void writeOutTerminationState() throws HyracksDataException {
+ try {
+ tbTerminate.getDataOutput().writeLong(0);
+ tbTerminate.addFieldEndOffset();
+ appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
+ tbTerminate.getSize());
+ FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void update(ITupleReference tupleRef,
- ArrayTupleBuilder cloneUpdateTb)
- throws HyracksDataException {
- try {
- if (vertex != null && vertex.hasUpdate()) {
- if (!dynamicStateLength) {
- // in-place update
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
- }
- } else {
- // write the vertex id
- DataOutput tbOutput = cloneUpdateTb.getDataOutput();
- vertex.getVertexId().write(tbOutput);
- cloneUpdateTb.addFieldEndOffset();
+ @Override
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+ try {
+ if (vertex != null && vertex.hasUpdate()) {
+ if (!dynamicStateLength) {
+ // in-place update
+ int fieldCount = tupleRef.getFieldCount();
+ for (int i = 1; i < fieldCount; i++) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
+ }
+ } else {
+ // write the vertex id
+ DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+ vertex.getVertexId().write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
- // write the vertex value
- vertex.write(tbOutput);
- cloneUpdateTb.addFieldEndOffset();
- }
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- };
- }
+ // write the vertex value
+ vertex.write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
+ }
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
}