Pass the ReadID group-by test
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/ReadIDPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/ReadIDPointable.java
new file mode 100644
index 0000000..b1e08c4
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/ReadIDPointable.java
@@ -0,0 +1,177 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
+import edu.uci.ics.hyracks.data.std.api.IComparable;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+import edu.uci.ics.hyracks.data.std.api.INumeric;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+
+/**
+ * Pointable view over a read ID stored as an {@code int} inside a byte array.
+ *
+ * The value is decoded and encoded with {@link Marshal#getInt} and {@link Marshal#putInt}; ordering,
+ * hashing and the numeric conversions are all derived from that decoded integer.
+ */
+public class ReadIDPointable extends AbstractPointable implements IHashable, IComparable, INumeric {
+    /**
+     * Type traits advertised for read IDs: reported as variable length ({@code isFixedLength() == false},
+     * {@code getFixedLength() == -1}).
+     * NOTE(review): the accessors below always read exactly one int, so a fixed length may have been
+     * intended - confirm before changing.
+     */
+    public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean isFixedLength() {
+            return false;
+        }
+
+        @Override
+        public int getFixedLength() {
+            return -1;
+        }
+    };
+
+    /** Factory producing {@link ReadIDPointable} instances described by {@link #TYPE_TRAITS}. */
+    public static final IPointableFactory FACTORY = new IPointableFactory() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IPointable createPointable() {
+            // Must be this class (not IntegerPointable) so that the compareTo/hash defined
+            // below are the ones executed by comparators built from FACTORY.
+            return new ReadIDPointable();
+        }
+
+        @Override
+        public ITypeTraits getTypeTraits() {
+            return TYPE_TRAITS;
+        }
+    };
+
+    /**
+     * Decodes the int stored at {@code start} in {@code bytes}.
+     *
+     * @param bytes buffer holding the encoded value
+     * @param start offset of the value's first byte
+     * @return the decoded int
+     */
+    public static int getInteger(byte[] bytes, int start) {
+        return Marshal.getInt(bytes, start);
+    }
+
+    /**
+     * Encodes {@code value} into {@code bytes} beginning at {@code start}.
+     *
+     * @param bytes buffer to write into
+     * @param start offset of the value's first byte
+     * @param value int to store
+     */
+    public static void setInteger(byte[] bytes, int start, int value) {
+        Marshal.putInt(value, bytes, start);
+    }
+
+    /** @return the int this pointable currently points at */
+    public int getInteger() {
+        return getInteger(bytes, start);
+    }
+
+    /** Overwrites the int this pointable currently points at with {@code value}. */
+    public void setInteger(int value) {
+        setInteger(bytes, start, value);
+    }
+
+    /**
+     * Adds one to the stored value in place.
+     *
+     * @return the value after the increment
+     */
+    public int preIncrement() {
+        int v = getInteger();
+        ++v;
+        setInteger(v);
+        return v;
+    }
+
+    /**
+     * Adds one to the stored value in place.
+     *
+     * @return the value before the increment
+     */
+    public int postIncrement() {
+        int v = getInteger();
+        int ov = v++;
+        setInteger(v);
+        return ov;
+    }
+
+    /** Compares this read ID with the one {@code pointer} refers to. */
+    @Override
+    public int compareTo(IPointable pointer) {
+        return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());
+    }
+
+    /**
+     * Compares this read ID with the int encoded at {@code start} in {@code bytes}.
+     *
+     * @param bytes buffer holding the other value
+     * @param start offset of the other value
+     * @param length length of the other value; not consulted
+     * @return -1, 0 or 1 when this value is less than, equal to or greater than the other
+     */
+    @Override
+    public int compareTo(byte[] bytes, int start, int length) {
+        // The parameters shadow the inherited fields: getInteger() reads this pointable, while
+        // getInteger(bytes, start) reads the argument buffer.
+        int v = getInteger();
+        int ov = getInteger(bytes, start);
+        return v < ov ? -1 : (v > ov ? 1 : 0);
+    }
+
+    /** @return the read ID itself, used as the hash value */
+    @Override
+    public int hash() {
+        return getInteger();
+    }
+
+    /** @return the read ID narrowed to a {@code byte} */
+    @Override
+    public byte byteValue() {
+        return (byte) getInteger();
+    }
+
+    /** @return the read ID narrowed to a {@code short} */
+    @Override
+    public short shortValue() {
+        return (short) getInteger();
+    }
+
+    /** @return the read ID as an {@code int} */
+    @Override
+    public int intValue() {
+        return getInteger();
+    }
+
+    /** @return the read ID widened to a {@code long} */
+    @Override
+    public long longValue() {
+        return getInteger();
+    }
+
+    /** @return the read ID converted to a {@code float} */
+    @Override
+    public float floatValue() {
+        return getInteger();
+    }
+
+    /** @return the read ID converted to a {@code double} */
+    @Override
+    public double doubleValue() {
+        return getInteger();
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index 84334c6..17ebde5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -99,6 +99,7 @@
for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
positionEntry.setNewReference(data, offsetPoslist + i);
if (positionEntry.getPosInRead() == 0) {
+ //TODO remove the pos of the same readID
zeroPositionCollection2.append(positionEntry);
} else {
noneZeroPositionCollection2.append(positionEntry);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
index fe9dab5..57ba91a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
@@ -87,7 +87,9 @@
throws IOException {
int len = accessor.getFieldLength(tIndex, idField);
out.writeInt(len);
- out.write(accessor.getBuffer().array(), getOffSet(accessor, tIndex, idField), len);
+ if (len > 0) {
+ out.write(accessor.getBuffer().array(), getOffSet(accessor, tIndex, idField), len);
+ }
}
@Override
@@ -126,6 +128,7 @@
try {
fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
tupleBuilder.addFieldEndOffset();
+
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index fb36898..f1ebdff 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -78,28 +78,37 @@
AggregateState state) throws HyracksDataException {
PositionArray positionArray = (PositionArray) state.state;
positionArray.reset();
- ArrayBackedValueStorage[] storages = positionArray.storages;
- int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, InputPositionListField);
+ pushIntoStorage(accessor, tIndex, positionArray);
+
+ // make fake fields
+ for (int i = 0; i < ValidPosCount; i++) {
+ tupleBuilder.addFieldEndOffset();
+ }
+ }
+
+ private void pushIntoStorage(IFrameTupleAccessor accessor, int tIndex, PositionArray positionArray)
+ throws HyracksDataException { // copies each (posInRead, poslist, kmer) entry of the position-list field into positionArray's storages
+ ArrayBackedValueStorage[] storages = positionArray.storages;
+ int leadbyte = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength(); // base added to the field start/end offsets below
+ int fieldOffset = leadbyte + accessor.getFieldStartOffset(tIndex, InputPositionListField); // cursor into the frame buffer, starting at the position-list field
ByteBuffer fieldBuffer = accessor.getBuffer();
- while (fieldOffset < accessor.getFieldEndOffset(tIndex, InputPositionListField)) {
+ while (fieldOffset < leadbyte + accessor.getFieldEndOffset(tIndex, InputPositionListField)) { // end offset is rebased by leadbyte so both sides of the comparison use the same origin
byte posInRead = fieldBuffer.get(fieldOffset);
if (storages[posInRead].getLength() > 0) {
throw new IllegalArgumentException("Reentering into an exist storage");
}
fieldOffset += BYTE_SIZE;
+
// read poslist
fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
// read Kmer
fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
- positionArray.count++;
+
+ positionArray.count += 1; // one more entry has been stored
}
- // make fake fields
- for (int i = 0; i < ValidPosCount; i++) {
- tupleBuilder.addFieldEndOffset();
- }
+
}
private int writeBytesToStorage(ArrayBackedValueStorage storage, ByteBuffer fieldBuffer, int fieldOffset)
@@ -108,7 +117,9 @@
try {
storage.getDataOutput().writeInt(lengthPosList);
fieldOffset += INTEGER_SIZE;
- storage.getDataOutput().write(fieldBuffer.array(), fieldOffset, lengthPosList);
+ if (lengthPosList > 0) {
+ storage.getDataOutput().write(fieldBuffer.array(), fieldOffset, lengthPosList);
+ }
} catch (IOException e) {
throw new HyracksDataException("Failed to write into temporary storage");
}
@@ -125,24 +136,7 @@
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
PositionArray positionArray = (PositionArray) state.state;
- ArrayBackedValueStorage[] storages = positionArray.storages;
-
- int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, InputPositionListField);
- ByteBuffer fieldBuffer = accessor.getBuffer();
-
- while (fieldOffset < accessor.getFieldEndOffset(tIndex, InputPositionListField)) {
- byte posInRead = fieldBuffer.get(fieldOffset);
- if (storages[posInRead].getLength() > 0) {
- throw new IllegalArgumentException("Reentering into an exist storage");
- }
- fieldOffset += BYTE_SIZE;
- // read poslist
- fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
- // read Kmer
- fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
- positionArray.count++;
- }
+ pushIntoStorage(accessor, tIndex, positionArray); // adds this tuple's position entries to the PositionArray held in state, without resetting it
}
@Override
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 8e45884..bc160cd 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -29,6 +29,7 @@
import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDNormarlizedComputeFactory;
import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.data.primitive.ReadIDPointable;
import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
@@ -57,8 +58,6 @@
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
-import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -173,8 +172,8 @@
.getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
- hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength, kmerSize,
- bGenerateReversedKmer));
+ hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
+ kmerSize, bGenerateReversedKmer));
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -232,16 +231,17 @@
int[] keyFields = new int[] { 0 }; // the id of grouped key
// (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(ReadIDPointable.FACTORY) },
MapKmerPositionToReadOperator.readIDOutputRec);
connectOperators(jobSpec, mapKmerToRead, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
jobSpec));
RecordDescriptor readIDFinalRec = new RecordDescriptor(
- new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
+ new ISerializerDeserializer[1 + MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
- new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(), null,
- IntegerPointable.FACTORY, AggregateReadIDAggregateFactory.readIDAggregateRec, readIDFinalRec);
+ new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
+ new ReadIDNormarlizedComputeFactory(), ReadIDPointable.FACTORY,
+ AggregateReadIDAggregateFactory.readIDAggregateRec, readIDFinalRec);
AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
connectOperators(jobSpec, sorter, ncNodeNames, readLocalAggregator, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
index d5e52fa..06fcd8e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -1,8 +1,12 @@
package edu.uci.ics.genomix.hyracks.job;
import java.io.DataOutput;
+import java.io.IOException;
import java.util.Map;
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -70,6 +74,9 @@
// TODO Auto-generated method stub
return new ITupleWriter() {
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionListWritable plist = new PositionListWritable();
+
@Override
public void open(DataOutput output) throws HyracksDataException {
// TODO Auto-generated method stub
@@ -78,8 +85,40 @@
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
- // TODO Auto-generated method stub
+ int readId = Marshal.getInt(tuple.getFieldData(0), tuple.getFieldStart(0)); // renders the tuple as one text line; field 0 is decoded as the int read ID
+ try {
+ output.write((Integer.toString(readId) + "\t").getBytes()); // line starts with "<readId>\t"
+ for (int i = 1; i < tuple.getFieldCount(); i++) { // every field after the read ID is rendered
+ int fieldOffset = tuple.getFieldStart(i);
+ while (fieldOffset < tuple.getFieldStart(i) + tuple.getFieldLength(i)) { // consume entries until the end of this field
+ byte[] buffer = tuple.getFieldData(i);
+ // poslist: a 4-byte int (handed to getCountByDataLength) followed by the position data
+ int posCount = PositionListWritable.getCountByDataLength(Marshal.getInt(
+ buffer, fieldOffset));
+ fieldOffset += 4;
+ plist.setNewReference(posCount, buffer, fieldOffset);
+ fieldOffset += plist.getLength();
+ int kmerbytes = Marshal.getInt(buffer, fieldOffset); // kmer: a 4-byte int length, then the kmer bytes
+ if (kmer.getLength() != kmerbytes) { // stored length must match the kmer built from kmerSize
+ throw new IllegalArgumentException("kmerlength is invalid");
+ }
+ fieldOffset += 4;
+ kmer.setNewReference(buffer, fieldOffset);
+ fieldOffset += kmer.getLength();
+
+ output.write(Integer.toString(i - 1).getBytes()); // entry is printed as "<i-1> <poslist> <kmer>\t"
+ output.writeByte(' ');
+ output.write(plist.toString().getBytes());
+ output.writeByte(' ');
+ output.write(kmer.toString().getBytes());
+ output.writeByte('\t');
+ }
+ }
+ output.writeByte('\n'); // one output line per tuple
+ } catch (IOException e) {
+ throw new HyracksDataException(e); // rethrown with the IOException as cause
+ }
}
@Override
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index f2cad73..c399603 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -84,8 +84,10 @@
tuple.getFieldData(MapKmerPositionToReadOperator.OutputKmerField),
tuple.getFieldStart(MapKmerPositionToReadOperator.OutputKmerField));
- String out = readID + "\t" + (int) posInRead + "\t";
- output.write(out.getBytes());
+ output.write(Integer.toString(readID).getBytes());
+ output.writeByte('\t');
+ output.write(Integer.toString(posInRead).getBytes());
+ output.writeByte('\t');
output.write(plist.toString().getBytes());
output.writeByte('\t');
output.write(kmer.toString().getBytes());
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
index e69de29..82d4692 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
@@ -0,0 +1,6 @@
+1 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
+2 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
+3 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
+4 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
+5 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
+6 0 [(1,3),(2,3),(3,3),(5,3),(6,3),(4,3)] AGAAG 1 [] GAAGA 2 [] AAGAA 3 [(6,0)] AGAAG