finish design of NodeReference API
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
new file mode 100644
index 0000000..42638a9
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -0,0 +1,74 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+
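+/**
+ * Mutable holder for one de Bruijn graph node: its ID (readID, posInRead),
+ * the number of kmers folded into it, and its incoming/outgoing position
+ * lists. A minimal usage sketch (IDs and positions are illustrative only):
+ *
+ * <pre>
+ * NodeReference node = new NodeReference();
+ * node.setNodeID(42, (byte) 0); // node starts at position 0 of read 42
+ * node.setCount(1); // one kmer so far
+ * node.getOutgoingList().append(43, (byte) 5); // edge to position 5 of read 43
+ * node.reset(); // clear ID, count and both lists for reuse
+ * </pre>
+ */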
+public class NodeReference {
+ private PositionReference nodeID;
+ private int countOfKmer;
+ private PositionListReference incomingList;
+ private PositionListReference outgoingList;
+
+ public NodeReference() {
+ nodeID = new PositionReference();
+ countOfKmer = 0;
+ incomingList = new PositionListReference();
+ outgoingList = new PositionListReference();
+ }
+
+ public int getCount() {
+ return countOfKmer;
+ }
+
+ public void setCount(int count) {
+ this.countOfKmer = count;
+ }
+
+ public void setNodeID(PositionReference ref) {
+ this.setNodeID(ref.getReadID(), ref.getPosInRead());
+ }
+
+ public void setNodeID(int readID, byte posInRead) {
+ nodeID.set(readID, posInRead);
+ }
+
+ public void setIncomingList(PositionListReference incoming) {
+ incomingList.set(incoming);
+ }
+
+ public void setOutgoingList(PositionListReference outgoing) {
+ outgoingList.set(outgoing);
+ }
+
+ public void reset() {
+ nodeID.set(0, (byte) 0);
+ incomingList.reset();
+ outgoingList.reset();
+ countOfKmer = 0;
+ }
+
+ public PositionListReference getIncomingList() {
+ return incomingList;
+ }
+
+ public PositionListReference getOutgoingList() {
+ return outgoingList;
+ }
+
+ public PositionReference getNodeID() {
+ return nodeID;
+ }
+
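+ /**
+ * Folds the next node of the same read into this one: kmer counts are
+ * summed and the next node's outgoing positions are appended to this
+ * node's outgoing list. The node ID and incoming list are left untouched,
+ * so the merge is only valid while both nodes belong to one read.
+ */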
+ public void mergeNextWithinOneRead(NodeReference nextNodeEntry) {
+ this.countOfKmer += nextNodeEntry.countOfKmer;
+ for (PositionReference pos : nextNodeEntry.getOutgoingList()) {
+ this.outgoingList.append(pos);
+ }
+ }
+
+ public void set(NodeReference node) {
+ this.nodeID.set(node.getNodeID().getReadID(), node.getNodeID().getPosInRead());
+ this.countOfKmer = node.countOfKmer;
+ this.incomingList.set(node.getIncomingList());
+ this.outgoingList.set(node.getOutgoingList());
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
index 0fa1489..895c644 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -3,24 +3,37 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Iterator;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.data.std.api.IValueReference;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-public class PositionListReference implements Writable, IValueReference {
- private byte[] values;
+public class PositionListReference implements Writable, Iterable<PositionReference>, IValueReference {
+ private byte[] storage;
+ private int offset;
private int valueCount;
private static final byte[] EMPTY = {};
private PositionReference posIter = new PositionReference();
public PositionListReference() {
- this.values = EMPTY;
+ this.storage = EMPTY;
this.valueCount = 0;
+ this.offset = 0;
+ }
+
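+ /**
+ * Wraps an existing byte range without copying. A later append() or set()
+ * may still grow the list via setCapacity(), which copies the wrapped
+ * bytes into a fresh array and resets the offset to 0.
+ */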
+ public PositionListReference(int count, byte[] data, int offset) {
+ setNewReference(count, data, offset);
+ }
+
+ public void setNewReference(int count, byte[] data, int offset) {
+ this.valueCount = count;
+ this.storage = data;
+ this.offset = offset;
+ }
+
protected void setSize(int size) {
if (size > getCapacity()) {
setCapacity((size * 3 / 2));
@@ -28,16 +41,17 @@
}
protected int getCapacity() {
- return values.length;
+ return storage.length - offset;
}
protected void setCapacity(int new_cap) {
if (new_cap > getCapacity()) {
byte[] new_data = new byte[new_cap];
- if (values.length > 0) {
- System.arraycopy(values, 0, new_data, 0, values.length);
+ if (storage.length - offset > 0) {
+ System.arraycopy(storage, offset, new_data, 0, storage.length-offset);
}
- values = new_data;
+ storage = new_data;
+ offset = 0;
}
}
@@ -45,32 +59,56 @@
public void readFields(DataInput in) throws IOException {
this.valueCount = in.readInt();
setSize(valueCount * PositionReference.LENGTH);
- in.readFully(values, 0, valueCount * PositionReference.LENGTH);
+ in.readFully(storage, offset, valueCount * PositionReference.LENGTH);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(valueCount);
- out.write(values, 0, valueCount * PositionReference.LENGTH);
+ out.write(storage, offset, valueCount * PositionReference.LENGTH);
}
public PositionReference getPosition(int i) {
if (i >= valueCount) {
- throw new ArrayIndexOutOfBoundsException("Not so much positions");
+ throw new ArrayIndexOutOfBoundsException("No such position: " + i);
}
- posIter.setNewSpace(values, i * PositionReference.LENGTH);
+ posIter.setNewReference(storage, offset + i * PositionReference.LENGTH);
return posIter;
}
+
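+ /**
+ * In-place iteration: next() repositions and returns the single shared
+ * posIter instance, so callers must copy each position out (e.g. via
+ * append on another list) before advancing.
+ */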
+ @Override
+ public Iterator<PositionReference> iterator() {
+ Iterator<PositionReference> it = new Iterator<PositionReference>() {
+
+ private int currentIndex = 0;
+
+ @Override
+ public boolean hasNext() {
+ return currentIndex < valueCount;
+ }
+
+ @Override
+ public PositionReference next() {
+ return getPosition(currentIndex++); // advance, or hasNext() never turns false
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove is not supported");
+ }
+ };
+ return it;
+ }
public void set(PositionListReference list2) {
- set(list2.valueCount, list2.values, 0);
+ set(list2.valueCount, list2.storage, list2.offset);
}
public void set(int valueCount, byte[] newData, int offset) {
this.valueCount = valueCount;
setSize(valueCount * PositionReference.LENGTH);
if (valueCount > 0) {
- System.arraycopy(newData, offset, values, 0, valueCount * PositionReference.LENGTH);
+ System.arraycopy(newData, offset, storage, this.offset, valueCount * PositionReference.LENGTH);
}
}
@@ -80,35 +118,35 @@
public void append(PositionReference pos) {
setSize((1 + valueCount) * PositionReference.LENGTH);
- System.arraycopy(pos.getByteArray(), pos.getStartOffset(), values, valueCount * PositionReference.LENGTH,
+ System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount * PositionReference.LENGTH,
pos.getLength());
valueCount += 1;
}
public void append(int readID, byte posInRead) {
setSize((1 + valueCount) * PositionReference.LENGTH);
- IntegerSerializerDeserializer.putInt(readID, values, valueCount * PositionReference.LENGTH);
- values[valueCount * PositionReference.LENGTH + PositionReference.INTBYTES] = posInRead;
+ IntegerSerializerDeserializer.putInt(readID, storage, offset + valueCount * PositionReference.LENGTH);
+ storage[offset + valueCount * PositionReference.LENGTH + PositionReference.INTBYTES] = posInRead;
valueCount += 1;
}
- public int getPositionCount() {
+ public int getCountOfPosition() {
return valueCount;
}
@Override
public byte[] getByteArray() {
- return values;
+ return storage;
}
@Override
public int getStartOffset() {
- return 0;
+ return offset;
}
@Override
public int getLength() {
- return valueCount * PositionReference.LENGTH;
+ return valueCount * PositionReference.LENGTH;
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
index b3feaa5..100b74d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -21,13 +21,12 @@
}
public PositionReference(byte[] storage, int offset) {
- setNewSpace(storage, offset);
+ setNewReference(storage, offset);
}
public PositionReference(int readID, byte posInRead) {
this();
- IntegerSerializerDeserializer.putInt(readID, storage, offset);
- storage[offset + INTBYTES] = posInRead;
+ set(readID, posInRead);
}
public void set(int readID, byte posInRead) {
@@ -35,7 +34,7 @@
storage[offset + INTBYTES] = posInRead;
}
- public void setNewSpace(byte[] storage, int offset) {
+ public void setNewReference(byte[] storage, int offset) {
this.storage = storage;
this.offset = offset;
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index 924f455..fe72a81 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -25,9 +25,16 @@
}
private static final long serialVersionUID = 1L;
+ public static final int InputKmerField = 0;
+ public static final int InputPosListField = 1;
+
+ public static final int OutputReadIDField = 0;
+ public static final int OutputPosInReadField = 1;
+ public static final int OutputOtherReadIDListField = 2;
+ public static final int OutputKmerField = 3; // may not be needed
/**
- * Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,*Kmer*,{OtherReadID,...})
+ * Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherReadID,...},*Kmer*)
* OtherReadID appears only when otherReadID.otherPos==0
*/
public class MapKmerPositionToReadNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -40,14 +47,6 @@
private ArrayTupleBuilder builder;
private FrameTupleAppender appender;
- public static final int InputKmerField = 0;
- public static final int InputPosListField = 1;
-
- public static final int OutputReadIDField = 0;
- public static final int OutputPosInReadField = 1;
- public static final int OutputOtherReadIDListField = 2;
- public static final int OutputKmerField = 3; // may not needed
-
private PositionReference positionEntry;
private ArrayBackedValueStorage posListEntry;
private ArrayBackedValueStorage zeroPositionCollection;
@@ -81,7 +80,7 @@
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
scanPosition(i, zeroPositionCollection, noneZeroPositionCollection);
- writeTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
+ scanAgainToOutputTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
}
}
@@ -90,12 +89,10 @@
zeroPositionCollection2.reset();
noneZeroPositionCollection2.reset();
byte[] data = accessor.getBuffer().array();
- //Kmer, {(ReadID,PosInRead),...}
- // to ReadID, PosInRead, Kmer, {OtherReadID}
int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ accessor.getFieldStartOffset(tIndex, InputPosListField);
for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
- positionEntry.setNewSpace(data, offsetPoslist + i);
+ positionEntry.setNewReference(data, offsetPoslist + i);
if (positionEntry.getPosInRead() == 0) {
zeroPositionCollection2.append(positionEntry);
} else {
@@ -105,13 +102,13 @@
}
- private void writeTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
+ private void scanAgainToOutputTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
ArrayBackedValueStorage noneZeroPositionCollection, ArrayTupleBuilder builder2) {
byte[] data = accessor.getBuffer().array();
int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ accessor.getFieldStartOffset(tIndex, InputPosListField);
for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
- positionEntry.setNewSpace(data, offsetPoslist + i);
+ positionEntry.setNewReference(data, offsetPoslist + i);
if (positionEntry.getPosInRead() != 0) {
appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
} else {
@@ -136,7 +133,7 @@
int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ accessor.getFieldStartOffset(tIndex, InputKmerField);
builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
-
+
if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
FrameUtils.flushFrame(writeBuffer, writer);
appender.reset(writeBuffer, true);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
new file mode 100644
index 0000000..4921da1
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -0,0 +1,142 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.NodeReference;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class MapReadToNodeOperator extends AbstractSingleActivityOperatorDescriptor {
+
+ public MapReadToNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = outRecDesc;
+ }
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public static final int InputReadIDField = 0;
+ public static final int InputInfoFieldStart = 1;
+
+ public static final int OutputNodeIDField = 0;
+ public static final int OutputCountOfKmerField = 1;
+ public static final int OutputIncomingField = 2;
+ public static final int OutputOutgoingField = 3;
+ public static final int OutputKmerBytesField = 4;
+
+ /**
+ * (ReadID, Storage[posInRead]={PositionList,Kmer})
+ * to Position, LengthCount, InComingPosList, OutgoingPosList, Kmer
+ */
+ public class MapReadToNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final RecordDescriptor inputRecDesc;
+ private final RecordDescriptor outputRecDesc;
+
+ private FrameTupleAccessor accessor;
+ private ByteBuffer writeBuffer;
+ private ArrayTupleBuilder builder;
+ private FrameTupleAppender appender;
+
+ private NodeReference curNodeEntry;
+ private NodeReference nextNodeEntry;
+
+ public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+ RecordDescriptor outputRecDesc) {
+ this.ctx = ctx;
+ this.inputRecDesc = inputRecDesc;
+ this.outputRecDesc = outputRecDesc;
+ curNodeEntry = new NodeReference();
+ nextNodeEntry = new NodeReference();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+ writeBuffer = ctx.allocateFrame();
+ builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(writeBuffer, true);
+ writer.open();
+ curNodeEntry.reset();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ generateNodeFromRead(i);
+ }
+ }
+
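+ /**
+ * Walks the per-position info fields of one read: while the next
+ * position's node has an empty outgoing list, it is merged into the
+ * current node (both lie on one simple path inside the read); otherwise
+ * the current node is linked to it, emitted, and the walk continues from
+ * that node. The trailing node is emitted after the loop. Since
+ * outputNode/setNodeRef/resetNode are still TODO stubs, this method only
+ * sketches the control flow of the design.
+ */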
+ private void generateNodeFromRead(int tIndex) {
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ resetNode(curNodeEntry, offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart));
+
+ for (int i = InputInfoFieldStart + 1; i < accessor.getFieldCount(); i++) {
+ setNodeRef(nextNodeEntry, offsetPoslist + accessor.getFieldStartOffset(tIndex, i));
+ if (nextNodeEntry.getOutgoingList().getCountOfPosition() == 0) {
+ curNodeEntry.mergeNextWithinOneRead(nextNodeEntry);
+ } else {
+ curNodeEntry.setOutgoingList(nextNodeEntry.getOutgoingList());
+ curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
+ outputNode(curNodeEntry);
+ nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
+ curNodeEntry.set(nextNodeEntry);
+ }
+ }
+ outputNode(curNodeEntry);
+ }
+
+ private void outputNode(NodeReference node) {
+ // TODO Auto-generated method stub
+
+ }
+
+ private void setNodeRef(NodeReference node, int i) {
+ // TODO Auto-generated method stub
+
+ }
+
+ private void resetNode(NodeReference node, int i) {
+
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ writer.close();
+ }
+
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new MapReadToNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
+ recordDescriptors[0]);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
index cb9db16..c7552ca 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
@@ -1,76 +1,136 @@
package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class AggregateReadIDAggregateFactory implements IAggregatorDescriptorFactory{
+public class AggregateReadIDAggregateFactory implements IAggregatorDescriptorFactory {
/**
*
*/
private static final long serialVersionUID = 1L;
+ public static final int InputReadIDField = MapKmerPositionToReadOperator.OutputReadIDField;
+ public static final int InputPosInReadField = MapKmerPositionToReadOperator.OutputPosInReadField;
+ public static final int InputPositionListField = MapKmerPositionToReadOperator.OutputOtherReadIDListField;
+ public static final int InputKmerField = MapKmerPositionToReadOperator.OutputKmerField;
+ public static final int OutputReadIDField = 0;
+ public static final int OutputPositionListField = 1;
+
+ public AggregateReadIDAggregateFactory() {
+ }
+
+ /**
+ * (ReadID,PosInRead,{OtherPosition,...},Kmer) to
+ * (ReadID, {(PosInRead, {OtherPosition...}, Kmer), ...})
+ */
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
- // TODO Auto-generated method stub
- return new IAggregatorDescriptor(){
+ return new IAggregatorDescriptor() {
+
+ protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ return offset;
+ }
+
+ protected byte readByteField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ return ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
+ getOffSet(accessor, tIndex, fieldId));
+ }
@Override
public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return null;
+ return new AggregateState(new ArrayBackedValueStorage());
}
@Override
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
-
+ ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
+ storage.reset();
+ DataOutput out = storage.getDataOutput();
+ byte posInRead = readByteField(accessor, tIndex, InputPosInReadField);
+
+ try {
+ out.writeByte(posInRead);
+ writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
+ writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write into temporary storage");
+ }
+
+ }
+
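+ // Serialized layout of the aggregate state, one record per aggregated tuple:
+ // [posInRead: 1 byte][posListLen: int][posList bytes][kmerLen: int][kmer bytes]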
+ private void writeBytesToStorage(DataOutput out, IFrameTupleAccessor accessor, int tIndex, int idField)
+ throws IOException {
+ int len = accessor.getFieldLength(tIndex, idField);
+ out.writeInt(len);
+ out.write(accessor.getBuffer().array(), getOffSet(accessor, tIndex, idField), len);
}
@Override
public void reset() {
// TODO Auto-generated method stub
-
+
}
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
-
+ ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
+ DataOutput out = storage.getDataOutput();
+ byte posInRead = readByteField(accessor, tIndex, InputPosInReadField);
+
+ try {
+ out.writeByte(posInRead);
+ writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
+ writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write into temporary storage");
+ }
}
@Override
public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
-
+ throw new IllegalStateException("partial result method should not be called");
}
@Override
public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
-
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ try {
+ fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
}
@Override
public void close() {
// TODO Auto-generated method stub
-
+
}
-
+
};
}
-
-
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
index ec2b47c..b98db56 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -38,7 +38,7 @@
throws HyracksDataException {
return new IAggregatorDescriptor (){
- private PositionReference position = new PositionReference();
+ private PositionReference positionReEntry = new PositionReference();
@Override
public AggregateState createAggregateStates() {
@@ -52,8 +52,8 @@
inputVal.reset();
int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
- position.setNewSpace(accessor.getBuffer().array(), leadOffset + offset);
- inputVal.append(position);
+ positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+ inputVal.append(positionReEntry);
}
}
@@ -69,8 +69,8 @@
ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage)state.state;
int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
- position.setNewSpace(accessor.getBuffer().array(), leadOffset + offset);
- inputVal.append(position);
+ positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+ inputVal.append(positionReEntry);
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index a20b60d..2877ee6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -1,75 +1,173 @@
package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory{
+public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory {
/**
*
*/
private static final long serialVersionUID = 1L;
+ private final int ValidPosCount;
+
+ public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
+ ValidPosCount = getPositionCount(readLength, kmerLength);
+ }
+
+ public static int getPositionCount(int readLength, int kmerLength) {
+ return readLength - kmerLength + 1;
+ }
+
+ public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
+ public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
+
+ public static final int BYTE_SIZE = 1;
+ public static final int INTEGER_SIZE = 4;
+
+ /**
+ * Aggregates (ReadID, {(PosInRead, {OtherPosition...}, Kmer), ...}) into
+ * (ReadID, Storage[posInRead]={PositionList,Kmer})
+ *
+ */
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
- // TODO Auto-generated method stub
- return new IAggregatorDescriptor(){
+ return new IAggregatorDescriptor() {
@Override
public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return null;
+ ArrayBackedValueStorage[] storages = new ArrayBackedValueStorage[ValidPosCount];
+ for (int i = 0; i < storages.length; i++) {
+ storages[i] = new ArrayBackedValueStorage();
+ }
+ // Pair.of() builds an ImmutablePair, whose setValue() throws; a MutablePair
+ // is needed so init()/aggregate() can update the counter.
+ return new AggregateState(MutablePair.of(storages, 0));
}
@Override
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
-
+ @SuppressWarnings("unchecked")
+ Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+ ArrayBackedValueStorage[] storages = pair.getLeft();
+ for (ArrayBackedValueStorage each : storages) {
+ each.reset();
+ }
+ int count = 0;
+
+ int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPositionListField);
+ ByteBuffer fieldBuffer = accessor.getBuffer();
+
+ // getFieldEndOffset() is relative to the tuple data start; make it
+ // absolute so it is comparable with fieldOffset above.
+ int fieldEnd = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldEndOffset(tIndex, InputPositionListField);
+ while (fieldOffset < fieldEnd) {
+ byte posInRead = fieldBuffer.get(fieldOffset);
+ if (storages[posInRead].getLength() > 0) {
+ throw new IllegalArgumentException("Reentering into an exist storage");
+ }
+ fieldOffset += BYTE_SIZE;
+ // read poslist
+ fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+ // read Kmer
+ fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+ count++;
+ }
+ pair.setValue(count);
+ }
+
+ private int writeBytesToStorage(ArrayBackedValueStorage storage, ByteBuffer fieldBuffer, int fieldOffset)
+ throws HyracksDataException {
+ int lengthPosList = fieldBuffer.getInt(fieldOffset);
+ try {
+ storage.getDataOutput().writeInt(lengthPosList);
+ fieldOffset += INTEGER_SIZE;
+ storage.getDataOutput().write(fieldBuffer.array(), fieldOffset, lengthPosList);
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write into temporary storage");
+ }
+ return lengthPosList + INTEGER_SIZE;
}
@Override
public void reset() {
// TODO Auto-generated method stub
-
+
}
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
-
+ @SuppressWarnings("unchecked")
+ Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+ ArrayBackedValueStorage[] storages = pair.getLeft();
+ int count = pair.getRight();
+
+ int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPositionListField);
+ ByteBuffer fieldBuffer = accessor.getBuffer();
+
+ int fieldEnd = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldEndOffset(tIndex, InputPositionListField);
+ while (fieldOffset < fieldEnd) {
+ byte posInRead = fieldBuffer.get(fieldOffset);
+ if (storages[posInRead].getLength() > 0) {
+ throw new IllegalArgumentException("Reentering into an exist storage");
+ }
+ fieldOffset += BYTE_SIZE;
+ // read poslist
+ fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+ // read Kmer
+ fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+ count++;
+ }
+ pair.setValue(count);
}
@Override
public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
-
+ throw new IllegalStateException("partial result method should not be called");
}
@Override
public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
-
+ @SuppressWarnings("unchecked")
+ Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+ ArrayBackedValueStorage[] storages = pair.getLeft();
+ int count = pair.getRight();
+ if (count != storages.length) {
+ throw new IllegalStateException("Final aggregate position number is invalid");
+ }
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ for (int i = 0; i < storages.length; i++) {
+ fieldOutput.write(storages[i].getByteArray(), storages[i].getStartOffset(), storages[i].getLength());
+ tupleBuilder.addFieldEndOffset();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
}
@Override
public void close() {
// TODO Auto-generated method stub
-
+
}
-
+
};
}
-
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
similarity index 97%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 1bd427c..455b5c5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.hyracks.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
import java.io.DataOutput;
import java.io.IOException;
@@ -43,7 +43,7 @@
public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
this.confFactory = new ConfFactory(conf);
- this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
}
public class TupleWriter implements ITupleWriter {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
similarity index 97%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index 54e84f7..b8f99ef 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.hyracks.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
import java.io.DataOutput;
import java.io.IOException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
similarity index 93%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 44b8f55..ee17228 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.hyracks.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
import org.apache.hadoop.mapred.JobConf;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
similarity index 91%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
index 21098a9..7f64f28 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.hyracks.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
index 66f0d1e..4b0c984 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
@@ -25,7 +25,9 @@
public static final String JOB_NAME = "genomix";
/** Kmers length */
- public static final String KMER_LENGTH = "genomix.kmer";
+ public static final String KMER_LENGTH = "genomix.kmerlen";
+ /** Read length */
+ public static final String READ_LENGTH = "genomix.readlen";
/** Frame Size */
public static final String FRAME_SIZE = "genomix.framesize";
/** Frame Limit, hyracks need */
@@ -46,7 +48,8 @@
public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
- public static final int DEFAULT_KMER = 21;
+ public static final int DEFAULT_KMERLEN = 21;
+ public static final int DEFAULT_READLEN = 124;
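+ // Reads are assumed to have a fixed length; MergeReadIDAggregateFactory
+ // sizes its per-position aggregate state from READ_LENGTH.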
public static final int DEFAULT_FRAME_SIZE = 32768;
public static final int DEFAULT_FRAME_LIMIT = 4096;
public static final int DEFAULT_TABLE_SIZE = 10485767;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 4357d7a..ea5b58f 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -28,16 +28,17 @@
import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.genomix.hyracks.dataflow.KMerSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.KMerTextWriterFactory;
import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
-import edu.uci.ics.genomix.hyracks.dataflow.NodeSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.NodeTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -86,6 +87,7 @@
private Scheduler scheduler;
private String[] ncNodeNames;
+ private int readLength;
private int kmerSize;
private int frameLimits;
private int frameSize;
@@ -94,15 +96,6 @@
private OutputFormat outputFormat;
private boolean bGenerateReversedKmer;
- /** works for hybrid hashing */
- private long inputSizeInRawRecords;
- private long inputSizeInUniqueKeys;
- private int recordSizeInBytes;
- private int hashfuncStartLevel;
- private ExternalGroupOperatorDescriptor readLocalAggregator;
- private MToNPartitioningConnectorDescriptor readConnPartition;
- private ExternalGroupOperatorDescriptor readCrossAggregator;
-
private void logDebug(String status) {
LOG.debug(status + " nc nodes:" + ncNodeNames.length);
}
@@ -135,27 +128,28 @@
private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec,
IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
- IPointableFactory pointable, RecordDescriptor outRed) throws HyracksDataException {
+ IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
+ throws HyracksDataException {
int[] keyFields = new int[] { 0 }; // the id of grouped key
Object[] obj = new Object[3];
switch (groupbyType) {
case EXTERNAL:
obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
- outRed);
+ combineRed);
obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
- outRed);
+ finalRec);
break;
case PRECLUSTER:
default:
obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
- outRed);
+ combineRed);
obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
- outRed);
+ finalRec);
break;
}
return obj;
@@ -197,7 +191,7 @@
Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateKmerAggregateFactory(),
new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
- new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec);
+ new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
logDebug("LocalKmerGroupby Operator");
connectOperators(jobSpec, readOperator, ncNodeNames, kmerLocalAggregator, ncNodeNames,
@@ -218,18 +212,28 @@
logDebug("Group by Read Operator");
// (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
- RecordDescriptor nodeCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+ RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+ RecordDescriptor readIDFinalRec = new RecordDescriptor(
+ new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
- new MergeReadIDAggregateFactory(), new ReadIDPartitionComputerFactory(),
- new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, nodeCombineRec);
+ new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
+ new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, readIDCombineRec, readIDFinalRec);
AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
connectOperators(jobSpec, mapKmerToRead, ncNodeNames, readLocalAggregator, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
+ logDebug("Group by ReadID merger");
IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
+ logDebug("Map ReadInfo to Node");
+ //Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList, OutgoingList, Kmer)
+ RecordDescriptor nodeRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null });
+ AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec, nodeRec);
+ connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
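+ // The ReadID group-by emits (ReadID, per-position info) tuples, which
+ // mapEachReadToNode turns into node tuples for the node writer below.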
+
// Output Kmer
ITupleWriterFactory kmerWriter = null;
ITupleWriterFactory nodeWriter = null;
@@ -252,10 +256,10 @@
// Output Node
HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, nodeWriter);
- connectOperators(jobSpec, readCrossAggregator, ncNodeNames, writeNodeOperator, ncNodeNames,
+ connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
jobSpec.addRoot(writeNodeOperator);
-
+
if (groupbyType == GroupbyType.PRECLUSTER) {
jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
}
@@ -264,8 +268,8 @@
@Override
protected void initJobConfiguration() {
-
- kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ readLength = conf.getInt(GenomixJob.READ_LENGTH, GenomixJob.DEFAULT_READLEN);
+ kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
if (kmerSize % 2 == 0) {
kmerSize--;
conf.setInt(GenomixJob.KMER_LENGTH, kmerSize);
@@ -273,17 +277,6 @@
frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
- inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
- inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
- recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
- hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
- /** here read the different recordSize why ? */
- recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
index 540bdaf..240d2ee 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
@@ -65,7 +65,7 @@
@Override
protected void initJobConfiguration() {
// TODO Auto-generated method stub
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
hadoopjob = new JobConf(conf);
hadoopjob.setInputFormat(SequenceFileInputFormat.class);
}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index 4a279d6..ba9aea2 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -212,7 +212,7 @@
// KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
- GenomixJob.DEFAULT_KMER));
+ GenomixJob.DEFAULT_KMERLEN));
KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {