passed map kmer to readid operator
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
index 4d00731..1f3efbf 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
@@ -1,8 +1,8 @@
package edu.uci.ics.genomix.hyracks.data.accessors;
+import edu.uci.ics.genomix.data.Marshal;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class ReadIDNormarlizedComputeFactory implements INormalizedKeyComputerFactory{
@@ -17,7 +17,7 @@
@Override
public int normalize(byte[] bytes, int start, int length) {
- return IntegerSerializerDeserializer.getInt(bytes, start);
+ return Marshal.getInt(bytes, start);
}
};
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
index d328bd1..7229364 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
@@ -2,10 +2,10 @@
import java.nio.ByteBuffer;
+import edu.uci.ics.genomix.data.Marshal;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class ReadIDPartitionComputerFactory implements ITuplePartitionComputerFactory {
@@ -26,7 +26,7 @@
ByteBuffer buf = accessor.getBuffer();
- int hash = IntegerSerializerDeserializer.getInt(buf.array(), startOffset + fieldOffset + slotLength);
+ int hash = Marshal.getInt(buf.array(), startOffset + fieldOffset + slotLength);
if (hash < 0) {
hash = -(hash + 1);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index 09c0d2e..84334c6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -18,186 +18,160 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-public class MapKmerPositionToReadOperator extends
- AbstractSingleActivityOperatorDescriptor {
+public class MapKmerPositionToReadOperator extends AbstractSingleActivityOperatorDescriptor {
- public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec,
- RecordDescriptor recDesc) {
- super(spec, 1, 1);
- recordDescriptors[0] = recDesc;
- }
+ public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = recDesc;
+ }
- private static final long serialVersionUID = 1L;
- public static final int InputKmerField = 0;
- public static final int InputPosListField = 1;
+ private static final long serialVersionUID = 1L;
+ public static final int InputKmerField = 0;
+ public static final int InputPosListField = 1;
- public static final int OutputReadIDField = 0;
- public static final int OutputPosInReadField = 1;
- public static final int OutputOtherReadIDListField = 2;
- public static final int OutputKmerField = 3; // may not needed
+ public static final int OutputReadIDField = 0;
+ public static final int OutputPosInReadField = 1;
+ public static final int OutputOtherReadIDListField = 2;
+ public static final int OutputKmerField = 3; // may not be needed
- public static final RecordDescriptor readIDOutputRec = new RecordDescriptor(
- new ISerializerDeserializer[] { null, null, null, null });
+ public static final RecordDescriptor readIDOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
+ null, null, null });
- /**
- * Map (Kmer, {(ReadID,PosInRead),...}) into
- * (ReadID,PosInRead,{OtherReadID,...},*Kmer*) OtherReadID appears only when
- * otherReadID.otherPos==0
- */
- public class MapKmerPositionToReadNodePushable extends
- AbstractUnaryInputUnaryOutputOperatorNodePushable {
- private final IHyracksTaskContext ctx;
- private final RecordDescriptor inputRecDesc;
- private final RecordDescriptor outputRecDesc;
+ /**
+ * Map (Kmer, {(ReadID,PosInRead),...}) into
+ * (ReadID,PosInRead,{OtherReadID,...},*Kmer*). OtherReadID appears only when
+ * otherReadID.otherPos==0
+ */
+ public class MapKmerPositionToReadNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final RecordDescriptor inputRecDesc;
+ private final RecordDescriptor outputRecDesc;
- private FrameTupleAccessor accessor;
- private ByteBuffer writeBuffer;
- private ArrayTupleBuilder builder;
- private FrameTupleAppender appender;
+ private FrameTupleAccessor accessor;
+ private ByteBuffer writeBuffer;
+ private ArrayTupleBuilder builder;
+ private FrameTupleAppender appender;
- private PositionReference positionEntry;
- private ArrayBackedValueStorage posListEntry;
- private ArrayBackedValueStorage zeroPositionCollection;
- private ArrayBackedValueStorage noneZeroPositionCollection;
+ private PositionReference positionEntry;
+ private ArrayBackedValueStorage posListEntry;
+ private ArrayBackedValueStorage zeroPositionCollection;
+ private ArrayBackedValueStorage noneZeroPositionCollection;
- public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx,
- RecordDescriptor inputRecDesc, RecordDescriptor outputRecDesc) {
- this.ctx = ctx;
- this.inputRecDesc = inputRecDesc;
- this.outputRecDesc = outputRecDesc;
- this.positionEntry = new PositionReference();
- this.posListEntry = new ArrayBackedValueStorage();
- this.zeroPositionCollection = new ArrayBackedValueStorage();
- this.noneZeroPositionCollection = new ArrayBackedValueStorage();
- }
+ public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+ RecordDescriptor outputRecDesc) {
+ this.ctx = ctx;
+ this.inputRecDesc = inputRecDesc;
+ this.outputRecDesc = outputRecDesc;
+ this.positionEntry = new PositionReference();
+ this.posListEntry = new ArrayBackedValueStorage();
+ this.zeroPositionCollection = new ArrayBackedValueStorage();
+ this.noneZeroPositionCollection = new ArrayBackedValueStorage();
+ }
- @Override
- public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
- writeBuffer = ctx.allocateFrame();
- builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
- appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(writeBuffer, true);
- writer.open();
- posListEntry.reset();
- }
+ @Override
+ public void open() throws HyracksDataException {
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+ writeBuffer = ctx.allocateFrame();
+ builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(writeBuffer, true);
+ writer.open();
+ posListEntry.reset();
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- scanPosition(i, zeroPositionCollection,
- noneZeroPositionCollection);
- scanAgainToOutputTuple(i, zeroPositionCollection,
- noneZeroPositionCollection, builder);
- }
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ scanPosition(i, zeroPositionCollection, noneZeroPositionCollection);
+ scanAgainToOutputTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
+ }
+ }
- private void scanPosition(int tIndex,
- ArrayBackedValueStorage zeroPositionCollection2,
- ArrayBackedValueStorage noneZeroPositionCollection2) {
- zeroPositionCollection2.reset();
- noneZeroPositionCollection2.reset();
- byte[] data = accessor.getBuffer().array();
- int offsetPoslist = accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, InputPosListField);
- for (int i = 0; i < accessor.getFieldLength(tIndex,
- InputPosListField); i += PositionReference.LENGTH) {
- positionEntry.setNewReference(data, offsetPoslist + i);
- if (positionEntry.getPosInRead() == 0) {
- zeroPositionCollection2.append(positionEntry);
- } else {
- noneZeroPositionCollection2.append(positionEntry);
- }
- }
+ private void scanPosition(int tIndex, ArrayBackedValueStorage zeroPositionCollection2,
+ ArrayBackedValueStorage noneZeroPositionCollection2) {
+ zeroPositionCollection2.reset();
+ noneZeroPositionCollection2.reset();
+ byte[] data = accessor.getBuffer().array();
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPosListField);
+ for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+ positionEntry.setNewReference(data, offsetPoslist + i);
+ if (positionEntry.getPosInRead() == 0) {
+ zeroPositionCollection2.append(positionEntry);
+ } else {
+ noneZeroPositionCollection2.append(positionEntry);
+ }
+ }
- }
+ }
- private void scanAgainToOutputTuple(int tIndex,
- ArrayBackedValueStorage zeroPositionCollection,
- ArrayBackedValueStorage noneZeroPositionCollection,
- ArrayTupleBuilder builder2) {
- byte[] data = accessor.getBuffer().array();
- int offsetPoslist = accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, InputPosListField);
- for (int i = 0; i < accessor.getFieldLength(tIndex,
- InputPosListField); i += PositionReference.LENGTH) {
- positionEntry.setNewReference(data, offsetPoslist + i);
- if (positionEntry.getPosInRead() != 0) {
- appendNodeToBuilder(tIndex, positionEntry,
- zeroPositionCollection, builder2);
- } else {
- appendNodeToBuilder(tIndex, positionEntry,
- noneZeroPositionCollection, builder2);
- }
- }
- }
+ private void scanAgainToOutputTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
+ ArrayBackedValueStorage noneZeroPositionCollection, ArrayTupleBuilder builder2) {
+ byte[] data = accessor.getBuffer().array();
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPosListField);
+ for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+ positionEntry.setNewReference(data, offsetPoslist + i);
+ if (positionEntry.getPosInRead() != 0) {
+ appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
+ } else {
+ appendNodeToBuilder(tIndex, positionEntry, noneZeroPositionCollection, builder2);
+ }
+ }
+ }
- private void appendNodeToBuilder(int tIndex, PositionReference pos,
- ArrayBackedValueStorage posList2, ArrayTupleBuilder builder2) {
- try {
- builder2.addField(pos.getByteArray(), 0,
- PositionReference.INTBYTES);
- builder2.addField(pos.getByteArray(),
- PositionReference.INTBYTES, 1);
- // ? ask Yingyi, if support empty bytes[]
- if (posList2 == null) {
- builder2.addFieldEndOffset();
- } else {
- builder2.addField(posList2.getByteArray(),
- posList2.getStartOffset(), posList2.getLength());
- }
- // set kmer, may not useful
- byte[] data = accessor.getBuffer().array();
- int offsetKmer = accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, InputKmerField);
- builder2.addField(data, offsetKmer,
- accessor.getFieldLength(tIndex, InputKmerField));
+ private void appendNodeToBuilder(int tIndex, PositionReference pos, ArrayBackedValueStorage posList2,
+ ArrayTupleBuilder builder2) {
+ try {
+ builder2.addField(pos.getByteArray(), pos.getStartOffset(), PositionReference.INTBYTES);
+ builder2.addField(pos.getByteArray(), pos.getStartOffset() + PositionReference.INTBYTES, 1);
+ if (posList2 == null) {
+ builder2.addFieldEndOffset();
+ } else {
+ builder2.addField(posList2.getByteArray(), posList2.getStartOffset(), posList2.getLength());
+ }
+ // set kmer; may not be useful
+ byte[] data = accessor.getBuffer().array();
+ int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputKmerField);
+ builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
- if (!appender.append(builder2.getFieldEndOffsets(),
- builder2.getByteArray(), 0, builder2.getSize())) {
- FrameUtils.flushFrame(writeBuffer, writer);
- appender.reset(writeBuffer, true);
- if (!appender.append(builder2.getFieldEndOffsets(),
- builder2.getByteArray(), 0, builder2.getSize())) {
- throw new IllegalStateException();
- }
- }
- builder2.reset();
- } catch (HyracksDataException e) {
- throw new IllegalStateException(
- "Failed to Add a field to the tuple by copying the data bytes from a byte array.");
- }
- }
+ if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ appender.reset(writeBuffer, true);
+ if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ builder2.reset();
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(
+ "Failed to Add a field to the tuple by copying the data bytes from a byte array.");
+ }
+ }
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
- @Override
- public void close() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffer, writer);
- }
- writer.close();
- }
+ @Override
+ public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ writer.close();
+ }
- }
+ }
- @Override
- public AbstractOperatorNodePushable createPushRuntime(
- IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) {
- return new MapKmerPositionToReadNodePushable(
- ctx,
- recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
- recordDescriptors[0]);
- }
+ @Override
+ public AbstractOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new MapKmerPositionToReadNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(),
+ 0), recordDescriptors[0]);
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index b36b82d..37d5289 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -47,14 +47,16 @@
private KmerBytesWritable kmer;
private PositionReference pos;
private boolean bReversed;
+ private final int readLength;
public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
null });
- public ReadsKeyValueParserFactory(int k, boolean bGenerateReversed) {
+ public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
bReversed = bGenerateReversed;
kmer = new KmerBytesWritable(k);
pos = new PositionReference();
+ this.readLength = readlength;
}
@Override
@@ -76,7 +78,7 @@
try {
readID = Integer.parseInt(geneLine[0]);
} catch (NumberFormatException e) {
- LOG.warn("Invalid data");
+ LOG.warn("Invalid data " );
return;
}
@@ -84,6 +86,10 @@
Matcher geneMatcher = genePattern.matcher(geneLine[1]);
boolean isValid = geneMatcher.matches();
if (isValid) {
+ if (geneLine[1].length() != readLength){
+ LOG.warn("Invalid readlength at: " + readID );
+ return;
+ }
SplitReads(readID, geneLine[1].getBytes(), writer);
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
index 4006fb4..6ee7946 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -47,7 +47,6 @@
@Override
public AggregateState createAggregateStates() {
- System.out.println("CreateState");
return new AggregateState(new ArrayBackedValueStorage());
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
index c7552ca..fe9dab5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
@@ -7,6 +7,7 @@
import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -21,6 +22,7 @@
*
*/
private static final long serialVersionUID = 1L;
+
public static final int InputReadIDField = MapKmerPositionToReadOperator.OutputReadIDField;
public static final int InputPosInReadField = MapKmerPositionToReadOperator.OutputPosInReadField;
public static final int InputPositionListField = MapKmerPositionToReadOperator.OutputOtherReadIDListField;
@@ -29,6 +31,9 @@
public static final int OutputReadIDField = 0;
public static final int OutputPositionListField = 1;
+ public static final RecordDescriptor readIDAggregateRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ null, null });
+
public AggregateReadIDAggregateFactory() {
}
@@ -65,7 +70,7 @@
ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
storage.reset();
DataOutput out = storage.getDataOutput();
- byte posInRead = readByteField(accessor, tIndex, InputPositionListField);
+ byte posInRead = readByteField(accessor, tIndex, InputPosInReadField);
try {
out.writeByte(posInRead);
@@ -74,7 +79,8 @@
} catch (IOException e) {
throw new HyracksDataException("Failed to write into temporary storage");
}
-
+ // make fake field
+ tupleBuilder.addFieldEndOffset();
}
private void writeBytesToStorage(DataOutput out, IFrameTupleAccessor accessor, int tIndex, int idField)
@@ -95,7 +101,7 @@
int stateTupleIndex, AggregateState state) throws HyracksDataException {
ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
DataOutput out = storage.getDataOutput();
- byte posInRead = readByteField(accessor, tIndex, InputPositionListField);
+ byte posInRead = readByteField(accessor, tIndex, InputPosInReadField);
try {
out.writeByte(posInRead);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index d3971fb..fb36898 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -14,57 +14,55 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class MergeReadIDAggregateFactory implements
- IAggregatorDescriptorFactory {
+public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private final int ValidPosCount;
+ private final int ValidPosCount;
- public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
- ValidPosCount = getPositionCount(readLength, kmerLength);
- }
+ public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
+ ValidPosCount = getPositionCount(readLength, kmerLength);
+ }
- public static int getPositionCount(int readLength, int kmerLength) {
- return readLength - kmerLength + 1;
- }
+ public static int getPositionCount(int readLength, int kmerLength) {
+ return readLength - kmerLength + 1;
+ }
- public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
- public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
+ public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
+ public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
- public static final int BYTE_SIZE = 1;
- public static final int INTEGER_SIZE = 4;
+ public static final int BYTE_SIZE = 1;
+ public static final int INTEGER_SIZE = 4;
- /**
- * (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...} to Aggregate as
- * (ReadID, Storage[posInRead]={PositionList,Kmer})
- *
- */
- @Override
+ /**
+ * (ReadID, {(PosInRead,{OtherPosition..},Kmer) ...} to Aggregate as
+ * (ReadID, Storage[posInRead]={PositionList,Kmer})
+ */
+ @Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
return new IAggregatorDescriptor() {
-
- class PositionArray{
- public ArrayBackedValueStorage[] storages;
- public int count;
-
- public PositionArray(ArrayBackedValueStorage[] storages2, int i){
- storages = storages2;
- count = i;
- }
-
- public void reset(){
- for (ArrayBackedValueStorage each : storages) {
+
+ class PositionArray {
+ public ArrayBackedValueStorage[] storages;
+ public int count;
+
+ public PositionArray(ArrayBackedValueStorage[] storages2, int i) {
+ storages = storages2;
+ count = i;
+ }
+
+ public void reset() {
+ for (ArrayBackedValueStorage each : storages) {
each.reset();
}
count = 0;
- }
- }
+ }
+ }
@Override
public AggregateState createAggregateStates() {
@@ -72,7 +70,7 @@
for (int i = 0; i < storages.length; i++) {
storages[i] = new ArrayBackedValueStorage();
}
- return new AggregateState(new PositionArray(storages,0));
+ return new AggregateState(new PositionArray(storages, 0));
}
@Override
@@ -98,6 +96,10 @@
fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
positionArray.count++;
}
+ // make fake fields
+ for (int i = 0; i < ValidPosCount; i++) {
+ tupleBuilder.addFieldEndOffset();
+ }
}
private int writeBytesToStorage(ArrayBackedValueStorage storage, ByteBuffer fieldBuffer, int fieldOffset)
@@ -152,7 +154,7 @@
@Override
public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- PositionArray positionArray = (PositionArray) state.state;
+ PositionArray positionArray = (PositionArray) state.state;
ArrayBackedValueStorage[] storages = positionArray.storages;
if (positionArray.count != storages.length) {
throw new IllegalStateException("Final aggregate position number is invalid");
@@ -160,7 +162,8 @@
DataOutput fieldOutput = tupleBuilder.getDataOutput();
try {
for (int i = 0; i < storages.length; i++) {
- fieldOutput.write(storages[i].getByteArray(), storages[i].getStartOffset(), storages[i].getLength());
+ fieldOutput.write(storages[i].getByteArray(), storages[i].getStartOffset(),
+ storages[i].getLength());
tupleBuilder.addFieldEndOffset();
}
} catch (IOException e) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java
deleted file mode 100644
index 32e7b4d..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package edu.uci.ics.genomix.hyracks.dataflow.io;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-
-public class MapperKmerToReadIDTextWriterFactory implements ITupleWriterFactory {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public MapperKmerToReadIDTextWriterFactory(int kmerSize) {
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java
deleted file mode 100644
index 28b8484..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package edu.uci.ics.genomix.hyracks.dataflow.io;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-
-public class ReadIDAggregationTextWriterFactory implements ITupleWriterFactory {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public ReadIDAggregationTextWriterFactory(int kmerSize) {
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index aa0c05d..8e45884 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -173,7 +173,7 @@
.getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
- hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(kmerSize,
+ hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength, kmerSize,
bGenerateReversedKmer));
} catch (Exception e) {
throw new HyracksDataException(e);
@@ -232,16 +232,16 @@
int[] keyFields = new int[] { 0 }; // the id of grouped key
// (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- ReadsKeyValueParserFactory.readKmerOutputRec);
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ MapKmerPositionToReadOperator.readIDOutputRec);
connectOperators(jobSpec, mapKmerToRead, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
jobSpec));
- RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+
RecordDescriptor readIDFinalRec = new RecordDescriptor(
new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
- new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
- new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, readIDCombineRec, readIDFinalRec);
+ new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(), null,
+ IntegerPointable.FACTORY, AggregateReadIDAggregateFactory.readIDAggregateRec, readIDFinalRec);
AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
connectOperators(jobSpec, sorter, ncNodeNames, readLocalAggregator, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
@@ -327,9 +327,6 @@
logDebug("Group by Read Operator");
lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
- logDebug("Map ReadInfo to Node");
- lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
-
logDebug("Write node to result");
lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
@@ -352,9 +349,9 @@
bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
- if (type.equalsIgnoreCase("external")) {
+ if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
groupbyType = GroupbyType.EXTERNAL;
- } else if (type.equalsIgnoreCase("precluster")) {
+ } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
groupbyType = GroupbyType.PRECLUSTER;
} else {
groupbyType = GroupbyType.HYBRIDHASH;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
index 8a11379..d5e52fa 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -1,14 +1,17 @@
package edu.uci.ics.genomix.hyracks.job;
+import java.io.DataOutput;
import java.util.Map;
-import edu.uci.ics.genomix.hyracks.dataflow.io.ReadIDAggregationTextWriterFactory;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
@@ -46,22 +49,52 @@
logDebug("Group by Read Operator");
lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
- logDebug("Map ReadInfo to Node");
- lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
-
logDebug("Write node to result");
- generateRootByWriteReadIDAggregationResult(jobSpec, lastOperator);
-
+ lastOperator = generateRootByWriteReadIDAggregationResult(jobSpec, lastOperator);
+ jobSpec.addRoot(lastOperator);
return jobSpec;
}
public AbstractOperatorDescriptor generateRootByWriteReadIDAggregationResult(JobSpecification jobSpec,
AbstractOperatorDescriptor readCrossAggregator) throws HyracksException {
- ITupleWriterFactory readWriter = new ReadIDAggregationTextWriterFactory(kmerSize);
- HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, hadoopJobConfFactory.getConf(), readWriter);
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ // Placeholder: the returned writer ignores every callback, so nothing reaches the output.
+ return new ITupleWriter() {
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ // No-op.
+
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ // TODO: not implemented - the tuple is discarded without being written to output.
+
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ // No-op.
+
+ }
+
+ };
+ }
+
+ });
connectOperators(jobSpec, readCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
- jobSpec.addRoot(writeKmerOperator);
+
return writeKmerOperator;
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index 4fa554f..f2cad73 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -1,14 +1,23 @@
package edu.uci.ics.genomix.hyracks.job;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.Map;
-import edu.uci.ics.genomix.hyracks.dataflow.io.MapperKmerToReadIDTextWriterFactory;
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
@@ -26,13 +35,79 @@
super(job, scheduler, ncMap, numPartitionPerMachine);
// TODO Auto-generated constructor stub
}
-
- public AbstractOperatorDescriptor generateRootByWriteMapperFromKmerToReadID(JobSpecification jobSpec, AbstractOperatorDescriptor mapper) throws HyracksException{
+
+ public AbstractOperatorDescriptor generateRootByWriteMapperFromKmerToReadID(JobSpecification jobSpec,
+ AbstractOperatorDescriptor mapper) throws HyracksException {
// Output Kmer
- ITupleWriterFactory kmerWriter = new MapperKmerToReadIDTextWriterFactory(kmerSize);
- HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, hadoopJobConfFactory.getConf(), kmerWriter);
- connectOperators(jobSpec, mapper, ncNodeNames, writeKmerOperator, ncNodeNames,
- new OneToOneConnectorDescriptor(jobSpec));
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new ITupleWriter() {
+ // Text writer: emits one "readID \t posInRead \t positionList \t kmer" line per tuple.
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionListWritable plist = new PositionListWritable();
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ // No header is written before the first tuple.
+
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ try {
+ int readID = Marshal.getInt(
+ tuple.getFieldData(MapKmerPositionToReadOperator.OutputReadIDField),
+ tuple.getFieldStart(MapKmerPositionToReadOperator.OutputReadIDField));
+ byte posInRead = tuple
+ .getFieldData(MapKmerPositionToReadOperator.OutputPosInReadField)[tuple
+ .getFieldStart(MapKmerPositionToReadOperator.OutputPosInReadField)];
+ int posCount = PositionListWritable.getCountByDataLength(tuple
+ .getFieldLength(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
+ plist.setNewReference(
+ posCount,
+ tuple.getFieldData(MapKmerPositionToReadOperator.OutputOtherReadIDListField),
+ tuple.getFieldStart(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
+ // Validate the very field that is bound below, not another operator's field index.
+ if (kmer.getLength() > tuple
+ .getFieldLength(MapKmerPositionToReadOperator.OutputKmerField)) {
+ throw new IllegalArgumentException("Not enough kmer bytes");
+ }
+ kmer.setNewReference(
+ tuple.getFieldData(MapKmerPositionToReadOperator.OutputKmerField),
+ tuple.getFieldStart(MapKmerPositionToReadOperator.OutputKmerField));
+
+ String out = readID + "\t" + (int) posInRead + "\t";
+ output.write(out.getBytes());
+ output.write(plist.toString().getBytes());
+ output.writeByte('\t');
+ output.write(kmer.toString().getBytes());
+ output.writeByte('\n');
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ // Nothing to flush: write() sends every tuple straight to output.
+
+ }
+
+ };
+ }
+
+ });
+ connectOperators(jobSpec, mapper, ncNodeNames, writeKmerOperator, ncNodeNames, new OneToOneConnectorDescriptor(
+ jobSpec));
jobSpec.addRoot(writeKmerOperator);
return writeKmerOperator;
}
@@ -51,7 +126,7 @@
lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
generateRootByWriteMapperFromKmerToReadID(jobSpec, lastOperator);
-
+
return jobSpec;
}
}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 6cb2491..631fc5b 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -31,6 +31,7 @@
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
private static final int KmerSize = 5;
+ private static final int ReadLength = 8;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
@@ -41,6 +42,8 @@
private static final String EXPECTED_DIR = "src/test/resources/expected/";
private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
private static final String EXPECTED_OUPUT_KMER = EXPECTED_DIR + "result_after_kmerAggregate";
+ private static final String EXPECTED_KMER_TO_READID = EXPECTED_DIR + "result_after_kmer2readId";
+ private static final String EXPECTED_GROUPBYREADID = EXPECTED_DIR + "result_after_readIDAggreage";
private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
@@ -56,36 +59,40 @@
@Test
public void TestAll() throws Exception {
//TestReader();
- TestGroupbyKmer();
- // TestMapKmerToRead();
- // TestGroupByReadID();
+ //TestGroupbyKmer();
+ //TestMapKmerToRead();
+ TestGroupByReadID();
// TestEndToEnd();
}
public void TestReader() throws Exception {
+ cleanUpReEntry();
conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
- Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT, false));
+ Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT, -1));
}
public void TestGroupbyKmer() throws Exception {
+ cleanUpReEntry();
conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_KMERHASHTABLE, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, true));
+ Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, 1));
}
public void TestMapKmerToRead() throws Exception {
+ cleanUpReEntry();
conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_MAP_KMER_TO_READ, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER,false));
+ Assert.assertEquals(true, checkResults(EXPECTED_KMER_TO_READID, 2));
}
public void TestGroupByReadID() throws Exception {
+ cleanUpReEntry();
conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
- conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER,false));
+ Assert.assertEquals(true, checkResults(EXPECTED_GROUPBYREADID, -1));
}
public void TestEndToEnd() throws Exception {
@@ -93,11 +100,11 @@
cleanUpReEntry();
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER,false));
+ Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, -1));
cleanUpReEntry();
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER,false));
+ Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, -1));
}
@Before
@@ -112,6 +119,7 @@
FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
conf.setInt(GenomixJobConf.KMER_LENGTH, KmerSize);
+ conf.setInt(GenomixJobConf.READ_LENGTH, ReadLength);
driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
}
@@ -156,7 +164,7 @@
}
}
- private boolean checkResults(String expectedPath, boolean checkPos) throws Exception {
+ private boolean checkResults(String expectedPath, int poslistField) throws Exception {
File dumped = null;
String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
@@ -202,8 +210,8 @@
dumped = new File(CONVERT_RESULT);
}
- if (checkPos) {
- TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped);
+ if (poslistField > 0) {
+ TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
} else {
TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
index e97df1b..49e21ba 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
@@ -61,7 +61,8 @@
}
}
- public static void compareWithUnSortedPosition(File expectedFile, File actualFile) throws Exception {
+ public static void compareWithUnSortedPosition(File expectedFile, File actualFile, int poslistField)
+ throws Exception {
BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
ArrayList<String> actualLines = new ArrayList<String>();
@@ -77,7 +78,7 @@
if (lineExpected == null) {
throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
}
- if (!containStrings(lineExpected, actualLine)) {
+ if (!containStrings(lineExpected, actualLine, poslistField)) {
throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
+ actualLine);
}
@@ -162,24 +163,32 @@
return true;
}
- private static boolean containStrings(String lineExpected, String actualLine) {
- String keyExp = lineExpected.split("\\\t")[0];
- String keyAct = actualLine.split("\\\t")[0];
- if (!keyAct.equals(keyExp)) {
+ private static boolean containStrings(String lineExpected, String actualLine, int poslistField) {
+ String[] fieldsExp = lineExpected.split("\\\t");
+ String[] fieldsAct = actualLine.split("\\\t");
+ if (fieldsAct.length != fieldsExp.length) {
return false;
}
+ for (int i = 0; i < fieldsAct.length; i++) {
+ if (i == poslistField) {
+ continue;
+ }
+ if (!fieldsAct[i].equals(fieldsExp[i])) {
+ return false;
+ }
+ }
ArrayList<String> posExp = new ArrayList<String>();
ArrayList<String> posAct = new ArrayList<String>();
- String valueExp = lineExpected.split("\\\t")[1];
+ String valueExp = lineExpected.split("\\\t")[poslistField];
String[] valuesExp = valueExp.substring(1, valueExp.length() - 1).split(",");
for (String str : valuesExp) {
posExp.add(str);
}
- String valueAct = actualLine.split("\\\t")[1];
+ String valueAct = actualLine.split("\\\t")[poslistField];
String[] valuesAct = valueAct.substring(1, valueAct.length() - 1).split(",");
for (String str : valuesAct) {
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
index 08f0f95..13190dd 100755
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
@@ -2,4 +2,5 @@
2 AATAGAAG
3 AATAGAAG
4 AATAGAAG
-5 AATAGAAG
\ No newline at end of file
+5 AATAGAAG
+6 AGAAGAAG
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
index 728c093..1091d2e 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
@@ -1,3 +1,4 @@
+AAGAA (6,2)
AATAG (1,0)
AATAG (2,0)
AATAG (3,0)
@@ -8,13 +9,16 @@
AGAAG (3,3)
AGAAG (4,3)
AGAAG (5,3)
+AGAAG (6,0)
+AGAAG (6,3)
ATAGA (1,1)
ATAGA (2,1)
ATAGA (3,1)
ATAGA (4,1)
ATAGA (5,1)
+GAAGA (6,1)
TAGAA (1,2)
TAGAA (2,2)
TAGAA (3,2)
TAGAA (4,2)
-TAGAA (5,2)
\ No newline at end of file
+TAGAA (5,2)
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
new file mode 100644
index 0000000..0ca9de6
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
@@ -0,0 +1,24 @@
+1 0 [] AATAG
+1 1 [] ATAGA
+1 2 [] TAGAA
+1 3 [(6,0)] AGAAG
+2 0 [] AATAG
+2 1 [] ATAGA
+2 2 [] TAGAA
+2 3 [(6,0)] AGAAG
+3 0 [] AATAG
+3 1 [] ATAGA
+3 2 [] TAGAA
+3 3 [(6,0)] AGAAG
+4 0 [] AATAG
+4 1 [] ATAGA
+4 2 [] TAGAA
+4 3 [(6,0)] AGAAG
+5 0 [] AATAG
+5 1 [] ATAGA
+5 2 [] TAGAA
+5 3 [(6,0)] AGAAG
+6 0 [(1,3),(2,3),(3,3),(4,3),(5,3),(6,3)] AGAAG
+6 1 [] GAAGA
+6 2 [] AAGAA
+6 3 [(6,0)] AGAAG
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
index d5624d7..499200a 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
@@ -1,4 +1,6 @@
+AAGAA [(6,2)]
AATAG [(1,0),(2,0),(3,0),(4,0),(5,0)]
-AGAAG [(1,3),(2,3),(3,3),(4,3),(5,3)]
+AGAAG [(1,3),(2,3),(3,3),(4,3),(5,3),(6,0),(6,3)]
ATAGA [(1,1),(2,1),(3,1),(4,1),(5,1)]
-TAGAA [(1,2),(2,2),(3,2),(4,2),(5,2)]
+GAAGA [(6,1)]
+TAGAA [(1,2),(2,2),(3,2),(4,2),(5,2)]
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage