rewrite the DistributeAggeregator to not inherite from local
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3277 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 04349b0..62680ed 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -1,9 +1,15 @@
package edu.uci.ics.genomix.dataflow.aggregators;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -18,12 +24,109 @@
public DistributedMergeLmerAggregateFactory() {
}
- public class DistributeAggregatorDescriptor extends
- LocalAggregatorDescriptor {
+ public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
+ private static final int MAX = 127;
+
@Override
- protected byte getCount(IFrameTupleAccessor accessor, int tIndex) {
- return super.getField(accessor, tIndex, 2);
+ public void reset() {
}
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new Object() {
+ });
+ }
+
+ protected byte getField(IFrameTupleAccessor accessor, int tIndex,
+ int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()
+ .array(), offset);
+ return data;
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = getField(accessor, tIndex, 2);
+
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when initializing the aggregator.");
+ }
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+ AggregateState state) throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ short count = getField(accessor, tIndex, 2);
+
+ int statetupleOffset = stateAccessor
+ .getTupleStartOffset(stateTupleIndex);
+ int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,
+ 1);
+ int countfieldStart = stateAccessor.getFieldStartOffset(
+ stateTupleIndex, 2);
+ int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()
+ + bitfieldStart;
+ int countoffset = statetupleOffset
+ + stateAccessor.getFieldSlotsLength() + countfieldStart;
+
+ byte[] data = stateAccessor.getBuffer().array();
+
+ bitmap |= data[bitoffset];
+ count += data[countoffset];
+ if (count >= MAX) {
+ count = (byte) MAX;
+ }
+ data[bitoffset] = bitmap;
+ data[countoffset] = (byte) count;
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = getField(accessor, tIndex, 2);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ }
+
}
@Override
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
index a592cc2..330f950 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
@@ -39,16 +39,12 @@
return data;
}
- protected byte getCount(IFrameTupleAccessor accessor, int tIndex) {
- return 1;
- }
-
@Override
public void init(ArrayTupleBuilder tupleBuilder,
IFrameTupleAccessor accessor, int tIndex, AggregateState state)
throws HyracksDataException {
byte bitmap = getField(accessor, tIndex, 1);
- byte count = getCount(accessor, tIndex);
+ byte count = 1;
DataOutput fieldOutput = tupleBuilder.getDataOutput();
try {
@@ -67,7 +63,7 @@
IFrameTupleAccessor stateAccessor, int stateTupleIndex,
AggregateState state) throws HyracksDataException {
byte bitmap = getField(accessor, tIndex, 1);
- short count = getCount(accessor, tIndex);
+ short count = 1;
int statetupleOffset = stateAccessor
.getTupleStartOffset(stateTupleIndex);