genomix: parameterize FileScanDescriptor with an input directory (scan all files under filename+partition instead of a hard-coded g:\data path); shrink k-mer count field from int to byte with a 255 cap in Merge/DistributedMerge aggregators; switch hybrid-hash groupers to LongBinaryHashFunctionFamily; write printer output to G:\data\result

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2777 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index e7a8e6c..542bd20 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -1,6 +1,7 @@
 package edu.uci.ics.genomix.dataflow;

 

 import java.io.BufferedReader;

+import java.io.File;

 import java.io.FileInputStream;

 import java.io.InputStream;

 import java.io.InputStreamReader;

@@ -24,11 +25,13 @@
 

     private static final long serialVersionUID = 1L;

     private int k;

+    private String filename;

 

-    public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k) {

+    public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k, String filename) {

         super(spec, 0, 1);

         // TODO Auto-generated constructor stub

         this.k = k;

+        this.filename = filename;

         //recordDescriptors[0] = news RecordDescriptor(

         //		new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });

         recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {

@@ -61,28 +64,15 @@
                 outputAppender = new FrameTupleAppender(ctx.getFrameSize());

                 outputAppender.reset(outputBuffer, true);

                 try {// one try with multiple catch?

-                     //FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(

-                     //        FileScanDescriptor.class.getSimpleName());

-                     //writer = new RunFileWriter(file, ctx.getIOManager());

                     writer.open();

-                    // read the file

-                    InputStream filenames;

-                    /*File roots = new File("G:\\data");

-                    for (File file : roots.listFiles())

-                    	System.out.println(file);

-                    String s = "G:" + File.separator + "data"

-                    		+ File.separator + "filename.txt";*/

+                    String s = filename + String.valueOf(temp);

+                    

+                    File tf = new File(s);

+                    

+                    File[] fa = tf.listFiles();

 

-                    String s = "g:\\data\\filename" + String.valueOf(temp) + ".txt";

-

-                    filenames = new FileInputStream(s);

-                    // filenames = new FileInputStream("filename.txt");

-

-                    String line;

-                    BufferedReader reader = new BufferedReader(new InputStreamReader(filenames));

-                    line = reader.readLine();

-                    while (line != null) {

-                        BufferedReader readsfile = new BufferedReader(new InputStreamReader(new FileInputStream(line)));

+                    for(int i = 0 ; i < fa.length ; i++){

+                        BufferedReader readsfile = new BufferedReader(new InputStreamReader(new FileInputStream(fa[i])));

                         String read = readsfile.readLine();

                         while (read != null) {

                             read = readsfile.readLine();

@@ -94,11 +84,7 @@
 

                             read = readsfile.readLine();

                         }

-                        line = reader.readLine();

-                        readsfile.close();

                     }

-                    reader.close();

-                    filenames.close();

                     if (outputAppender.getTupleCount() > 0) {

                         FrameUtils.flushFrame(outputBuffer, writer);

                     }

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index c458f90..efeb7ae 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -60,7 +60,6 @@
 import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;

-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;

 

 public class Tester {

 

@@ -113,7 +112,7 @@
         long end = System.currentTimeMillis();

         System.err.println(start + " " + end + " " + (end - start));

         

-    /*    FileOutputStream filenames;

+    /*   

 

         String s = "g:\\data\\results.txt" ;

 

@@ -190,13 +189,13 @@
 

         spec.setFrameSize(32768);

 

-        FileScanDescriptor scan = new FileScanDescriptor(spec, k);

+        FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);

         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

         //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);

 

         RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {

                 Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

-                IntegerSerializerDeserializer.INSTANCE });

+                ByteSerializerDeserializer.INSTANCE });

 

         int[] keyFields = new int[] { 0 };

         int frameLimits = 4096;

@@ -264,8 +263,8 @@
             single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

                     frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    //new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},

-                    new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

+                    new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},

+                    //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

                     hashfuncStartLevel, 

                     new Integer64NormalizedKeyComputerFactory(),

                     new MergeKmerAggregateFactory(),

@@ -277,8 +276,8 @@
             cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

                     frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    //new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},

-                    new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

+                    new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},

+                    //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

                     hashfuncStartLevel, 

                     new Integer64NormalizedKeyComputerFactory(),

                     new DistributedMergeLmerAggregateFactory(),

@@ -296,7 +295,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

         spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

 

-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");

+        //PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

         //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

 

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 2c70a6e..792d033 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -17,6 +17,7 @@
 

 public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {

     private static final long serialVersionUID = 1L;

+    private static final int max = 255;

 

     public DistributedMergeLmerAggregateFactory() {

     }

@@ -48,7 +49,7 @@
             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

                     AggregateState state) throws HyracksDataException {

                 byte bitmap = 0;

-                int count = 0;

+                byte count = 0;

                 int tupleOffset = accessor.getTupleStartOffset(tIndex);

                 int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

                 bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),

@@ -58,13 +59,13 @@
                 fieldStart = accessor.getFieldStartOffset(tIndex, 2);

                 int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();

 

-                count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), offset);

+                count += ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);

 

                 DataOutput fieldOutput = tupleBuilder.getDataOutput();

                 try {

                     fieldOutput.writeByte(bitmap);

                     tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeInt(count);

+                    fieldOutput.writeByte(count);

                     tupleBuilder.addFieldEndOffset();

                 } catch (IOException e) {

                     throw new HyracksDataException("I/O exception when initializing the aggregator.");

@@ -76,8 +77,9 @@
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,

                     int stateTupleIndex, AggregateState state) throws HyracksDataException {

                 // TODO Auto-generated method stub

-                byte bitmap = 0;

-                int count = 0;

+               

+            	byte bitmap = 0;

+                byte count = 0;

 

                 int tupleOffset = accessor.getTupleStartOffset(tIndex);

                 int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

@@ -87,7 +89,8 @@
                 tupleOffset = accessor.getTupleStartOffset(tIndex);

                 fieldStart = accessor.getFieldStartOffset(tIndex, 2);

                 offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();

-                count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), offset);

+                count = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);

+                

 

                 int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);

                 int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);

@@ -97,9 +100,15 @@
 

                 ByteBuffer buf = ByteBuffer.wrap(data);

                 bitmap |= buf.getChar(stateoffset);

-                count += buf.getInt(stateoffset + 1);

+                buf.position(stateoffset+1);

+                count += buf.get();

+                

+                if(count > max){

+                	count = (byte) max;

+                }

+                

                 buf.put(stateoffset, bitmap);

-                buf.putInt(stateoffset + 1, count);

+                buf.put(stateoffset + 1, count);

             }

 

             @Override

@@ -107,7 +116,7 @@
                     AggregateState state) throws HyracksDataException {

                 // TODO Auto-generated method stub

                 byte bitmap;

-                int count;

+                byte count;

                 DataOutput fieldOutput = tupleBuilder.getDataOutput();

                 byte[] data = accessor.getBuffer().array();

                 int tupleOffset = accessor.getTupleStartOffset(tIndex);

@@ -116,11 +125,11 @@
                 int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;

                 bitmap = ByteSerializerDeserializer.getByte(data, offset);

 

-                count = IntegerSerializerDeserializer.getInt(data, offset + 1);

+                count = ByteSerializerDeserializer.getByte(data, offset + 1);

                 try {

                     fieldOutput.writeByte(bitmap);

                     tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeInt(count);

+                    fieldOutput.writeByte(count);

                     tupleBuilder.addFieldEndOffset();

                 } catch (IOException e) {

                     throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

@@ -133,7 +142,7 @@
                     AggregateState state) throws HyracksDataException {

                 // TODO Auto-generated method stub

                 byte bitmap;

-                int count;

+                byte count;

 

                 byte[] data = accessor.getBuffer().array();

                 int tupleOffset = accessor.getTupleStartOffset(tIndex);

@@ -141,13 +150,13 @@
                 int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;

 

                 bitmap = ByteSerializerDeserializer.getByte(data, offset);

-                count = IntegerSerializerDeserializer.getInt(data, offset + 1);

+                count = ByteSerializerDeserializer.getByte(data, offset + 1);

 

                 DataOutput fieldOutput = tupleBuilder.getDataOutput();

                 try {

                     fieldOutput.writeByte(bitmap);

                     tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeInt(count);

+                    fieldOutput.writeByte(count);

                     tupleBuilder.addFieldEndOffset();

                 } catch (IOException e) {

                     throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index b6d3492..f40c301 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -17,6 +17,7 @@
 

 public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {

     private static final long serialVersionUID = 1L;

+    private static final int max =  255;

 

     public MergeKmerAggregateFactory() {

     }

@@ -48,17 +49,17 @@
             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

                     AggregateState state) throws HyracksDataException {

                 byte bitmap = 0;

-                int count = 0;

+                byte count = 0;

                 int tupleOffset = accessor.getTupleStartOffset(tIndex);

                 int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

                 bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),

                         tupleOffset + accessor.getFieldSlotsLength() + fieldStart);

-                count += 1;

+               	count += 1;

                 DataOutput fieldOutput = tupleBuilder.getDataOutput();

                 try {

                     fieldOutput.writeByte(bitmap);

                     tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeInt(count);

+                    fieldOutput.writeByte(count);

                     tupleBuilder.addFieldEndOffset();

                 } catch (IOException e) {

                     throw new HyracksDataException("I/O exception when initializing the aggregator.");

@@ -71,7 +72,7 @@
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {

                 // TODO Auto-generated method stub

                 byte bitmap = 0;

-                int count = 0;

+                byte count = 0;

 

                 int tupleOffset = accessor.getTupleStartOffset(tIndex);

                 int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

@@ -88,9 +89,15 @@
 

                 ByteBuffer buf = ByteBuffer.wrap(data);

                 bitmap |= buf.getChar(stateoffset);

-                count += buf.getInt(stateoffset + 1);

+                buf.position(stateoffset+1);

+                count +=  buf.get();

+                

+                if( count > max){

+                	count = (byte)max;

+                }

+                

                 buf.put(stateoffset, bitmap);

-                buf.putInt(stateoffset + 1, count);

+                buf.put(stateoffset + 1, count);

             }

 

             @Override

@@ -98,7 +105,7 @@
                     AggregateState state) throws HyracksDataException {

                 // TODO Auto-generated method stub

                 byte bitmap;

-                int count;

+                byte count;

                 DataOutput fieldOutput = tupleBuilder.getDataOutput();

                 byte[] data = accessor.getBuffer().array();

                 int tupleOffset = accessor.getTupleStartOffset(tIndex);

@@ -107,11 +114,11 @@
                 int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;

                 bitmap = ByteSerializerDeserializer.getByte(data, offset);

 

-                count = IntegerSerializerDeserializer.getInt(data, offset + 1);

+                count = ByteSerializerDeserializer.getByte(data, offset + 1);

                 try {

                     fieldOutput.writeByte(bitmap);

                     tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeInt(count);

+                    fieldOutput.writeByte(count);

                     tupleBuilder.addFieldEndOffset();

                 } catch (IOException e) {

                     throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

@@ -124,7 +131,7 @@
                     AggregateState state) throws HyracksDataException {

                 // TODO Auto-generated method stub

                 byte bitmap;

-                int count;

+                byte count;

 

                 byte[] data = accessor.getBuffer().array();

                 int tupleOffset = accessor.getTupleStartOffset(tIndex);

@@ -132,13 +139,13 @@
                 int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;

 

                 bitmap = ByteSerializerDeserializer.getByte(data, offset);

-                count = IntegerSerializerDeserializer.getInt(data, offset + 1);

+                count = ByteSerializerDeserializer.getByte(data, offset + 1);

 

                 DataOutput fieldOutput = tupleBuilder.getDataOutput();

                 try {

                     fieldOutput.writeByte(bitmap);

                     tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeInt(count);

+                    fieldOutput.writeByte(count);

                     tupleBuilder.addFieldEndOffset();

                 } catch (IOException e) {

                     throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");