merge aggregater completes
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
index 0371348..2cad0ca 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -17,11 +17,18 @@
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.HashSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.hsqldb.lib.Iterator;
 
 import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -35,7 +42,15 @@
 public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(MergeKmerAggregateFactory.class);
-
+    
+    private final int readLength;
+    private final int kmerSize;
+    
+    public MergeKmerAggregateFactory(int readlength, int k) {
+        this.readLength = readlength;
+        this.kmerSize = k;
+    }
+    
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
@@ -43,23 +58,34 @@
         final int frameSize = ctx.getFrameSize();
         return new IAggregatorDescriptor() {
 
-            private PositionReference position = new PositionReference();
-
+//            private PositionReference position = new PositionReference();
+            
+            private NodeWritable readNode = new NodeWritable(kmerSize);
+            private HashSet set = new HashSet();
+            private PositionListWritable uniNodeIdList = new PositionListWritable();
+            private KmerListWritable uniEdgeList = new KmerListWritable(kmerSize);
+            private KmerBytesWritable tempKmer = new KmerBytesWritable(kmerSize);
+            private PositionWritable tempPos = new PositionWritable();
+            
             @Override
             public AggregateState createAggregateStates() {
-                return new AggregateState(new ArrayBackedValueStorage());
+                return new AggregateState(new NodeWritable());
             }
 
             @Override
             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
-                inputVal.reset();
+                NodeWritable localUniNode = (NodeWritable) state.state;
+                localUniNode.reset(kmerSize);
                 int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
                 for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
                         1); offset += PositionReference.LENGTH) {
-                    position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
-                    inputVal.append(position);
+                    readNode.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+                    localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+                    localUniNode.getFFList().appendList(readNode.getFFList());
+                    localUniNode.getFRList().appendList(readNode.getFRList());
+                    localUniNode.getRFList().appendList(readNode.getRFList());
+                    localUniNode.getRRList().appendList(readNode.getRRList());
                 }
                 //make a fake feild to cheat caller
                 tupleBuilder.addFieldEndOffset();
@@ -73,12 +99,16 @@
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                NodeWritable localUniNode = (NodeWritable) state.state;
                 int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
                 for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
                         1); offset += PositionReference.LENGTH) {
                     position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
-                    inputVal.append(position);
+                    localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+                    localUniNode.getFFList().appendList(readNode.getFFList());
+                    localUniNode.getFRList().appendList(readNode.getFRList());
+                    localUniNode.getRFList().appendList(readNode.getRFList());
+                    localUniNode.getRRList().appendList(readNode.getRRList());
                 }
             }
 
@@ -88,16 +118,61 @@
                 throw new IllegalStateException("partial result method should not be called");
             }
 
+            @SuppressWarnings("unchecked")
             @Override
             public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 DataOutput fieldOutput = tupleBuilder.getDataOutput();
-                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                NodeWritable localUniNode = (NodeWritable) state.state;
+                uniNodeIdList.reset();
+                for(java.util.Iterator<PositionWritable> iter = localUniNode.getNodeIdList().iterator(); iter.hasNext();){
+                    tempPos.set(iter.next());
+                    if(set.add(tempPos))
+                        uniNodeIdList.append(tempPos);
+                }
+                localUniNode.getNodeIdList().reset();
+                localUniNode.getNodeIdList().set(uniNodeIdList);
+                uniEdgeList.reset();
+                for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getFFList().iterator(); iter.hasNext();){
+                    tempKmer.set(iter.next());
+                    if(set.add(tempKmer))
+                        uniEdgeList.append(tempKmer);
+                }
+                localUniNode.getFFList().reset();
+                localUniNode.getFFList().set(uniEdgeList);
+                
+                uniEdgeList.reset();
+                for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getFRList().iterator(); iter.hasNext();){
+                    tempKmer.set(iter.next());
+                    if(set.add(tempKmer))
+                        uniEdgeList.append(tempKmer);
+                }
+                localUniNode.getFRList().reset();
+                localUniNode.getFRList().set(uniEdgeList);
+                
+                uniEdgeList.reset();
+                for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getRFList().iterator(); iter.hasNext();){
+                    tempKmer.set(iter.next());
+                    if(set.add(tempKmer))
+                        uniEdgeList.append(tempKmer);
+                }
+                localUniNode.getRFList().reset();
+                localUniNode.getRFList().set(uniEdgeList);
+                
+                uniEdgeList.reset();
+                for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getRRList().iterator(); iter.hasNext();){
+                    tempKmer.set(iter.next());
+                    if(set.add(tempKmer))
+                        uniEdgeList.append(tempKmer);
+                }
+                localUniNode.getRRList().reset();
+                localUniNode.getRRList().set(uniEdgeList);
+                
                 try {
-                    if (inputVal.getLength() > frameSize / 2) {
+                    if (localUniNode.getLength() > frameSize / 2) {
                         LOG.warn("MergeKmer: output data kmerByteSize is too big: " + inputVal.getLength());
                     }
-                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+                    fieldOutput.write(localUniNode.getByteArray(), localUniNode.getStartOffset(), localUniNode.getLength());
                     tupleBuilder.addFieldEndOffset();
 
                 } catch (IOException e) {