merge from zheilbron/hyracks_msr
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 54e2256..6564eb0 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</parent>
@@ -88,89 +88,89 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-dataflow-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>pregelix-dataflow</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-btree</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-common</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-cc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-ipc</artifactId>
- <version>0.2.7-SNAPSHOT</version>
+ <version>0.2.10-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 16ecf6c..d46457c 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -32,8 +32,8 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.util.ArrayListWritable;
import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
@@ -168,11 +168,16 @@
tbAlive.reset();
vertex = (Vertex) tuple[3];
+
+ if (vertex.isPartitionTerminated()) {
+ vertex.voteToHalt();
+ return;
+ }
vertex.setOutputWriters(writers);
vertex.setOutputAppenders(appenders);
vertex.setOutputTupleBuilders(tbs);
- ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+ MsgList msgContentList = (MsgList) tuple[1];
msgContentList.reset(msgIterator);
if (!msgIterator.hasNext() && vertex.isHalted()) {
@@ -183,9 +188,15 @@
}
try {
+ if (msgContentList.segmentStart()) {
+ vertex.open();
+ }
vertex.compute(msgIterator);
+ if (msgContentList.segmentEnd()) {
+ vertex.close();
+ }
vertex.finishCompute();
- } catch (IOException e) {
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -194,7 +205,6 @@
*/
if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
terminate = false;
-
aggregator.step(vertex);
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index fa7e0a1..eba75c9 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -172,6 +172,10 @@
tbAlive.reset();
vertex = (Vertex) tuple[1];
+ if (vertex.isPartitionTerminated()) {
+ vertex.voteToHalt();
+ return;
+ }
vertex.setOutputWriters(writers);
vertex.setOutputAppenders(appenders);
vertex.setOutputTupleBuilders(tbs);
@@ -184,12 +188,13 @@
}
try {
+ vertex.open();
vertex.compute(msgIterator);
+ vertex.close();
vertex.finishCompute();
- } catch (IOException e) {
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
-
/**
* this partition should not terminate
*/
@@ -200,6 +205,7 @@
* call the global aggregator
*/
aggregator.step(vertex);
+
}
@Override
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 77f28e4..3d52a45 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -14,22 +14,27 @@
*/
package edu.uci.ics.pregelix.runtime.simpleagg;
+import java.nio.ByteBuffer;
+
import org.apache.commons.lang3.tuple.Pair;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
-public class AccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class AccumulatingAggregatorFactory implements IClusteredAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private IAggregateFunctionFactory[] aggFactories;
@@ -41,52 +46,56 @@
@SuppressWarnings("unchecked")
@Override
public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+ RecordDescriptor outRecordDescriptor, final int[] groupFields, int[] partialgroupFields,
+ final IFrameWriter writer, final ByteBuffer outputFrame, final FrameTupleAppender appender)
+ throws HyracksDataException {
+ final int frameSize = ctx.getFrameSize();
+ final ArrayTupleBuilder internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
return new IAggregatorDescriptor() {
-
private FrameTupleReference ftr = new FrameTupleReference();
+ private int groupKeySize = 0;
+ private int metaSlotSize = 4;
+
+ @Override
+ public AggregateState createAggregateStates() { // builds one aggregate function per factory, each with its own output buffer
+ IAggregateFunction[] agg = new IAggregateFunction[aggFactories.length];
+ ArrayBackedValueStorage[] aggOutput = new ArrayBackedValueStorage[aggFactories.length];
+ for (int i = 0; i < agg.length; i++) {
+ aggOutput[i] = new ArrayBackedValueStorage();
+ try {
+ agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i], writer); // writer is the IFrameWriter passed to createAggregator
+ } catch (Exception e) {
+ throw new IllegalStateException(e); // unchecked, since this method declares no checked exception
+ }
+ }
+ return new AggregateState(Pair.of(aggOutput, agg)); // left = output buffers, right = aggregate functions
+ }
@Override
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
- ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
- IAggregateFunction[] agg = aggState.getRight();
-
- // initialize aggregate functions
- for (int i = 0; i < agg.length; i++) {
- aggOutput[i].reset();
- try {
- agg[i].init();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
+ setGroupKeySize(accessor, tIndex); // cache the size of this group's key fields for later estimates
+ initAggregateFunctions(state, true); // true: first tuple of a group, so initAll() is used instead of init()
+ int stateSize = estimateStep(accessor, tIndex, state); // estimated output size once this tuple is accumulated
+ if (stateSize > frameSize) { // state was just reset, so emitting cannot make room: fail fast
+ throw new HyracksDataException(
+ "Message combiner intermediate data size "
+ + stateSize
+ + " is larger than frame size! Check the size estimation implementation in the message combiner.");
}
-
- ftr.reset(accessor, tIndex);
- for (int i = 0; i < agg.length; i++) {
- try {
- agg[i].step(ftr);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ singleStep(accessor, tIndex, state); // accumulate the group's first tuple
}
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
- Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
- IAggregateFunction[] agg = aggState.getRight();
- ftr.reset(accessor, tIndex);
- for (int i = 0; i < agg.length; i++) {
- try {
- agg[i].step(ftr);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
+ int stateSize = estimateStep(accessor, tIndex, state); // estimated output size if this tuple joins the running state
+ if (stateSize > frameSize) { // would exceed one frame: emit what is combined so far, then restart
+ emitResultTuple(accessor, tIndex, state); // group fields are taken from the current tuple
+ initAggregateFunctions(state, false); // false: same group continues, so init() is used instead of initAll()
}
+ singleStep(accessor, tIndex, state); // NOTE(review): unlike init(), the size is not re-checked after the restart — confirm a single tuple cannot exceed a frame here
}
@Override
@@ -97,7 +106,7 @@
IAggregateFunction[] agg = aggState.getRight();
for (int i = 0; i < agg.length; i++) {
try {
- agg[i].finish();
+ agg[i].finishAll();
tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
aggOutput[i].getLength());
} catch (Exception e) {
@@ -107,21 +116,6 @@
}
@Override
- public AggregateState createAggregateStates() {
- IAggregateFunction[] agg = new IAggregateFunction[aggFactories.length];
- ArrayBackedValueStorage[] aggOutput = new ArrayBackedValueStorage[aggFactories.length];
- for (int i = 0; i < agg.length; i++) {
- aggOutput[i] = new ArrayBackedValueStorage();
- try {
- agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i]);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- return new AggregateState(Pair.of(aggOutput, agg));
- }
-
- @Override
public void reset() {
}
@@ -137,6 +131,97 @@
}
+ private void initAggregateFunctions(AggregateState state, boolean all) throws HyracksDataException { // resets every output buffer and (re)initializes every aggregate function
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state; // the Pair built by createAggregateStates()
+ ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+ IAggregateFunction[] agg = aggState.getRight();
+
+ /**
+ * initialize aggregate functions
+ */
+ for (int i = 0; i < agg.length; i++) {
+ aggOutput[i].reset();
+ try {
+ if (all) { // init() passes true; aggregate() passes false after an early emit
+ agg[i].initAll();
+ } else {
+ agg[i].init();
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ private void singleStep(IFrameTupleAccessor accessor, int tIndex, AggregateState state) // feeds tuple tIndex of accessor to every aggregate function in state
+ throws HyracksDataException {
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ IAggregateFunction[] agg = aggState.getRight();
+ ftr.reset(accessor, tIndex); // ftr is a shared field: re-point it at the current tuple
+ for (int i = 0; i < agg.length; i++) {
+ try {
+ agg[i].step(ftr);
+ } catch (Exception e) {
+ throw new HyracksDataException(e); // surface any failure as HyracksDataException
+ }
+ }
+ }
+
+ private int estimateStep(IFrameTupleAccessor accessor, int tIndex, AggregateState state) // sums the group-key size and every function's estimateStep() for tuple tIndex
+ throws HyracksDataException {
+ int size = metaSlotSize + groupKeySize; // groupKeySize is set by setGroupKeySize(), called from init()
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ IAggregateFunction[] agg = aggState.getRight();
+ ftr.reset(accessor, tIndex);
+ for (int i = 0; i < agg.length; i++) {
+ try {
+ size += agg[i].estimateStep(ftr) + metaSlotSize; // one extra metaSlotSize (4) per aggregate field
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ return size; // callers compare this against frameSize
+ }
+
+ private void emitResultTuple(IFrameTupleAccessor accessor, int tIndex, AggregateState state) // builds a tuple of group fields + finished aggregates and appends it to the output frame
+ throws HyracksDataException {
+ internalTupleBuilder.reset();
+ for (int j = 0; j < groupFields.length; j++) {
+ internalTupleBuilder.addField(accessor, tIndex, groupFields[j]); // group fields are copied from the input tuple at tIndex
+ }
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
+ IAggregateFunction[] agg = aggState.getRight();
+ for (int i = 0; i < agg.length; i++) {
+ try {
+ agg[i].finish(); // finish(), not finishAll(): this emit is triggered mid-group from aggregate()
+ internalTupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+ aggOutput[i].getLength());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) { // append refused: flush and retry once
+ FrameUtils.flushFrame(outputFrame, writer);
+ appender.reset(outputFrame, true);
+ if (!appender.appendSkipEmptyField(internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) { // still refused after the reset
+ throw new HyracksDataException("The output cannot be fit into a frame.");
+ }
+ }
+ }
+
+ public void setGroupKeySize(IFrameTupleAccessor accessor, int tIndex) { // recomputes groupKeySize from the group fields of tuple tIndex
+ groupKeySize = 0;
+ for (int i = 0; i < groupFields.length; i++) {
+ int fIndex = groupFields[i];
+ int fStartOffset = accessor.getFieldStartOffset(tIndex, fIndex);
+ int fLen = accessor.getFieldEndOffset(tIndex, fIndex) - fStartOffset;
+ groupKeySize += fLen + metaSlotSize; // field length plus one metaSlotSize per group field
+ }
+ }
+
};
}
}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 8090dff..5bc30a2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
@@ -33,6 +34,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
@@ -54,10 +56,11 @@
private MsgList msgList = new MsgList();
private boolean keyRead = false;
- public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput output,
- boolean isFinalStage, boolean partialAggAsInput) throws HyracksDataException {
+ public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput tmpOutput,
+ IFrameWriter groupByOutputWriter, boolean isFinalStage, boolean partialAggAsInput)
+ throws HyracksDataException {
this.conf = confFactory.createConfiguration(ctx);
- this.output = output;
+ this.output = tmpOutput;
this.isFinalStage = isFinalStage;
this.partialAggAsInput = partialAggAsInput;
msgList.setConf(this.conf);
@@ -68,6 +71,12 @@
}
@Override
+ public void initAll() throws HyracksDataException { // same shape as init(), but delegates to combiner.initAll(msgList)
+ keyRead = false;
+ combiner.initAll(msgList);
+ }
+
+ @Override
public void init() throws HyracksDataException {
keyRead = false;
combiner.init(msgList);
@@ -75,6 +84,43 @@
@Override
public void step(IFrameTupleReference tuple) throws HyracksDataException {
+ if (!partialAggAsInput) { // NOTE(review): tuple is not read here; key/value are fields, and value is deserialized in estimateStep() — presumably step() must follow estimateStep() on the same tuple
+ combiner.stepPartial(key, (WritableSizable) value); // value must implement WritableSizable on this path
+ } else {
+ combiner.stepFinal(key, value);
+ }
+ }
+
+ @Override
+ public void finish() throws HyracksDataException { // serializes the combiner's current result into output
+ try {
+ if (!isFinalStage) {
+ combinedResult = combiner.finishPartial();
+ } else {
+ combinedResult = combiner.finishFinal();
+ }
+ combinedResult.write(output);
+ } catch (IOException e) {
+ throw new HyracksDataException(e); // callers only see HyracksDataException
+ }
+ }
+
+ @Override
+ public void finishAll() throws HyracksDataException { // like finish(), except the final stage calls combiner.finishFinalAll()
+ try {
+ if (!isFinalStage) {
+ combinedResult = combiner.finishPartial(); // NOTE(review): identical to the non-final branch of finish() — confirm this is intended
+ } else {
+ combinedResult = combiner.finishFinalAll();
+ }
+ combinedResult.write(output);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public int estimateStep(IFrameTupleReference tuple) throws HyracksDataException {
FrameTupleReference ftr = (FrameTupleReference) tuple;
IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
ByteBuffer buffer = fta.getBuffer();
@@ -94,28 +140,13 @@
}
value.readFields(valueInput);
if (!partialAggAsInput) {
- combiner.stepPartial(key, value);
+ return combiner.estimateAccumulatedStateByteSizePartial(key, (WritableSizable) value);
} else {
- combiner.stepFinal(key, value);
+ return combiner.estimateAccumulatedStateByteSizeFinal(key, value);
}
} catch (IOException e) {
throw new HyracksDataException(e);
}
-
- }
-
- @Override
- public void finish() throws HyracksDataException {
- try {
- if (!isFinalStage) {
- combinedResult = combiner.finishPartial();
- } else {
- combinedResult = combiner.finishFinal();
- }
- combinedResult.write(output);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
}
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
index 33dfa5d..54eccf5 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -17,6 +17,7 @@
import java.io.DataOutput;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
@@ -37,9 +38,9 @@
}
@Override
- public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider)
- throws HyracksException {
+ public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider,
+ IFrameWriter writer) throws HyracksException { // writer is forwarded to the AggregationFunction constructor
DataOutput output = provider.getDataOutput();
- return new AggregationFunction(ctx, confFactory, output, isFinalStage, partialAggAsInput);
+ return new AggregationFunction(ctx, confFactory, output, writer, isFinalStage, partialAggAsInput); // output becomes tmpOutput, writer becomes groupByOutputWriter
}
}