git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2825 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index 542bd20..1a3b927 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -3,7 +3,6 @@
 import java.io.BufferedReader;

 import java.io.File;

 import java.io.FileInputStream;

-import java.io.InputStream;

 import java.io.InputStreamReader;

 import java.nio.ByteBuffer;

 

@@ -17,7 +16,6 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;

 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

 

@@ -25,6 +23,7 @@
 

     private static final long serialVersionUID = 1L;

     private int k;

+    private int byteNum;

     private String filename;

 

     public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k, String filename) {

@@ -32,10 +31,12 @@
         // TODO Auto-generated constructor stub

         this.k = k;

         this.filename = filename;

+

+        byteNum = (k + 3) / 4; // ceil(k/4): bytes needed to hold k two-bit symbols

         //recordDescriptors[0] = news RecordDescriptor(

         //		new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });

         recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {

-                Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });

+                null, null }); // no serdes: both fields are produced and consumed as raw bytes

     }

 

     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,

@@ -48,17 +49,11 @@
             private ArrayTupleBuilder tupleBuilder;

             private ByteBuffer outputBuffer;

             private FrameTupleAppender outputAppender;

-            private long window;

 

-            @Override

+            @SuppressWarnings("resource")

+            @Override

             public void initialize() {

 

-                window = 0;

-                for (int i = 0; i < k; i++) {

-                    window <<= 2;

-                    window |= 3;

-                }

-

                 tupleBuilder = new ArrayTupleBuilder(2);

                 outputBuffer = ctx.allocateFrame();

                 outputAppender = new FrameTupleAppender(ctx.getFrameSize());

@@ -98,9 +93,16 @@
                 }

             }

 

-            private long CompressKmer(byte[] array, int start) {

+            private byte[] CompressKmer(byte[] array, int start) {

                 // A: 00; C: 01; G: 10; T: 11

-                long l = 0;

+                // bytes[0] stores k; the remaining ceil(k/4) bytes hold the

+                // packed symbols, oldest symbol in the highest bits, with a

+                // low-aligned trailing partial byte

+                byte[] bytes = new byte[byteNum + 1];

+                bytes[0] = (byte) k;

+                byte l = 0;     // accumulator for the byte being filled

+                int count = 0;  // bits accumulated in l

+                int bcount = 0; // index of the last payload byte written


                 for (int i = start; i < start + k; i++) {

                     l <<= 2;

                     switch (array[i]) {

@@ -121,8 +123,15 @@
                             l |= 3;

                             break;

                     }

+                    count += 2;

+                    if (count % 8 == 0) { // a full byte accumulated: store it

+                        bcount += 1;

+                        bytes[bcount] = l;

+                        count = 0;

+                    }

                 }

-                return l;

+                if (count != 0) bytes[bcount + 1] = l; // flush a trailing partial byte, if any

+                return bytes;

             }

 

             private byte GetBitmap(byte t) {

@@ -170,21 +179,48 @@
                 }

                 return r;

             }

+            // Slide the window one symbol: drop the oldest symbol from the

+            // high bits of bytes[1] and append c to the low bits of

+            // bytes[byteNum], matching the layout written by CompressKmer.

+            void MoveKmer(byte[] bytes, byte c) {

+                int remSymbols = (k % 4 == 0) ? 4 : k % 4; // symbols held in the last byte

+                byte lastMask = (byte) ((1 << (2 * remSymbols)) - 1);

+                for (int i = 1; i < byteNum; i++) {

+                    // pull the next chunk's oldest symbol pair into this chunk's low bits

+                    int carryShift = (i + 1 == byteNum) ? 2 * (remSymbols - 1) : 6;

+                    byte carry = (byte) ((bytes[i + 1] >> carryShift) & 3);

+                    bytes[i] = (byte) ((bytes[i] << 2) | carry);

+                }

+                bytes[byteNum] = (byte) (((bytes[byteNum] << 2) | ConvertSymbol(c)) & lastMask);

+            }

 

             private void SplitReads(byte[] array) {

                 try {

-                    long l = 0;

-

-                    byte pre = 0, next = 0;

+                    byte[] bytes = null;

+

+                    byte pre = 0, next = 0;

                     byte r;

 

                     for (int i = 0; i < array.length - k + 1; i++) {

                         if (0 == i) {

-                            l = CompressKmer(array, i);

+                            bytes = CompressKmer(array, i);

                         } else {

-                            l <<= 2;

+                            MoveKmer(bytes, array[i + k - 1]);

-                            l &= window;

-                            l |= ConvertSymbol(array[i + k - 1]);

                             pre = GetBitmap(array[i - 1]);

                         }

                         if (i + k != array.length) {

@@ -203,8 +239,14 @@
 

                         tupleBuilder.reset();

 

-                        tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);


+                        tupleBuilder.addField(bytes, 0, byteNum + 1);

                         tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);


 

                         if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,

                                 tupleBuilder.getSize())) {
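Note on the FileScanDescriptor change: the packed 64-bit k-mer key (which caps k at 32 symbols) becomes a length-prefixed byte array of 1 + ceil(k/4) bytes, so k is no longer bounded by the word size. Below is a minimal standalone sketch of that packing under the layout described in the comments above; the class and helper names are illustrative and not part of the patch.

    // Standalone sketch of the 2-bit packing used by CompressKmer
    // (assumes uppercase A/C/G/T input; names are illustrative).
    public class KmerPackingSketch {
        static byte code(byte symbol) { // A=00, C=01, G=10, T=11
            switch (symbol) {
                case 'A': return 0;
                case 'C': return 1;
                case 'G': return 2;
                case 'T': return 3;
                default: throw new IllegalArgumentException("not a DNA symbol: " + (char) symbol);
            }
        }

        static byte[] compress(byte[] read, int start, int k) {
            int byteNum = (k + 3) / 4;            // ceil(k/4) payload bytes
            byte[] bytes = new byte[byteNum + 1];
            bytes[0] = (byte) k;                  // length prefix
            byte acc = 0;
            int bits = 0, out = 1;
            for (int i = start; i < start + k; i++) {
                acc = (byte) ((acc << 2) | code(read[i]));
                bits += 2;
                if (bits == 8) {                  // a full byte accumulated: flush it
                    bytes[out++] = acc;
                    bits = 0;
                    acc = 0;
                }
            }
            if (bits != 0) {                      // trailing partial byte, low-aligned
                bytes[out] = acc;
            }
            return bytes;
        }

        public static void main(String[] args) {
            byte[] packed = compress("ACGTA".getBytes(), 0, 5);
            for (byte b : packed) {
                System.out.printf("%02X ", b);    // prints: 05 1B 00
            }
        }
    }

For k = 25, byteNum = ceil(25/4) = 7, so a key occupies 8 bytes including the length prefix, and nothing caps k at 32 the way the old long representation did.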

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
index da85170..fb0fc18 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
@@ -5,25 +5,41 @@
 import java.io.FileOutputStream;

 import java.io.IOException;

 import java.io.OutputStreamWriter;

+import java.nio.ByteBuffer;

+

 

 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

-import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;

 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;

 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;


 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;

+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;

 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;

-import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;

-import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;

-import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;

+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;

 

 public class PrinterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {

+ 

     private static final long serialVersionUID = 1L;

     private String filename;

     private boolean writeFile;

-    private BufferedWriter writer;

+    private BufferedWriter twriter;

     private FileOutputStream stream;

 

+

+    /**

+     * Constructs a printer that dumps every incoming tuple to

+     * System.err. Use the (spec, filename) overload to write one

+     * text file per partition instead.

+     *

+     * @param spec

+     *            the JobSpecification object

+     */

     public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec) {

         super(spec, 1, 0);

         writeFile = false;

@@ -35,79 +51,118 @@
         writeFile = true;

     }

 

-    private class PrinterOperator implements IOpenableDataWriterOperator {

-        

-    	private int partition;

-    	public PrinterOperator(int partition){

-    		this.partition = partition;

-    	}

-    	

-    	@Override

-        public void open() throws HyracksDataException {

-        	if( true == writeFile){

-        		try {

-					filename = filename + String.valueOf(partition)+".txt";

-					//System.err.println(filename);

-        			stream = new FileOutputStream(filename);

-				} catch (FileNotFoundException e) {

-					// TODO Auto-generated catch block

-					e.printStackTrace();

-				}

-                writer = new BufferedWriter(new OutputStreamWriter(stream));

-        	}

-        }

-

-        @Override

-        public void close() throws HyracksDataException {

-        	//System.err.println("kick");

-        	if( true == writeFile){

-        		try {

-        			writer.close();

-					stream.close();

-				} catch (IOException e) {

-					// TODO Auto-generated catch block

-					e.printStackTrace();

-				}

-        	}

-        }

-

-        @Override

-        public void fail() throws HyracksDataException {

-        }

-

-        @Override

-        public void writeData(Object[] data) throws HyracksDataException {

-        	try{

-	        	if(true == writeFile){

-		            for (int i = 0; i < data.length; ++i) {

-		            	writer.write(String.valueOf(data[i]));

-		            	writer.write(", ");

-		            	writer.write("\n");

-		            }

-	        	}

-	        	else{

-		            for (int i = 0; i < data.length; ++i) {

-		                System.err.print(StringSerializationUtils.toString(data[i]));

-		                System.err.print(", ");

-		            }

-		            System.err.println();

-	        	}

-        	} catch (IOException e) {

-				// TODO Auto-generated catch block

-				e.printStackTrace();

-			}

-        }

-

-        @Override

-        public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {

-            throw new IllegalArgumentException();

-        }

-    }

-

     @Override

-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,

-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {

-        return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(partition),

-                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));

+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,

+            final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)

+            throws HyracksDataException {

+

+        return new AbstractUnaryInputSinkOperatorNodePushable() {

+            private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);

+            private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);

+            private FrameTupleReference tuple = new FrameTupleReference();

+

+

+            @Override

+            public void open() throws HyracksDataException {

+                if (writeFile) {

+                    try {

+                        filename = filename + String.valueOf(partition) + ".txt";

+                        stream = new FileOutputStream(filename);

+                    } catch (FileNotFoundException e) {

+                        e.printStackTrace();

+                    }

+                    twriter = new BufferedWriter(new OutputStreamWriter(stream));

+                }

+            }

+

+            private void PrintBytes(int no) {

+                try {

+                    byte[] bytes = tuple.getFieldData(no);

+                    int offset = tuple.getFieldStart(no);

+                    int length = tuple.getFieldLength(no);

+                    if (writeFile) {

+                        for (int j = offset; j < offset + length; j++) {

+                            twriter.write(String.valueOf((int) bytes[j]));

+                            twriter.write(" ");

+                        }

+                        twriter.write("&&");

+                    } else {

+                        for (int j = offset; j < offset + length; j++) {

+                            System.err.print(String.valueOf((int) bytes[j]));

+                            System.err.print(" ");

+                        }

+                        System.err.print("&&");

+                    }

+                } catch (IOException e) {

+                    e.printStackTrace();

+                }

+            }

+            

+            @Override

+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {

+                try {

+                    accessor.reset(buffer);

+                    int tupleCount = accessor.getTupleCount();

+                    for (int i = 0; i < tupleCount; i++) {

+                        tuple.reset(accessor, i);

+                        int fieldCount = tuple.getFieldCount();

+                        for (int j = 0; j < fieldCount; j++) {

+                            PrintBytes(j);

+                        }

+                        if (writeFile) {

+                            twriter.write("\n");

+                        } else {

+                            System.err.println();

+                        }

+                    }

+                } catch (IOException e) {

+                    e.printStackTrace();

+                }

+            }

+

+            @Override

+            public void fail() throws HyracksDataException {

+

+            }

+

+            @Override

+            public void close() throws HyracksDataException {

+                if (writeFile) {

+                    try {

+                        twriter.close();

+                        stream.close();

+                    } catch (IOException e) {

+                        e.printStackTrace();

+                    }

+                }

+            }

+

+        };

     }

-}
\ No newline at end of file
+}
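PrintBytes above dumps each field as space-separated signed byte values. When eyeballing output it can help to decode the k-mer field back into symbols; here is a hedged sketch assuming the CompressKmer layout from the first file. decodeKmer is hypothetical, not part of the patch, and could be fed from tuple.getFieldData(no) and tuple.getFieldStart(no).

    // Hypothetical helper, not in the patch: decodes a field laid out as
    // CompressKmer writes it (bytes[offset] = k, full payload bytes with
    // the oldest pair first, trailing partial byte low-aligned).
    public class KmerPrintSketch {
        static String decodeKmer(byte[] field, int offset) {
            final char[] SYM = { 'A', 'C', 'G', 'T' };
            int k = field[offset];
            int full = k / 4, rem = k % 4;
            StringBuilder sb = new StringBuilder(k);
            for (int b = 0; b < full; b++) {           // full payload bytes
                int v = field[offset + 1 + b];
                for (int shift = 6; shift >= 0; shift -= 2) {
                    sb.append(SYM[(v >> shift) & 3]);
                }
            }
            if (rem != 0) {                            // low-aligned tail
                int v = field[offset + 1 + full];
                for (int shift = 2 * (rem - 1); shift >= 0; shift -= 2) {
                    sb.append(SYM[(v >> shift) & 3]);
                }
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            byte[] packed = { 5, 0x1B, 0x00 };         // "ACGTA" packed as above
            System.out.println(decodeKmer(packed, 0)); // prints ACGTA
        }
    }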
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index efeb7ae..06bde29 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -1,21 +1,15 @@
 package edu.uci.ics.genomix.dataflow;

 

-import java.io.BufferedReader;

-import java.io.BufferedWriter;

 import java.io.File;

-import java.io.FileInputStream;

-import java.io.FileOutputStream;

-import java.io.InputStream;

-import java.io.InputStreamReader;

-import java.io.OutputStreamWriter;

 import java.util.logging.Level;

 import java.util.logging.Logger;

 

-import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;

+import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;

 import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;

 import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

 import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;

-import edu.uci.ics.genomix.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;

+import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;

+import edu.uci.ics.genomix.data.std.primitive.VLongPointable;

 import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;

 import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;

 import edu.uci.ics.hyracks.api.client.HyracksConnection;

@@ -28,15 +22,11 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;

-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;

 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;

-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

-import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;

-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;

 import edu.uci.ics.hyracks.api.job.JobId;

 import edu.uci.ics.hyracks.api.job.JobSpecification;

 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;

@@ -45,18 +35,12 @@
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;

 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;

 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;

-import edu.uci.ics.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;

-import edu.uci.ics.hyracks.data.std.primitive.LongPointable;

-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;

 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;

-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

 import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;

@@ -140,6 +124,7 @@
         ccConfig.ccRoot = ccRoot.getAbsolutePath();

+        ccConfig.defaultMaxJobAttempts = 0; // configure before the ClusterControllerService is created

         cc = new ClusterControllerService(ccConfig);

         cc.start();

 

         NCConfig ncConfig1 = new NCConfig();

         ncConfig1.ccHost = "localhost";

@@ -187,31 +172,31 @@
     private static JobSpecification createJob(String filename, int k, int page_num, int type) throws HyracksDataException {

         JobSpecification spec = new JobSpecification();

 

-        spec.setFrameSize(32768);


+        spec.setFrameSize(64);

 

         FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);

         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

         //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);

 

-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {

-                Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

-                ByteSerializerDeserializer.INSTANCE });

+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {

+                null, // packed k-mer bytes are passed through without a serde

+                ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });

 

         int[] keyFields = new int[] { 0 };

         int frameLimits = 4096;

         int tableSize = 10485767;

 

         AbstractOperatorDescriptor single_grouper;

         IConnectorDescriptor conn_partition;

         AbstractOperatorDescriptor cross_grouper;

-        

 

         

         if(0 == type){//external group by

             single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

                     frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

+                    new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

                     // new IntSumFieldAggregatorFactory(1, false) }),

 

                     new DistributedMergeLmerAggregateFactory(),

@@ -219,14 +204,14 @@
                     outputRec, new HashSpillableTableFactory(

                             new FieldHashPartitionComputerFactory(keyFields,

                                     new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(LongPointable.FACTORY) }), tableSize), true);

+                                            .of(VLongPointable.FACTORY) }), tableSize), true);

         

             conn_partition = new MToNPartitioningConnectorDescriptor(spec,

                     new KmerHashPartitioncomputerFactory());

             cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

                     frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    new Integer64NormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

+                    new VLongNormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),

                     // new IntSumFieldAggregatorFactory(1, false) }),

 

                     new DistributedMergeLmerAggregateFactory(),

@@ -234,39 +219,39 @@
                     outputRec, new HashSpillableTableFactory(

                             new FieldHashPartitionComputerFactory(keyFields,

                                     new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(LongPointable.FACTORY) }), tableSize), true);

+                                            .of(VLongPointable.FACTORY) }), tableSize), true);

         }

         else if( 1 == type){

             single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

                     frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

+                    new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

                     // new IntSumFieldAggregatorFactory(1, false) }),

                     new DistributedMergeLmerAggregateFactory(),

                     // new IntSumFieldAggregatorFactory(1, false) }),

                     outputRec, new HashSpillableTableFactory(

                             new FieldHashPartitionComputerFactory(keyFields,

                                     new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(LongPointable.FACTORY) }), tableSize), true);

+                                            .of(VLongPointable.FACTORY) }), tableSize), true);

             conn_partition = new  MToNPartitioningMergingConnectorDescriptor(spec, new KmerHashPartitioncomputerFactory(), 

-                  keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY)} );

+                  keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY)} );

             cross_grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields, 

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },  

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },  

                     new DistributedMergeLmerAggregateFactory(), 

                     outputRec);

         }

         else{

             long inputSizeInRawRecords = 154000000;

             long inputSizeInUniqueKeys = 38500000;

-            int recordSizeInBytes = 9;

+            int recordSizeInBytes = 4;

             int hashfuncStartLevel = 1;

             single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

                     frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

+                    new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},

                     //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

                     hashfuncStartLevel, 

-                    new Integer64NormalizedKeyComputerFactory(),

+                    new VLongNormalizedKeyComputerFactory(),

                     new MergeKmerAggregateFactory(),

                     new DistributedMergeLmerAggregateFactory(),

                     outputRec, true);

@@ -275,11 +260,11 @@
             recordSizeInBytes = 13;

             cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

                     frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

+                    new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},

                     //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

                     hashfuncStartLevel, 

-                    new Integer64NormalizedKeyComputerFactory(),

+                    new VLongNormalizedKeyComputerFactory(),

                     new DistributedMergeLmerAggregateFactory(),

                     new DistributedMergeLmerAggregateFactory(),

                     outputRec, true);            

@@ -290,18 +275,20 @@
         

         IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);

         spec.connect(readfileConn, scan, 0, single_grouper, 0);


 

         //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);

         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

         spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

 

-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");

-        //PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);


+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

         //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

 

         IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

         spec.connect(printConn, cross_grouper, 0, printer, 0);

+        //spec.connect(readfileConn, scan, 0, printer, 0);

 

         spec.addRoot(printer);
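The substantive change in this file is the key type: grouping keys are now variable-length, length-prefixed byte arrays rather than fixed 8-byte longs, so every LongPointable comparator, hash-function family, and normalized-key computer is swapped for a VLong counterpart. A minimal sketch of what comparing such keys involves (illustrative only; the actual VLongPointable logic is not shown in this patch):

    public class KmerKeySketch {
        // Byte-wise comparison of two length-prefixed packed k-mers; byte 0
        // holds k, so different-length keys order by length first and
        // equal-length keys by unsigned payload bytes.
        static int compareKmers(byte[] a, int aOff, int aLen, byte[] b, int bOff, int bLen) {
            if (aLen != bLen) {
                return aLen - bLen;
            }
            for (int i = 0; i < aLen; i++) {
                int d = (a[aOff + i] & 0xFF) - (b[bOff + i] & 0xFF);
                if (d != 0) {
                    return d;
                }
            }
            return 0;
        }
    }

Ordering by length first is safe here because a single job uses one k, so all keys it compares have equal byte lengths.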

 

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index f40c301..1182cb1 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -52,9 +52,12 @@
                 byte count = 0;

                 int tupleOffset = accessor.getTupleStartOffset(tIndex);

                 int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

-                bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),

-                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);

+                bitmap |= accessor.getBuffer().get(tupleOffset + accessor.getFieldSlotsLength()

+                        + fieldStart);

                 count += 1;

                 DataOutput fieldOutput = tupleBuilder.getDataOutput();

                 try {

                     fieldOutput.writeByte(bitmap);

@@ -76,14 +79,16 @@
 

                 int tupleOffset = accessor.getTupleStartOffset(tIndex);

                 int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

-                int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;

-

-                bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);

-                count += 1;

+                bitmap |= accessor.getBuffer().get(tupleOffset + accessor.getFieldSlotsLength()

+                        + fieldStart);

+                count += 1;

 

                 int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);

                 int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);

                 int stateoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + statefieldStart;


 

                 byte[] data = stateAccessor.getBuffer().array();

 

@@ -113,7 +118,6 @@
 

                 int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;

                 bitmap = ByteSerializerDeserializer.getByte(data, offset);

-

                 count = ByteSerializerDeserializer.getByte(data, offset + 1);

                 try {

                     fieldOutput.writeByte(bitmap);
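The aggregator hunks swap ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), off) for accessor.getBuffer().get(off); both read one byte at an absolute offset, but the latter also works when the frame's ByteBuffer is not backed by an accessible array. The per-key aggregate itself remains a (bitmap, count) pair; a minimal sketch of the merge step, with illustrative names:

    public class KmerAggregateSketch {
        // state[0] = adjacency bitmap, state[1] = occurrence count, as the
        // aggregators above maintain per unique k-mer.
        static void merge(byte[] state, byte bitmap) {
            state[0] |= bitmap; // union of neighbour bits (see GetBitmap)
            state[1] += 1;      // one more occurrence; a byte wraps past 127
        }
    }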