genomix: parameterize FileScanDescriptor with an input path (scan a directory instead of a hard-coded filename list), shrink the k-mer count field from int to byte (capped at 255) in the merge aggregators, and switch the hybrid-hash groupers to LongBinaryHashFunctionFamily
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2777 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index e7a8e6c..542bd20 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -1,6 +1,7 @@
package edu.uci.ics.genomix.dataflow;
import java.io.BufferedReader;
+import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -24,11 +25,13 @@
private static final long serialVersionUID = 1L;
private int k;
+ private String filename;
- public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k) {
+ public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k, String filename) {
super(spec, 0, 1);
// TODO Auto-generated constructor stub
this.k = k;
+ this.filename = filename;
//recordDescriptors[0] = news RecordDescriptor(
// new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -61,28 +64,15 @@
outputAppender = new FrameTupleAppender(ctx.getFrameSize());
outputAppender.reset(outputBuffer, true);
try {// one try with multiple catch?
- //FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- // FileScanDescriptor.class.getSimpleName());
- //writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
- // read the file
- InputStream filenames;
- /*File roots = new File("G:\\data");
- for (File file : roots.listFiles())
- System.out.println(file);
- String s = "G:" + File.separator + "data"
- + File.separator + "filename.txt";*/
+ String s = filename + String.valueOf(temp);
+
+ File tf = new File(s);
+
+ File[] fa = tf.listFiles();
- String s = "g:\\data\\filename" + String.valueOf(temp) + ".txt";
-
- filenames = new FileInputStream(s);
- // filenames = new FileInputStream("filename.txt");
-
- String line;
- BufferedReader reader = new BufferedReader(new InputStreamReader(filenames));
- line = reader.readLine();
- while (line != null) {
- BufferedReader readsfile = new BufferedReader(new InputStreamReader(new FileInputStream(line)));
+ for(int i = 0 ; i < fa.length ; i++){
+ BufferedReader readsfile = new BufferedReader(new InputStreamReader(new FileInputStream(fa[i])));
String read = readsfile.readLine();
while (read != null) {
read = readsfile.readLine();
@@ -94,11 +84,7 @@
read = readsfile.readLine();
}
- line = reader.readLine();
- readsfile.close();
}
- reader.close();
- filenames.close();
if (outputAppender.getTupleCount() > 0) {
FrameUtils.flushFrame(outputBuffer, writer);
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index c458f90..efeb7ae 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -60,7 +60,6 @@
import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
public class Tester {
@@ -113,7 +112,7 @@
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
- /* FileOutputStream filenames;
+ /*
String s = "g:\\data\\results.txt" ;
@@ -190,13 +189,13 @@
spec.setFrameSize(32768);
- FileScanDescriptor scan = new FileScanDescriptor(spec, k);
+ FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
//PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);
RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ ByteSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int frameLimits = 4096;
@@ -264,8 +263,8 @@
single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- //new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},
- new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
+ new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},
+ //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
hashfuncStartLevel,
new Integer64NormalizedKeyComputerFactory(),
new MergeKmerAggregateFactory(),
@@ -277,8 +276,8 @@
cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- //new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},
- new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
+ new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},
+ //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
hashfuncStartLevel,
new Integer64NormalizedKeyComputerFactory(),
new DistributedMergeLmerAggregateFactory(),
@@ -296,7 +295,8 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");
+ //PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
//PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 2c70a6e..792d033 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -17,6 +17,7 @@
public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
+ private static final int max = 255;
public DistributedMergeLmerAggregateFactory() {
}
@@ -48,7 +49,7 @@
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
byte bitmap = 0;
- int count = 0;
+ byte count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
@@ -58,13 +59,13 @@
fieldStart = accessor.getFieldStartOffset(tIndex, 2);
int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), offset);
+ count += ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
DataOutput fieldOutput = tupleBuilder.getDataOutput();
try {
fieldOutput.writeByte(bitmap);
tupleBuilder.addFieldEndOffset();
- fieldOutput.writeInt(count);
+ fieldOutput.writeByte(count);
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
throw new HyracksDataException("I/O exception when initializing the aggregator.");
@@ -76,8 +77,9 @@
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
// TODO Auto-generated method stub
- byte bitmap = 0;
- int count = 0;
+
+ byte bitmap = 0;
+ byte count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
@@ -87,7 +89,8 @@
tupleOffset = accessor.getTupleStartOffset(tIndex);
fieldStart = accessor.getFieldStartOffset(tIndex, 2);
offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), offset);
+ count = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+
int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
@@ -97,9 +100,15 @@
ByteBuffer buf = ByteBuffer.wrap(data);
bitmap |= buf.getChar(stateoffset);
- count += buf.getInt(stateoffset + 1);
+ buf.position(stateoffset+1);
+ count += buf.get();
+
+ if(count > max){
+ count = (byte) max;
+ }
+
buf.put(stateoffset, bitmap);
- buf.putInt(stateoffset + 1, count);
+ buf.put(stateoffset + 1, count);
}
@Override
@@ -107,7 +116,7 @@
AggregateState state) throws HyracksDataException {
// TODO Auto-generated method stub
byte bitmap;
- int count;
+ byte count;
DataOutput fieldOutput = tupleBuilder.getDataOutput();
byte[] data = accessor.getBuffer().array();
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -116,11 +125,11 @@
int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;
bitmap = ByteSerializerDeserializer.getByte(data, offset);
- count = IntegerSerializerDeserializer.getInt(data, offset + 1);
+ count = ByteSerializerDeserializer.getByte(data, offset + 1);
try {
fieldOutput.writeByte(bitmap);
tupleBuilder.addFieldEndOffset();
- fieldOutput.writeInt(count);
+ fieldOutput.writeByte(count);
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
@@ -133,7 +142,7 @@
AggregateState state) throws HyracksDataException {
// TODO Auto-generated method stub
byte bitmap;
- int count;
+ byte count;
byte[] data = accessor.getBuffer().array();
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -141,13 +150,13 @@
int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;
bitmap = ByteSerializerDeserializer.getByte(data, offset);
- count = IntegerSerializerDeserializer.getInt(data, offset + 1);
+ count = ByteSerializerDeserializer.getByte(data, offset + 1);
DataOutput fieldOutput = tupleBuilder.getDataOutput();
try {
fieldOutput.writeByte(bitmap);
tupleBuilder.addFieldEndOffset();
- fieldOutput.writeInt(count);
+ fieldOutput.writeByte(count);
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index b6d3492..f40c301 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -17,6 +17,7 @@
public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
+ private static final int max = 255;
public MergeKmerAggregateFactory() {
}
@@ -48,17 +49,17 @@
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
byte bitmap = 0;
- int count = 0;
+ byte count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
- count += 1;
+ count += 1;
DataOutput fieldOutput = tupleBuilder.getDataOutput();
try {
fieldOutput.writeByte(bitmap);
tupleBuilder.addFieldEndOffset();
- fieldOutput.writeInt(count);
+ fieldOutput.writeByte(count);
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
throw new HyracksDataException("I/O exception when initializing the aggregator.");
@@ -71,7 +72,7 @@
int stateTupleIndex, AggregateState state) throws HyracksDataException {
// TODO Auto-generated method stub
byte bitmap = 0;
- int count = 0;
+ byte count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
@@ -88,9 +89,15 @@
ByteBuffer buf = ByteBuffer.wrap(data);
bitmap |= buf.getChar(stateoffset);
- count += buf.getInt(stateoffset + 1);
+ buf.position(stateoffset+1);
+ count += buf.get();
+
+ if( count > max){
+ count = (byte)max;
+ }
+
buf.put(stateoffset, bitmap);
- buf.putInt(stateoffset + 1, count);
+ buf.put(stateoffset + 1, count);
}
@Override
@@ -98,7 +105,7 @@
AggregateState state) throws HyracksDataException {
// TODO Auto-generated method stub
byte bitmap;
- int count;
+ byte count;
DataOutput fieldOutput = tupleBuilder.getDataOutput();
byte[] data = accessor.getBuffer().array();
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -107,11 +114,11 @@
int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;
bitmap = ByteSerializerDeserializer.getByte(data, offset);
- count = IntegerSerializerDeserializer.getInt(data, offset + 1);
+ count = ByteSerializerDeserializer.getByte(data, offset + 1);
try {
fieldOutput.writeByte(bitmap);
tupleBuilder.addFieldEndOffset();
- fieldOutput.writeInt(count);
+ fieldOutput.writeByte(count);
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
@@ -124,7 +131,7 @@
AggregateState state) throws HyracksDataException {
// TODO Auto-generated method stub
byte bitmap;
- int count;
+ byte count;
byte[] data = accessor.getBuffer().array();
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -132,13 +139,13 @@
int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;
bitmap = ByteSerializerDeserializer.getByte(data, offset);
- count = IntegerSerializerDeserializer.getInt(data, offset + 1);
+ count = ByteSerializerDeserializer.getByte(data, offset + 1);
DataOutput fieldOutput = tupleBuilder.getDataOutput();
try {
fieldOutput.writeByte(bitmap);
tupleBuilder.addFieldEndOffset();
- fieldOutput.writeInt(count);
+ fieldOutput.writeByte(count);
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");