merge aggregater completes
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
index 0371348..2cad0ca 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -17,11 +17,18 @@
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.hsqldb.lib.Iterator;
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -35,7 +42,15 @@
public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(MergeKmerAggregateFactory.class);
-
+
+ private final int readLength;
+ private final int kmerSize;
+
+ public MergeKmerAggregateFactory(int readlength, int k) {
+ this.readLength = readlength;
+ this.kmerSize = k;
+ }
+
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
@@ -43,23 +58,34 @@
final int frameSize = ctx.getFrameSize();
return new IAggregatorDescriptor() {
- private PositionReference position = new PositionReference();
-
+// private PositionReference position = new PositionReference();
+
+ private NodeWritable readNode = new NodeWritable(kmerSize);
+ private HashSet set = new HashSet();
+ private PositionListWritable uniNodeIdList = new PositionListWritable();
+ private KmerListWritable uniEdgeList = new KmerListWritable(kmerSize);
+ private KmerBytesWritable tempKmer = new KmerBytesWritable(kmerSize);
+ private PositionWritable tempPos = new PositionWritable();
+
@Override
public AggregateState createAggregateStates() {
- return new AggregateState(new ArrayBackedValueStorage());
+ return new AggregateState(new NodeWritable());
}
@Override
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
- inputVal.reset();
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ localUniNode.reset(kmerSize);
int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
1); offset += PositionReference.LENGTH) {
- position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
- inputVal.append(position);
+ readNode.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+ localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+ localUniNode.getFFList().appendList(readNode.getFFList());
+ localUniNode.getFRList().appendList(readNode.getFRList());
+ localUniNode.getRFList().appendList(readNode.getRFList());
+ localUniNode.getRRList().appendList(readNode.getRRList());
}
//make a fake feild to cheat caller
tupleBuilder.addFieldEndOffset();
@@ -73,12 +99,16 @@
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ NodeWritable localUniNode = (NodeWritable) state.state;
int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
1); offset += PositionReference.LENGTH) {
position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
- inputVal.append(position);
+ localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+ localUniNode.getFFList().appendList(readNode.getFFList());
+ localUniNode.getFRList().appendList(readNode.getFRList());
+ localUniNode.getRFList().appendList(readNode.getRFList());
+ localUniNode.getRRList().appendList(readNode.getRRList());
}
}
@@ -88,16 +118,61 @@
throw new IllegalStateException("partial result method should not be called");
}
+ @SuppressWarnings("unchecked")
@Override
public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
DataOutput fieldOutput = tupleBuilder.getDataOutput();
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ uniNodeIdList.reset();
+ for(java.util.Iterator<PositionWritable> iter = localUniNode.getNodeIdList().iterator(); iter.hasNext();){
+ tempPos.set(iter.next());
+ if(set.add(tempPos))
+ uniNodeIdList.append(tempPos);
+ }
+ localUniNode.getNodeIdList().reset();
+ localUniNode.getNodeIdList().set(uniNodeIdList);
+ uniEdgeList.reset();
+ for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getFFList().iterator(); iter.hasNext();){
+ tempKmer.set(iter.next());
+ if(set.add(tempKmer))
+ uniEdgeList.append(tempKmer);
+ }
+ localUniNode.getFFList().reset();
+ localUniNode.getFFList().set(uniEdgeList);
+
+ uniEdgeList.reset();
+ for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getFRList().iterator(); iter.hasNext();){
+ tempKmer.set(iter.next());
+ if(set.add(tempKmer))
+ uniEdgeList.append(tempKmer);
+ }
+ localUniNode.getFRList().reset();
+ localUniNode.getFRList().set(uniEdgeList);
+
+ uniEdgeList.reset();
+ for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getRFList().iterator(); iter.hasNext();){
+ tempKmer.set(iter.next());
+ if(set.add(tempKmer))
+ uniEdgeList.append(tempKmer);
+ }
+ localUniNode.getRFList().reset();
+ localUniNode.getRFList().set(uniEdgeList);
+
+ uniEdgeList.reset();
+ for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getRRList().iterator(); iter.hasNext();){
+ tempKmer.set(iter.next());
+ if(set.add(tempKmer))
+ uniEdgeList.append(tempKmer);
+ }
+ localUniNode.getRRList().reset();
+ localUniNode.getRRList().set(uniEdgeList);
+
try {
- if (inputVal.getLength() > frameSize / 2) {
+ if (localUniNode.getLength() > frameSize / 2) {
LOG.warn("MergeKmer: output data kmerByteSize is too big: " + inputVal.getLength());
}
- fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+ fieldOutput.write(localUniNode.getByteArray(), localUniNode.getStartOffset(), localUniNode.getLength());
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {