local aggregator completed
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
index b019409..fc968fd 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -31,18 +32,28 @@
public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
+
/**
*
*/
private static final long serialVersionUID = 1L;
+ private final int readLength;
+ private final int kmerSize;
+
+ public AggregateKmerAggregateFactory(int readlength, int k) {
+ this.readLength = readlength;
+ this.kmerSize = k;
+ }
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
return new IAggregatorDescriptor() {
- private PositionReference position = new PositionReference();
-
+// private PositionReference position = new PositionReference();
+
+ private NodeWritable readNode = new NodeWritable(kmerSize);
+
protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
@@ -61,27 +72,36 @@
@Override
public AggregateState createAggregateStates() {
- return new AggregateState(new ArrayBackedValueStorage());
+ return new AggregateState(new NodeWritable());
}
@Override
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
- inputVal.reset();
- position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
- inputVal.append(position);
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ localUniNode.reset(kmerSize);
+ readNode.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));//????
+ localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+ localUniNode.getFFList().appendList(readNode.getFFList());
+ localUniNode.getFRList().appendList(readNode.getFRList());
+ localUniNode.getRFList().appendList(readNode.getRFList());
+ localUniNode.getRRList().appendList(readNode.getRRList());
+// inputVal.append(position);
// make an empty field
- tupleBuilder.addFieldEndOffset();
+ tupleBuilder.addFieldEndOffset();///???
}
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
- position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
- inputVal.append(position);
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ readNode.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));//????
+ localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+ localUniNode.getFFList().appendList(readNode.getFFList());
+ localUniNode.getFRList().appendList(readNode.getFRList());
+ localUniNode.getRFList().appendList(readNode.getRFList());
+ localUniNode.getRRList().appendList(readNode.getRRList());
}
@Override
@@ -94,9 +114,9 @@
public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
DataOutput fieldOutput = tupleBuilder.getDataOutput();
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ NodeWritable localUniNode = (ArrayBackedValueSNodeWritabletorage) state.state;
try {
- fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+ fieldOutput.write(localUniNode.getByteArray(), localUniNode.getStartOffset(), localUniNode.getLength());
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {