rewrite the DistributeAggeregator to not inherite from local

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3277 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 04349b0..62680ed 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -1,9 +1,15 @@
 package edu.uci.ics.genomix.dataflow.aggregators;

 

+import java.io.DataOutput;

+import java.io.IOException;

+

+import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;

 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;

 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

 

@@ -18,12 +24,109 @@
 	public DistributedMergeLmerAggregateFactory() {

 	}

 

-	public class DistributeAggregatorDescriptor extends

-			LocalAggregatorDescriptor {

+	public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {

+		private static final int MAX = 127;

+

 		@Override

-		protected byte getCount(IFrameTupleAccessor accessor, int tIndex) {

-			return super.getField(accessor, tIndex, 2);

+		public void reset() {

 		}

+

+		@Override

+		public void close() {

+			// TODO Auto-generated method stub

+

+		}

+

+		@Override

+		public AggregateState createAggregateStates() {

+			return new AggregateState(new Object() {

+			});

+		}

+

+		protected byte getField(IFrameTupleAccessor accessor, int tIndex,

+				int fieldId) {

+			int tupleOffset = accessor.getTupleStartOffset(tIndex);

+			int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);

+			int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();

+			byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()

+					.array(), offset);

+			return data;

+		}

+		

+		@Override

+		public void init(ArrayTupleBuilder tupleBuilder,

+				IFrameTupleAccessor accessor, int tIndex, AggregateState state)

+				throws HyracksDataException {

+			byte bitmap = getField(accessor, tIndex, 1);

+			byte count = getField(accessor, tIndex, 2);

+

+			DataOutput fieldOutput = tupleBuilder.getDataOutput();

+			try {

+				fieldOutput.writeByte(bitmap);

+				tupleBuilder.addFieldEndOffset();

+				fieldOutput.writeByte(count);

+				tupleBuilder.addFieldEndOffset();

+			} catch (IOException e) {

+				throw new HyracksDataException(

+						"I/O exception when initializing the aggregator.");

+			}

+		}

+

+		@Override

+		public void aggregate(IFrameTupleAccessor accessor, int tIndex,

+				IFrameTupleAccessor stateAccessor, int stateTupleIndex,

+				AggregateState state) throws HyracksDataException {

+			byte bitmap = getField(accessor, tIndex, 1);

+			short count = getField(accessor, tIndex, 2);

+

+			int statetupleOffset = stateAccessor

+					.getTupleStartOffset(stateTupleIndex);

+			int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,

+					1);

+			int countfieldStart = stateAccessor.getFieldStartOffset(

+					stateTupleIndex, 2);

+			int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()

+					+ bitfieldStart;

+			int countoffset = statetupleOffset

+					+ stateAccessor.getFieldSlotsLength() + countfieldStart;

+

+			byte[] data = stateAccessor.getBuffer().array();

+

+			bitmap |= data[bitoffset];

+			count += data[countoffset];

+			if (count >= MAX) {

+				count = (byte) MAX;

+			}

+			data[bitoffset] = bitmap;

+			data[countoffset] = (byte) count;

+		}

+

+		@Override

+		public void outputPartialResult(ArrayTupleBuilder tupleBuilder,

+				IFrameTupleAccessor accessor, int tIndex, AggregateState state)

+				throws HyracksDataException {

+			byte bitmap = getField(accessor, tIndex, 1);

+			byte count = getField(accessor, tIndex, 2);

+			DataOutput fieldOutput = tupleBuilder.getDataOutput();

+			try {

+				fieldOutput.writeByte(bitmap);

+				tupleBuilder.addFieldEndOffset();

+				fieldOutput.writeByte(count);

+				tupleBuilder.addFieldEndOffset();

+			} catch (IOException e) {

+				throw new HyracksDataException(

+						"I/O exception when writing aggregation to the output buffer.");

+			}

+

+		}

+

+		@Override

+		public void outputFinalResult(ArrayTupleBuilder tupleBuilder,

+				IFrameTupleAccessor accessor, int tIndex, AggregateState state)

+				throws HyracksDataException {

+			outputPartialResult(tupleBuilder, accessor, tIndex, state);

+		}

+		

 	}

 

 	@Override

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
index a592cc2..330f950 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
@@ -39,16 +39,12 @@
 		return data;
 	}
 
-	protected byte getCount(IFrameTupleAccessor accessor, int tIndex) {
-		return 1;
-	}
-
 	@Override
 	public void init(ArrayTupleBuilder tupleBuilder,
 			IFrameTupleAccessor accessor, int tIndex, AggregateState state)
 			throws HyracksDataException {
 		byte bitmap = getField(accessor, tIndex, 1);
-		byte count = getCount(accessor, tIndex);
+		byte count = 1;
 
 		DataOutput fieldOutput = tupleBuilder.getDataOutput();
 		try {
@@ -67,7 +63,7 @@
 			IFrameTupleAccessor stateAccessor, int stateTupleIndex,
 			AggregateState state) throws HyracksDataException {
 		byte bitmap = getField(accessor, tIndex, 1);
-		short count = getCount(accessor, tIndex);
+		short count = 1;
 
 		int statetupleOffset = stateAccessor
 				.getTupleStartOffset(stateTupleIndex);