git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2825 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index 542bd20..1a3b927 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -3,7 +3,6 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
-import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
@@ -17,7 +16,6 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -25,6 +23,7 @@
private static final long serialVersionUID = 1L;
private int k;
+ private int byteNum;
private String filename;
public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k, String filename) {
@@ -32,10 +31,12 @@
// TODO Auto-generated constructor stub
this.k = k;
this.filename = filename;
+
+ byteNum = (int) Math.ceil((double) k / 4.0);
//recordDescriptors[0] = news RecordDescriptor(
// new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {
- Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+ null, null});
}
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
@@ -48,17 +49,11 @@
private ArrayTupleBuilder tupleBuilder;
private ByteBuffer outputBuffer;
private FrameTupleAppender outputAppender;
- private long window;
- @Override
+ @SuppressWarnings("resource")
+ @Override
public void initialize() {
- window = 0;
- for (int i = 0; i < k; i++) {
- window <<= 2;
- window |= 3;
- }
-
tupleBuilder = new ArrayTupleBuilder(2);
outputBuffer = ctx.allocateFrame();
outputAppender = new FrameTupleAppender(ctx.getFrameSize());
@@ -98,9 +93,16 @@
}
}
- private long CompressKmer(byte[] array, int start) {
+ private byte[] CompressKmer(byte[] array, int start) {
// a: 00; c: 01; G: 10; T: 11
- long l = 0;
+
+ byte[] bytes = new byte[byteNum+1];
+ bytes[0] = (byte) k;
+
+ byte l = 0;
+ int count = 0;
+ int bcount = 0;
+
for (int i = start; i < start + k; i++) {
l <<= 2;
switch (array[i]) {
@@ -121,8 +123,15 @@
l |= 3;
break;
}
+ count += 2;
+ if (count % 8 == 0) { // a full byte now holds 4 bases
+ bcount += 1;
+ bytes[bcount] = l;
+ count = 0;
+ l = 0; // reset the working byte so stale bits cannot leak into the next one
+ }
}
- return l;
+ if (count != 0) bytes[bcount + 1] = l; // flush only a partial final byte; a full one was already stored above
+ return bytes;
}
private byte GetBitmap(byte t) {
@@ -170,21 +179,48 @@
}
return r;
}
+
+ void MoveKmer(byte[] bytes, byte c){
+ // Slide the k-mer window by one base: drop the oldest base (the high bit-pair
+ // of bytes[1]) and append the 2-bit code of c at the low end of the last byte,
+ // matching the layout produced by CompressKmer above.
+ int r = 8 - (byteNum * 8 - 2 * k); // number of occupied bits in the last byte
+ for (int i = 1; i < byteNum; i++) {
+ int shift = (i + 1 == byteNum) ? r - 2 : 6; // high occupied bit-pair of the next byte
+ byte carry = (byte) ((bytes[i + 1] >> shift) & 3);
+ bytes[i] = (byte) ((bytes[i] << 2) | carry);
+ }
+ bytes[byteNum] = (byte) (((bytes[byteNum] << 2) | ConvertSymbol(c)) & ((1 << r) - 1));
+ }
private void SplitReads(byte[] array) {
try {
- long l = 0;
-
- byte pre = 0, next = 0;
+ byte[] bytes = null;
+
+ byte pre = 0, next = 0;
byte r;
for (int i = 0; i < array.length - k + 1; i++) {
if (0 == i) {
- l = CompressKmer(array, i);
+ bytes = CompressKmer(array, i);
} else {
- l <<= 2;
+ MoveKmer(bytes, array[i + k - 1]);
+ /*l <<= 2;
l &= window;
- l |= ConvertSymbol(array[i + k - 1]);
+ l |= ConvertSymbol(array[i + k - 1]);*/
pre = GetBitmap(array[i - 1]);
}
if (i + k != array.length) {
@@ -203,8 +239,14 @@
tupleBuilder.reset();
- tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);
+ //tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);
+ tupleBuilder.addField(bytes, 0, byteNum + 1);
tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);
+
+
+ //int[] a = tupleBuilder.getFieldEndOffsets();
+ //int b = tupleBuilder.getSize();
+ //byte[] c = tupleBuilder.getByteArray();
if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
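
For reference while reviewing, the packing scheme that the new CompressKmer/MoveKmer pair implements can be summarized by the following standalone sketch. It assumes the same 2-bit code (A = 00, C = 01, G = 10, T = 11), a leading length byte, and ceil(k/4) payload bytes with the first base in the high bit-pair of each full byte; the class and method names (PackedKmer, pack) are illustrative and are not part of this patch.

    // Standalone sketch of the byte-array k-mer layout used by FileScanDescriptor.
    public final class PackedKmer {
        public static byte[] pack(byte[] read, int start, int k) {
            int byteNum = (int) Math.ceil((double) k / 4.0);
            byte[] bytes = new byte[byteNum + 1];
            bytes[0] = (byte) k;            // first byte records the k-mer length
            int bcount = 1;                 // index of the next payload byte to fill
            byte current = 0;
            int basesInByte = 0;
            for (int i = start; i < start + k; i++) {
                current <<= 2;
                switch (read[i]) {
                    case 'A': break;               // 00
                    case 'C': current |= 1; break; // 01
                    case 'G': current |= 2; break; // 10
                    case 'T': current |= 3; break; // 11
                }
                if (++basesInByte == 4) {   // a byte holds four bases
                    bytes[bcount++] = current;
                    current = 0;
                    basesInByte = 0;
                }
            }
            if (basesInByte != 0) {         // trailing bases stay in the low bits of the last byte
                bytes[bcount] = current;
            }
            return bytes;
        }
    }

For example, pack("GATTACA".getBytes(), 0, 7) returns { 7, (byte) 0x8F, 0x04 }: the length byte, then G A T T packed as 10 00 11 11, then A C A in the low six bits of the last byte.
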
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
index da85170..fb0fc18 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
@@ -5,25 +5,41 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
public class PrinterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
private static final long serialVersionUID = 1L;
private String filename;
private boolean writeFile;
- private BufferedWriter writer;
+ private BufferedWriter twriter;
private FileOutputStream stream;
+
+ /**
+ * The constructor of PrinterOperatorDescriptor, which writes each incoming
+ * tuple either to a per-partition text file or to standard error.
+ *
+ * @param spec
+ * the job specification this operator is registered with
+ */
public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec) {
super(spec, 1, 0);
writeFile = false;
@@ -35,79 +51,118 @@
writeFile = true;
}
- private class PrinterOperator implements IOpenableDataWriterOperator {
-
- private int partition;
- public PrinterOperator(int partition){
- this.partition = partition;
- }
-
- @Override
- public void open() throws HyracksDataException {
- if( true == writeFile){
- try {
- filename = filename + String.valueOf(partition)+".txt";
- //System.err.println(filename);
- stream = new FileOutputStream(filename);
- } catch (FileNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- writer = new BufferedWriter(new OutputStreamWriter(stream));
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- //System.err.println("kick");
- if( true == writeFile){
- try {
- writer.close();
- stream.close();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
-
- @Override
- public void writeData(Object[] data) throws HyracksDataException {
- try{
- if(true == writeFile){
- for (int i = 0; i < data.length; ++i) {
- writer.write(String.valueOf(data[i]));
- writer.write(", ");
- writer.write("\n");
- }
- }
- else{
- for (int i = 0; i < data.length; ++i) {
- System.err.print(StringSerializationUtils.toString(data[i]));
- System.err.print(", ");
- }
- System.err.println();
- }
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- @Override
- public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
- throw new IllegalArgumentException();
- }
- }
-
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(partition),
- recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+ private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
+ private FrameTupleReference tuple = new FrameTupleReference();
+
+
+ @Override
+ public void open() throws HyracksDataException {
+ if( true == writeFile){
+ try {
+ filename = filename + String.valueOf(partition)+".txt";
+ //System.err.println(filename);
+ stream = new FileOutputStream(filename);
+ } catch (FileNotFoundException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ twriter = new BufferedWriter(new OutputStreamWriter(stream));
+ }
+ }
+
+
+ private void PrintBytes(int no){
+ try{
+
+ byte[] bytes = tuple.getFieldData(no);
+ int offset = tuple.getFieldStart(no);
+ int length = tuple.getFieldLength(no);
+ if(true == writeFile){
+ for(int j = offset ; j < offset + length ; j++){
+ twriter.write(String.valueOf((int)bytes[j]));
+ twriter.write(" ");
+ }
+ twriter.write("&&");
+ }
+ else{
+ for(int j = offset ; j < offset + length ;j++){
+ System.err.print(String.valueOf((int)bytes[j]));
+ System.err.print(" ");
+ }
+ System.err.print("&&");
+ }
+ }
+ catch(IOException e){
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try{
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ tuple.reset(accessor, i);
+ int tj = tuple.getFieldCount();
+ for(int j = 0 ; j< tj ; j++){
+ PrintBytes(j);
+ }
+ if(true == writeFile){
+ twriter.write("\n");
+ }
+ else{
+ System.err.println();
+ }
+ }
+ }
+ catch(IOException e){
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if( true == writeFile){
+ try {
+ twriter.close();
+ stream.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+
+ };
}
-}
\ No newline at end of file
+}
\ No newline at end of file
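
A small decoding helper makes the printer output easier to read. This is a sketch only: it assumes field 0 carries the packed k-mer emitted by FileScanDescriptor (a length byte followed by ceil(k/4) payload bytes, earlier bases in higher bit-pairs, trailing bases in the low bits of the last byte), and the method name unpack is hypothetical.

    // Decode a packed k-mer field back into its base string.
    private static String unpack(byte[] bytes, int offset, int length) {
        final char[] symbol = { 'A', 'C', 'G', 'T' };
        int k = bytes[offset];                   // first byte is the k-mer length
        StringBuilder sb = new StringBuilder(k);
        int basesLeft = k;
        for (int b = offset + 1; b < offset + length && basesLeft > 0; b++) {
            int basesInByte = Math.min(4, basesLeft);
            for (int j = basesInByte - 1; j >= 0; j--) {
                sb.append(symbol[(bytes[b] >> (2 * j)) & 3]); // earlier base sits in the higher bit-pair
            }
            basesLeft -= basesInByte;
        }
        return sb.toString();
    }

Calling it from PrintBytes with the same (bytes, offset, length) triple would print bases instead of raw byte values.
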
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index efeb7ae..06bde29 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -1,21 +1,15 @@
package edu.uci.ics.genomix.dataflow;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;
+import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;
import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;
-import edu.uci.ics.genomix.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
+import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;
+import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -28,15 +22,11 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -45,18 +35,12 @@
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
@@ -140,6 +124,7 @@
ccConfig.ccRoot = ccRoot.getAbsolutePath();
cc = new ClusterControllerService(ccConfig);
cc.start();
+ ccConfig.defaultMaxJobAttempts = 0;
NCConfig ncConfig1 = new NCConfig();
ncConfig1.ccHost = "localhost";
@@ -187,31 +172,31 @@
private static JobSpecification createJob(String filename, int k, int page_num, int type) throws HyracksDataException {
JobSpecification spec = new JobSpecification();
- spec.setFrameSize(32768);
+ //spec.setFrameSize(32768);
+ spec.setFrameSize(64);
FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
//PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
- ByteSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {null, ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE});
+ //Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
+ //ByteSerializerDeserializer.INSTANCE });
- int[] keyFields = new int[] { 0 };
+ int[] keyFields = new int[] { 0 };
int frameLimits = 4096;
int tableSize = 10485767;
AbstractOperatorDescriptor single_grouper;
IConnectorDescriptor conn_partition;
AbstractOperatorDescriptor cross_grouper;
-
if(0 == type){//external group by
single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
+ new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
// new IntSumFieldAggregatorFactory(1, false) }),
new DistributedMergeLmerAggregateFactory(),
@@ -219,14 +204,14 @@
outputRec, new HashSpillableTableFactory(
new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(LongPointable.FACTORY) }), tableSize), true);
+ .of(VLongPointable.FACTORY) }), tableSize), true);
conn_partition = new MToNPartitioningConnectorDescriptor(spec,
new KmerHashPartitioncomputerFactory());
cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new Integer64NormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
+ new VLongNormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),
// new IntSumFieldAggregatorFactory(1, false) }),
new DistributedMergeLmerAggregateFactory(),
@@ -234,39 +219,39 @@
outputRec, new HashSpillableTableFactory(
new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(LongPointable.FACTORY) }), tableSize), true);
+ .of(VLongPointable.FACTORY) }), tableSize), true);
}
else if( 1 == type){
single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
+ new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
// new IntSumFieldAggregatorFactory(1, false) }),
new DistributedMergeLmerAggregateFactory(),
// new IntSumFieldAggregatorFactory(1, false) }),
outputRec, new HashSpillableTableFactory(
new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(LongPointable.FACTORY) }), tableSize), true);
+ .of(VLongPointable.FACTORY) }), tableSize), true);
conn_partition = new MToNPartitioningMergingConnectorDescriptor(spec, new KmerHashPartitioncomputerFactory(),
- keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY)} );
+ keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY)} );
cross_grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
new DistributedMergeLmerAggregateFactory(),
outputRec);
}
else{
long inputSizeInRawRecords = 154000000;
long inputSizeInUniqueKeys = 38500000;
- int recordSizeInBytes = 9;
+ int recordSizeInBytes = 4;
int hashfuncStartLevel = 1;
single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},
//new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
hashfuncStartLevel,
- new Integer64NormalizedKeyComputerFactory(),
+ new VLongNormalizedKeyComputerFactory(),
new MergeKmerAggregateFactory(),
new DistributedMergeLmerAggregateFactory(),
outputRec, true);
@@ -275,11 +260,11 @@
recordSizeInBytes = 13;
cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] {new LongBinaryHashFunctionFamily()},
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},
//new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
hashfuncStartLevel,
- new Integer64NormalizedKeyComputerFactory(),
+ new VLongNormalizedKeyComputerFactory(),
new DistributedMergeLmerAggregateFactory(),
new DistributedMergeLmerAggregateFactory(),
outputRec, true);
@@ -290,18 +275,20 @@
IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);
spec.connect(readfileConn, scan, 0, single_grouper, 0);
+
//PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");
- //PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ //PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
//PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
spec.connect(printConn, cross_grouper, 0, printer, 0);
+ //spec.connect(readfileConn, scan, 0, printer, 0);
spec.addRoot(printer);
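
For readers tracing the topology, the wiring that createJob performs can be restated in one place. The helper name wire is illustrative; the two grouper operators and the repartitioning connector are assumed to have been built by one of the three branches above.

    // scan -> 1:1 -> single_grouper -> M:N repartition on the k-mer key -> cross_grouper -> 1:1 -> printer
    private static void wire(JobSpecification spec, FileScanDescriptor scan,
            AbstractOperatorDescriptor singleGrouper, IConnectorDescriptor connPartition,
            AbstractOperatorDescriptor crossGrouper, PrinterOperatorDescriptor printer) {
        spec.connect(new OneToOneConnectorDescriptor(spec), scan, 0, singleGrouper, 0);
        spec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
        spec.connect(new OneToOneConnectorDescriptor(spec), crossGrouper, 0, printer, 0);
        spec.addRoot(printer);
    }
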
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index f40c301..1182cb1 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -52,9 +52,12 @@
byte count = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
- bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+
+ bitmap |= accessor.getBuffer().get(tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart);
+
count += 1;
+
DataOutput fieldOutput = tupleBuilder.getDataOutput();
try {
fieldOutput.writeByte(bitmap);
@@ -76,14 +79,16 @@
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
- int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
-
- bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
- count += 1;
+
+ bitmap |= accessor.getBuffer().get(tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart);
int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
int stateoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + statefieldStart;
+
+
+ count += 1;
byte[] data = stateAccessor.getBuffer().array();
@@ -113,7 +118,6 @@
int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;
bitmap = ByteSerializerDeserializer.getByte(data, offset);
-
count = ByteSerializerDeserializer.getByte(data, offset + 1);
try {
fieldOutput.writeByte(bitmap);
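
To make the aggregation state explicit: the init/aggregate paths above maintain, per distinct k-mer, a one-byte adjacency bitmap that is OR-combined and a one-byte occurrence count. A plain-Java sketch of that state follows; the class name KmerState and the saturation guard on the counter are illustrative additions, not part of this patch.

    // Per-k-mer running state kept by the merge aggregators.
    final class KmerState {
        byte bitmap; // OR of the adjacency bitmaps seen for this k-mer
        byte count;  // number of occurrences

        void init(byte firstBitmap) {
            bitmap = firstBitmap;
            count = 1;
        }

        void add(byte nextBitmap) {
            bitmap |= nextBitmap;
            if (count < Byte.MAX_VALUE) { // avoid byte overflow (the patch itself does not guard this)
                count++;
            }
        }
    }
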