reformat JobGen to avoid NotSerializableException
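
JobGen objects are shipped to node controllers through Java serialization, so
every field of a Serializable class must itself be Serializable. This change
removes the non-serializable Scheduler field from JobGenBrujinGraph: the
scheduler is now consumed eagerly in initJobConfiguration(scheduler), which
resolves the input splits and keeps only the serializable String[] readSchedule.
It also adds serialVersionUID to the *Reference value types, replaces the
commons-lang3 Pair aggregate state with a small PositionArray holder, adds the
missing break statements in Driver's plan switch, and renames GenomixJob to
GenomixJobConf.

Background sketch of the failure mode (illustrative only, not part of this
patch; all names below are hypothetical):

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class ShipDemo {
        // Stand-in for a runtime-only helper such as the HDFS Scheduler.
        static class Helper {
            String[] locations() { return new String[] { "nc1", "nc2" }; }
        }

        // Keeping the helper as a field poisons serialization of the owner.
        static class BadGen implements Serializable {
            private static final long serialVersionUID = 1L;
            private final Helper scheduler = new Helper();
        }

        // Consume the helper up front and retain only serializable results,
        // mirroring the readSchedule change in this patch.
        static class GoodGen implements Serializable {
            private static final long serialVersionUID = 1L;
            private final String[] readSchedule;
            GoodGen(Helper h) { this.readSchedule = h.locations(); }
        }

        public static void main(String[] args) throws Exception {
            ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
            out.writeObject(new GoodGen(new Helper())); // succeeds
            out.writeObject(new BadGen()); // throws java.io.NotSerializableException: ShipDemo$Helper
        }
    }
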
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
index 5cf44ba..0b0dcf2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -4,7 +4,12 @@
public class NodeReference extends NodeWritable {
- public NodeReference(int kmerSize) {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public NodeReference(int kmerSize) {
super(kmerSize);
// TODO Auto-generated constructor stub
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
index 76b10d0..98b6ca1 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -5,4 +5,9 @@
public class PositionListReference extends PositionListWritable implements IValueReference {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
index 026758a..1e22529 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -5,4 +5,9 @@
public class PositionReference extends PositionWritable implements IValueReference {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index fe72a81..09c0d2e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -5,6 +5,7 @@
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -17,157 +18,186 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-public class MapKmerPositionToReadOperator extends AbstractSingleActivityOperatorDescriptor {
+public class MapKmerPositionToReadOperator extends
+ AbstractSingleActivityOperatorDescriptor {
- public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc) {
- super(spec, 1, 1);
- recordDescriptors[0] = recDesc;
- }
+ public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec,
+ RecordDescriptor recDesc) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = recDesc;
+ }
- private static final long serialVersionUID = 1L;
- public static final int InputKmerField = 0;
- public static final int InputPosListField = 1;
+ private static final long serialVersionUID = 1L;
+ public static final int InputKmerField = 0;
+ public static final int InputPosListField = 1;
- public static final int OutputReadIDField = 0;
- public static final int OutputPosInReadField = 1;
- public static final int OutputOtherReadIDListField = 2;
- public static final int OutputKmerField = 3; // may not needed
+ public static final int OutputReadIDField = 0;
+ public static final int OutputPosInReadField = 1;
+ public static final int OutputOtherReadIDListField = 2;
+ public static final int OutputKmerField = 3; // may not be needed
- /**
- * Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherReadID,...},*Kmer*)
- * OtherReadID appears only when otherReadID.otherPos==0
- */
- public class MapKmerPositionToReadNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
- private final IHyracksTaskContext ctx;
- private final RecordDescriptor inputRecDesc;
- private final RecordDescriptor outputRecDesc;
+ public static final RecordDescriptor readIDOutputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { null, null, null, null });
- private FrameTupleAccessor accessor;
- private ByteBuffer writeBuffer;
- private ArrayTupleBuilder builder;
- private FrameTupleAppender appender;
+ /**
+ * Map (Kmer, {(ReadID,PosInRead),...}) into
+ * (ReadID,PosInRead,{OtherReadID,...},*Kmer*). OtherReadID appears only
+ * when otherReadID.otherPos==0.
+ */
+ public class MapKmerPositionToReadNodePushable extends
+ AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final RecordDescriptor inputRecDesc;
+ private final RecordDescriptor outputRecDesc;
- private PositionReference positionEntry;
- private ArrayBackedValueStorage posListEntry;
- private ArrayBackedValueStorage zeroPositionCollection;
- private ArrayBackedValueStorage noneZeroPositionCollection;
+ private FrameTupleAccessor accessor;
+ private ByteBuffer writeBuffer;
+ private ArrayTupleBuilder builder;
+ private FrameTupleAppender appender;
- public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
- RecordDescriptor outputRecDesc) {
- this.ctx = ctx;
- this.inputRecDesc = inputRecDesc;
- this.outputRecDesc = outputRecDesc;
- this.positionEntry = new PositionReference();
- this.posListEntry = new ArrayBackedValueStorage();
- this.zeroPositionCollection = new ArrayBackedValueStorage();
- this.noneZeroPositionCollection = new ArrayBackedValueStorage();
- }
+ private PositionReference positionEntry;
+ private ArrayBackedValueStorage posListEntry;
+ private ArrayBackedValueStorage zeroPositionCollection;
+ private ArrayBackedValueStorage noneZeroPositionCollection;
- @Override
- public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
- writeBuffer = ctx.allocateFrame();
- builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
- appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(writeBuffer, true);
- writer.open();
- posListEntry.reset();
- }
+ public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx,
+ RecordDescriptor inputRecDesc, RecordDescriptor outputRecDesc) {
+ this.ctx = ctx;
+ this.inputRecDesc = inputRecDesc;
+ this.outputRecDesc = outputRecDesc;
+ this.positionEntry = new PositionReference();
+ this.posListEntry = new ArrayBackedValueStorage();
+ this.zeroPositionCollection = new ArrayBackedValueStorage();
+ this.noneZeroPositionCollection = new ArrayBackedValueStorage();
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- scanPosition(i, zeroPositionCollection, noneZeroPositionCollection);
- scanAgainToOutputTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
- }
- }
+ @Override
+ public void open() throws HyracksDataException {
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+ writeBuffer = ctx.allocateFrame();
+ builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(writeBuffer, true);
+ writer.open();
+ posListEntry.reset();
+ }
- private void scanPosition(int tIndex, ArrayBackedValueStorage zeroPositionCollection2,
- ArrayBackedValueStorage noneZeroPositionCollection2) {
- zeroPositionCollection2.reset();
- noneZeroPositionCollection2.reset();
- byte[] data = accessor.getBuffer().array();
- int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, InputPosListField);
- for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
- positionEntry.setNewReference(data, offsetPoslist + i);
- if (positionEntry.getPosInRead() == 0) {
- zeroPositionCollection2.append(positionEntry);
- } else {
- noneZeroPositionCollection2.append(positionEntry);
- }
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ scanPosition(i, zeroPositionCollection,
+ noneZeroPositionCollection);
+ scanAgainToOutputTuple(i, zeroPositionCollection,
+ noneZeroPositionCollection, builder);
+ }
+ }
- }
+ private void scanPosition(int tIndex,
+ ArrayBackedValueStorage zeroPositionCollection2,
+ ArrayBackedValueStorage noneZeroPositionCollection2) {
+ zeroPositionCollection2.reset();
+ noneZeroPositionCollection2.reset();
+ byte[] data = accessor.getBuffer().array();
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex)
+ + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPosListField);
+ for (int i = 0; i < accessor.getFieldLength(tIndex,
+ InputPosListField); i += PositionReference.LENGTH) {
+ positionEntry.setNewReference(data, offsetPoslist + i);
+ if (positionEntry.getPosInRead() == 0) {
+ zeroPositionCollection2.append(positionEntry);
+ } else {
+ noneZeroPositionCollection2.append(positionEntry);
+ }
+ }
- private void scanAgainToOutputTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
- ArrayBackedValueStorage noneZeroPositionCollection, ArrayTupleBuilder builder2) {
- byte[] data = accessor.getBuffer().array();
- int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, InputPosListField);
- for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
- positionEntry.setNewReference(data, offsetPoslist + i);
- if (positionEntry.getPosInRead() != 0) {
- appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
- } else {
- appendNodeToBuilder(tIndex, positionEntry, noneZeroPositionCollection, builder2);
- }
- }
- }
+ }
- private void appendNodeToBuilder(int tIndex, PositionReference pos, ArrayBackedValueStorage posList2,
- ArrayTupleBuilder builder2) {
- try {
- builder2.addField(pos.getByteArray(), 0, PositionReference.INTBYTES);
- builder2.addField(pos.getByteArray(), PositionReference.INTBYTES, 1);
- //? ask Yingyi, if support empty bytes[]
- if (posList2 == null) {
- builder2.addFieldEndOffset();
- } else {
- builder2.addField(posList2.getByteArray(), posList2.getStartOffset(), posList2.getLength());
- }
- // set kmer, may not useful
- byte[] data = accessor.getBuffer().array();
- int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, InputKmerField);
- builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
+ private void scanAgainToOutputTuple(int tIndex,
+ ArrayBackedValueStorage zeroPositionCollection,
+ ArrayBackedValueStorage noneZeroPositionCollection,
+ ArrayTupleBuilder builder2) {
+ byte[] data = accessor.getBuffer().array();
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex)
+ + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPosListField);
+ for (int i = 0; i < accessor.getFieldLength(tIndex,
+ InputPosListField); i += PositionReference.LENGTH) {
+ positionEntry.setNewReference(data, offsetPoslist + i);
+ if (positionEntry.getPosInRead() != 0) {
+ appendNodeToBuilder(tIndex, positionEntry,
+ zeroPositionCollection, builder2);
+ } else {
+ appendNodeToBuilder(tIndex, positionEntry,
+ noneZeroPositionCollection, builder2);
+ }
+ }
+ }
- if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
- FrameUtils.flushFrame(writeBuffer, writer);
- appender.reset(writeBuffer, true);
- if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
- throw new IllegalStateException();
- }
- }
- builder2.reset();
- } catch (HyracksDataException e) {
- throw new IllegalStateException(
- "Failed to Add a field to the tuple by copying the data bytes from a byte array.");
- }
- }
+ private void appendNodeToBuilder(int tIndex, PositionReference pos,
+ ArrayBackedValueStorage posList2, ArrayTupleBuilder builder2) {
+ try {
+ builder2.addField(pos.getByteArray(), 0,
+ PositionReference.INTBYTES);
+ builder2.addField(pos.getByteArray(),
+ PositionReference.INTBYTES, 1);
+ // TODO: ask Yingyi whether an empty byte[] field is supported
+ if (posList2 == null) {
+ builder2.addFieldEndOffset();
+ } else {
+ builder2.addField(posList2.getByteArray(),
+ posList2.getStartOffset(), posList2.getLength());
+ }
+ // set kmer; may not be useful
+ byte[] data = accessor.getBuffer().array();
+ int offsetKmer = accessor.getTupleStartOffset(tIndex)
+ + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputKmerField);
+ builder2.addField(data, offsetKmer,
+ accessor.getFieldLength(tIndex, InputKmerField));
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
+ if (!appender.append(builder2.getFieldEndOffsets(),
+ builder2.getByteArray(), 0, builder2.getSize())) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ appender.reset(writeBuffer, true);
+ if (!appender.append(builder2.getFieldEndOffsets(),
+ builder2.getByteArray(), 0, builder2.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ builder2.reset();
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(
+ "Failed to Add a field to the tuple by copying the data bytes from a byte array.");
+ }
+ }
- @Override
- public void close() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffer, writer);
- }
- writer.close();
- }
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
- }
+ @Override
+ public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ writer.close();
+ }
- @Override
- public AbstractOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new MapKmerPositionToReadNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(),
- 0), recordDescriptors[0]);
- }
+ }
+
+ @Override
+ public AbstractOperatorNodePushable createPushRuntime(
+ IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ return new MapKmerPositionToReadNodePushable(
+ ctx,
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
+ recordDescriptors[0]);
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
index 7dc4947..bfc5ae5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -3,7 +3,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -11,7 +10,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -37,16 +35,6 @@
return offset;
}
- protected byte readByteField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
- return ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
- getOffSet(accessor, tIndex, fieldId));
- }
-
- protected int readIntField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
- return IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
- getOffSet(accessor, tIndex, fieldId));
- }
-
@Override
public void reset() {
}
@@ -67,7 +55,7 @@
AggregateState state) throws HyracksDataException {
ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
inputVal.reset();
- position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+ position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
inputVal.append(position);
}
@@ -75,7 +63,7 @@
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
- position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+ position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
inputVal.append(position);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index 2877ee6..d3971fb 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -4,8 +4,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.commons.lang3.tuple.Pair;
-
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -16,39 +14,57 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory {
+public class MergeReadIDAggregateFactory implements
+ IAggregatorDescriptorFactory {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private final int ValidPosCount;
+ private final int ValidPosCount;
- public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
- ValidPosCount = getPositionCount(readLength, kmerLength);
- }
+ public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
+ ValidPosCount = getPositionCount(readLength, kmerLength);
+ }
- public static int getPositionCount(int readLength, int kmerLength){
- return readLength - kmerLength + 1;
- }
- public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
- public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
+ public static int getPositionCount(int readLength, int kmerLength) {
+ return readLength - kmerLength + 1;
+ }
- public static final int BYTE_SIZE = 1;
- public static final int INTEGER_SIZE = 4;
+ public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
+ public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
- /**
- * (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...} to
- * Aggregate as
- * (ReadID, Storage[posInRead]={PositionList,Kmer})
- *
- */
- @Override
+ public static final int BYTE_SIZE = 1;
+ public static final int INTEGER_SIZE = 4;
+
+ /**
+ * (ReadID, {(PosInRead,{OtherPosition,...},Kmer), ...}) to aggregate as
+ * (ReadID, Storage[posInRead]={PositionList,Kmer})
+ *
+ */
+ @Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
return new IAggregatorDescriptor() {
+
+ class PositionArray {
+ public ArrayBackedValueStorage[] storages;
+ public int count;
+
+ public PositionArray(ArrayBackedValueStorage[] storages2, int i) {
+ storages = storages2;
+ count = i;
+ }
+
+ public void reset() {
+ for (ArrayBackedValueStorage each : storages) {
+ each.reset();
+ }
+ count = 0;
+ }
+ }
@Override
public AggregateState createAggregateStates() {
@@ -56,19 +72,15 @@
for (int i = 0; i < storages.length; i++) {
storages[i] = new ArrayBackedValueStorage();
}
- return new AggregateState(Pair.of(storages, 0));
+ return new AggregateState(new PositionArray(storages, 0));
}
@Override
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- @SuppressWarnings("unchecked")
- Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
- ArrayBackedValueStorage[] storages = pair.getLeft();
- for (ArrayBackedValueStorage each : storages) {
- each.reset();
- }
- int count = 0;
+ PositionArray positionArray = (PositionArray) state.state;
+ positionArray.reset();
+ ArrayBackedValueStorage[] storages = positionArray.storages;
int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ accessor.getFieldStartOffset(tIndex, InputPositionListField);
@@ -84,9 +96,8 @@
fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
// read Kmer
fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
- count++;
+ positionArray.count++;
}
- pair.setValue(count);
}
private int writeBytesToStorage(ArrayBackedValueStorage storage, ByteBuffer fieldBuffer, int fieldOffset)
@@ -111,10 +122,8 @@
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
- @SuppressWarnings("unchecked")
- Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
- ArrayBackedValueStorage[] storages = pair.getLeft();
- int count = pair.getRight();
+ PositionArray positionArray = (PositionArray) state.state;
+ ArrayBackedValueStorage[] storages = positionArray.storages;
int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ accessor.getFieldStartOffset(tIndex, InputPositionListField);
@@ -130,9 +139,8 @@
fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
// read Kmer
fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
- count++;
+ positionArray.count++;
}
- pair.setValue(count);
}
@Override
@@ -144,11 +152,9 @@
@Override
public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- @SuppressWarnings("unchecked")
- Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
- ArrayBackedValueStorage[] storages = pair.getLeft();
- int count = pair.getRight();
- if (count != storages.length) {
+ PositionArray positionArray = (PositionArray) state.state;
+ ArrayBackedValueStorage[] storages = positionArray.storages;
+ if (positionArray.count != storages.length) {
throw new IllegalStateException("Final aggregate position number is invalid");
}
DataOutput fieldOutput = tupleBuilder.getDataOutput();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 0508ba7..981b2de 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -24,7 +24,7 @@
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapred.JobConf;
-import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -47,7 +47,7 @@
public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
this.confFactory = new ConfFactory(conf);
- this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
+ this.kmerlength = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
}
public class TupleWriter implements ITupleWriter {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 7de3b54..684d01e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -11,7 +11,7 @@
import edu.uci.ics.genomix.data.Marshal;
import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
-import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -40,7 +40,7 @@
public NodeSequenceWriterFactory(JobConf conf) throws HyracksDataException {
this.confFactory = new ConfFactory(conf);
- this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
+ this.kmerlength = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
}
public class TupleWriter implements ITupleWriter {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
index 68a83ac..5014a8a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
@@ -24,7 +24,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.GenericOptionsParser;
-import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
import edu.uci.ics.genomix.hyracks.job.JobGen;
import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
import edu.uci.ics.genomix.hyracks.job.JobGenCheckReader;
@@ -70,11 +70,11 @@
this.numPartitionPerMachine = numPartitionPerMachine;
}
- public void runJob(GenomixJob job) throws HyracksException {
+ public void runJob(GenomixJobConf job) throws HyracksException {
runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
}
- public void runJob(GenomixJob job, Plan planChoice, boolean profiling) throws HyracksException {
+ public void runJob(GenomixJobConf job, Plan planChoice, boolean profiling) throws HyracksException {
/** add hadoop configurations */
URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
job.addResource(hadoopCore);
@@ -99,12 +99,16 @@
break;
case OUTPUT_KMERHASHTABLE:
jobGen = new JobGenCreateKmerInfo(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
case OUTPUT_MAP_KMER_TO_READ:
jobGen = new JobGenMapKmerToRead(job,scheduler, ncMap, numPartitionPerMachine);
+ break;
case OUTPUT_GROUPBY_READID:
jobGen = new JobGenGroupbyReadID(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
case CHECK_KMERREADER:
jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
}
start = System.currentTimeMillis();
@@ -136,7 +140,7 @@
}
public static void main(String[] args) throws Exception {
- GenomixJob jobConf = new GenomixJob();
+ GenomixJobConf jobConf = new GenomixJobConf();
String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
if (otherArgs.length < 4) {
System.err.println("Need <serverIP> <port> <input> <output>");
@@ -152,6 +156,7 @@
Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
jobConf.set("mapred.input.dir", path.toString());
+ @SuppressWarnings("deprecation")
Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
jobConf.set("mapred.output.dir", outputDir.toString());
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
similarity index 84%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
index 4b0c984..fe9afdf 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
@@ -20,7 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
-public class GenomixJob extends JobConf {
+public class GenomixJobConf extends JobConf {
public static final String JOB_NAME = "genomix";
@@ -61,14 +61,21 @@
public static final boolean DEFAULT_REVERSED = false;
- public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
- public static final String DEFAULT_OUTPUT_FORMAT = "binary";
+ public static final String JOB_PLAN_GRAPHBUILD = "graphbuild";
+ public static final String JOB_PLAN_GRAPHSTAT = "graphstat";
+
+ public static final String GROUPBY_TYPE_HYBRID = "hybrid";
+ public static final String GROUPBY_TYPE_EXTERNAL = "external";
+ public static final String GROUPBY_TYPE_PRECLUSTER = "precluster";
+ public static final String OUTPUT_FORMAT_BINARY = "binary";
+ public static final String OUTPUT_FORMAT_TEXT = "text";
- public GenomixJob() throws IOException {
+
+ public GenomixJobConf() throws IOException {
super(new Configuration());
}
- public GenomixJob(Configuration conf) throws IOException {
+ public GenomixJobConf(Configuration conf) throws IOException {
super(conf);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
index d17d9ef..a9dfc9b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
@@ -18,14 +18,14 @@
import java.io.Serializable;
import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
-public abstract class JobGen implements Serializable{
+public abstract class JobGen implements Serializable {
/**
*
@@ -34,13 +34,9 @@
protected final ConfFactory confFactory;
protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
- public JobGen(GenomixJob job) throws HyracksDataException {
+ public JobGen(GenomixJobConf job) throws HyracksDataException {
this.confFactory = new ConfFactory(job);
- this.initJobConfiguration();
}
- protected abstract void initJobConfiguration()throws HyracksDataException ;
-
public abstract JobSpecification generateJob() throws HyracksException;
-
}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 4adddb6..2f40337 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -15,6 +15,7 @@
package edu.uci.ics.genomix.hyracks.job;
+import java.io.IOException;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -91,8 +92,8 @@
protected ConfFactory hadoopJobConfFactory;
protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
- protected Scheduler scheduler;
protected String[] ncNodeNames;
+ protected String[] readSchedule;
protected int readLength;
protected int kmerSize;
@@ -107,17 +108,16 @@
LOG.debug(status + " nc nodes:" + ncNodeNames.length);
}
- public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+ public JobGenBrujinGraph(GenomixJobConf job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
int numPartitionPerMachine) throws HyracksDataException {
super(job);
- this.scheduler = scheduler;
String[] nodes = new String[ncMap.size()];
ncMap.keySet().toArray(nodes);
ncNodeNames = new String[nodes.length * numPartitionPerMachine];
for (int i = 0; i < numPartitionPerMachine; i++) {
System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
}
- logDebug("initialize");
+ initJobConfiguration(scheduler);
}
private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
@@ -165,12 +165,9 @@
public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
try {
-
InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
.getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
- LOG.info("HDFS read into " + splits.length + " splits");
- String[] readSchedule = scheduler.getLocationConstraints(splits);
return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(kmerSize,
bGenerateReversedKmer));
@@ -209,10 +206,11 @@
public AbstractOperatorDescriptor generateMapperFromKmerToRead(JobSpecification jobSpec,
AbstractOperatorDescriptor kmerCrossAggregator) {
- //Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherPosition,...},Kmer)
- RecordDescriptor readIDOutputRec = new RecordDescriptor(
- new ISerializerDeserializer[] { null, null, null, null });
- AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, readIDOutputRec);
+ // Map (Kmer, {(ReadID,PosInRead),...}) into
+ // (ReadID,PosInRead,{OtherPosition,...},Kmer)
+
+ AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec,
+ MapKmerPositionToReadOperator.readIDOutputRec);
connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
return mapKmerToRead;
@@ -220,7 +218,7 @@
public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
- // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
+ // (ReadID, {(PosInRead,{OtherPosition,...},Kmer) ...}
RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
RecordDescriptor readIDFinalRec = new RecordDescriptor(
new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
@@ -240,7 +238,8 @@
public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
AbstractOperatorDescriptor readCrossAggregator) {
- //Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList, OutgoingList, Kmer)
+ // Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList,
+ // OutgoingList, Kmer)
RecordDescriptor nodeRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null });
AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec, nodeRec, kmerSize);
connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
@@ -248,7 +247,7 @@
return mapEachReadToNode;
}
- public AbstractOperatorDescriptor generateRootByWriteKmerGroupbyResult(JobSpecification jobSpec,
+ public AbstractOperatorDescriptor generateKmerWritorOperator(JobSpecification jobSpec,
AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
// Output Kmer
ITupleWriterFactory kmerWriter = null;
@@ -266,11 +265,11 @@
hadoopJobConfFactory.getConf(), kmerWriter);
connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
- jobSpec.addRoot(writeKmerOperator);
+ // jobSpec.addRoot(writeKmerOperator);
return writeKmerOperator;
}
- public AbstractOperatorDescriptor generateRootByWriteNodeResult(JobSpecification jobSpec,
+ public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
ITupleWriterFactory nodeWriter = null;
switch (outputFormat) {
@@ -288,7 +287,6 @@
hadoopJobConfFactory.getConf(), nodeWriter);
connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
- jobSpec.addRoot(writeNodeOperator);
return writeNodeOperator;
}
@@ -303,8 +301,8 @@
logDebug("Group by Kmer");
AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
- logDebug("Write kmer to result");
- generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+ // logDebug("Write kmer to result");
+ // generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
logDebug("Map Kmer to Read Operator");
lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
@@ -316,28 +314,27 @@
lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
logDebug("Write node to result");
- generateRootByWriteNodeResult(jobSpec, lastOperator);
+ lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
+ jobSpec.addRoot(lastOperator);
return jobSpec;
}
- @SuppressWarnings("deprecation")
- @Override
- protected void initJobConfiguration() throws HyracksDataException {
+ protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
Configuration conf = confFactory.getConf();
- readLength = conf.getInt(GenomixJob.READ_LENGTH, GenomixJob.DEFAULT_READLEN);
- kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
+ readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
+ kmerSize = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
if (kmerSize % 2 == 0) {
kmerSize--;
- conf.setInt(GenomixJob.KMER_LENGTH, kmerSize);
+ conf.setInt(GenomixJobConf.KMER_LENGTH, kmerSize);
}
- frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
- tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
- frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
+ frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
+ tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
+ frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
- bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
+ bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
- String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
+ String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
if (type.equalsIgnoreCase("external")) {
groupbyType = GroupbyType.EXTERNAL;
} else if (type.equalsIgnoreCase("precluster")) {
@@ -346,13 +343,22 @@
groupbyType = GroupbyType.HYBRIDHASH;
}
- String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
+ String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
if (output.equalsIgnoreCase("text")) {
outputFormat = OutputFormat.TEXT;
} else {
outputFormat = OutputFormat.BINARY;
}
- hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
+ try {
+ hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
+ @SuppressWarnings("deprecation")
+ InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+ .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+ readSchedule = scheduler.getLocationConstraints(splits);
+ } catch (IOException ex) {
+ throw new HyracksDataException(ex);
+ }
+
LOG.info("Genomix Graph Build Configuration");
LOG.info("Kmer:" + kmerSize);
LOG.info("Groupby type:" + type);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
index a18c280..c6a69d4 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -28,7 +28,7 @@
*/
private static final long serialVersionUID = 1L;
- public JobGenCheckReader(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ public JobGenCheckReader(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
int numPartitionPerMachine) throws HyracksDataException {
super(job, scheduler, ncMap, numPartitionPerMachine);
// TODO Auto-generated constructor stub
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
index 2c1a70a..4ee36a0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
@@ -17,7 +17,7 @@
*/
private static final long serialVersionUID = 1L;
- public JobGenCreateKmerInfo(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ public JobGenCreateKmerInfo(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
int numPartitionPerMachine) throws HyracksDataException {
super(job, scheduler, ncMap, numPartitionPerMachine);
// TODO Auto-generated constructor stub
@@ -34,7 +34,8 @@
AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
logDebug("Write kmer to result");
- generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+ lastOperator = generateKmerWritorOperator(jobSpec, lastOperator);
+ jobSpec.addRoot(lastOperator);
return jobSpec;
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
index 7649100..8a11379 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -21,7 +21,7 @@
*/
private static final long serialVersionUID = 1L;
- public JobGenGroupbyReadID(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ public JobGenGroupbyReadID(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
int numPartitionPerMachine) throws HyracksDataException {
super(job, scheduler, ncMap, numPartitionPerMachine);
// TODO Auto-generated constructor stub
@@ -37,8 +37,8 @@
logDebug("Group by Kmer");
AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
- logDebug("Write kmer to result");
- generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+ //logDebug("Write kmer to result");
+ //generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
logDebug("Map Kmer to Read Operator");
lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index 76499e6..4fa554f 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -21,7 +21,7 @@
*/
private static final long serialVersionUID = 1L;
- public JobGenMapKmerToRead(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ public JobGenMapKmerToRead(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
int numPartitionPerMachine) throws HyracksDataException {
super(job, scheduler, ncMap, numPartitionPerMachine);
// TODO Auto-generated constructor stub
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
index ae2838d..216f631 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
@@ -40,7 +40,7 @@
import edu.uci.ics.genomix.hyracks.driver.Driver;
import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
import edu.uci.ics.genomix.type.KmerBytesWritable;
public class JobKmerGroupbyTest {
@@ -76,7 +76,7 @@
FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
- conf.setInt(GenomixJob.KMER_LENGTH, 5);
+ conf.setInt(GenomixJobConf.KMER_LENGTH, 5);
driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
}
@@ -130,7 +130,7 @@
cleanUpReEntry();
TestHybridGroupby();
cleanUpReEntry();
- conf.setBoolean(GenomixJob.REVERSED_KMER, true);
+ conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
TestExternalReversedGroupby();
cleanUpReEntry();
TestPreClusterReversedGroupby();
@@ -139,53 +139,53 @@
}
public void TestExternalGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "external");
+ conf.set(GenomixJobConf.GROUPBY_TYPE, "external");
System.err.println("Testing ExternalGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults(EXPECTED_PATH));
}
public void TestPreClusterGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
+ conf.set(GenomixJobConf.GROUPBY_TYPE, "precluster");
System.err.println("Testing PreClusterGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults(EXPECTED_PATH));
}
public void TestHybridGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
+ conf.set(GenomixJobConf.GROUPBY_TYPE, "hybrid");
System.err.println("Testing HybridGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults(EXPECTED_PATH));
}
public void TestExternalReversedGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "external");
- conf.setBoolean(GenomixJob.REVERSED_KMER, true);
+ conf.set(GenomixJobConf.GROUPBY_TYPE, "external");
+ conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
System.err.println("Testing ExternalGroupBy + Reversed");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
}
public void TestPreClusterReversedGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
- conf.setBoolean(GenomixJob.REVERSED_KMER, true);
+ conf.set(GenomixJobConf.GROUPBY_TYPE, "precluster");
+ conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
System.err.println("Testing PreclusterGroupBy + Reversed");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
}
public void TestHybridReversedGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
- conf.setBoolean(GenomixJob.REVERSED_KMER, true);
+ conf.set(GenomixJobConf.GROUPBY_TYPE, "hybrid");
+ conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
System.err.println("Testing HybridGroupBy + Reversed");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
}
private boolean checkResults(String expectedPath) throws Exception {
File dumped = null;
- String format = conf.get(GenomixJob.OUTPUT_FORMAT);
+ String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
if ("text".equalsIgnoreCase(format)) {
FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
@@ -209,8 +209,8 @@
SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
// KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
- GenomixJob.DEFAULT_KMERLEN));
+ KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJobConf.KMER_LENGTH,
+ GenomixJobConf.DEFAULT_KMERLEN));
// KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
KmerBytesWritable value = null;
while (reader.next(key, value)) {
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index fca0607..da74f27 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -25,155 +25,207 @@
import edu.uci.ics.genomix.hyracks.driver.Driver;
import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
- private static final int KmerSize = 5;
- private static final String ACTUAL_RESULT_DIR = "actual";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final int KmerSize = 5;
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/text.txt";
- private static final String HDFS_INPUT_PATH = "/webmap";
- private static final String HDFS_OUTPUT_PATH = "/webmap_result";
-
- private static final String EXPECTED_DIR = "src/test/resources/expected/";
- private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
+ private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/text.txt";
+ private static final String HDFS_INPUT_PATH = "/webmap";
+ private static final String HDFS_OUTPUT_PATH = "/webmap_result";
- private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
- private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
+ private static final String EXPECTED_DIR = "src/test/resources/expected/";
+ private static final String EXPECTED_READER_RESULT = EXPECTED_DIR
+ + "result_after_initial_read";
+ private static final String EXPECTED_OUPUT_KMER = EXPECTED_DIR
+ + "result_after_kmerAggregate";
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
+ private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR
+ + HDFS_OUTPUT_PATH + "/merged.txt";
+ private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
+ + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
- private Driver driver;
-
- @Test
- public void TestAll() throws Exception {
- TestReader();
- }
-
- public void TestReader() throws Exception{
- conf.set(GenomixJob.GROUPBY_TYPE, "external");
- System.err.println("Testing ExternalGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.CHECK_KMERREADER, true);
- Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT));
- }
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+ private int numPartitionPerMachine = 1;
- @Before
- public void setUp() throws Exception {
- cleanupStores();
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
+ private Driver driver;
- FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
- FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
+ @Test
+ public void TestAll() throws Exception {
+ //TestReader();
+ TestGroupbyKmer();
+ // TestMapKmerToRead();
+ // TestGroupByReadID();
+ // TestEndToEnd();
+ }
- conf.setInt(GenomixJob.KMER_LENGTH, KmerSize);
- driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
- }
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
+ public void TestReader() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT));
+ }
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ public void TestGroupbyKmer() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+ driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_KMERHASHTABLE, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER));
+ }
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_INPUT_PATH);
- Path dest = new Path(HDFS_INPUT_PATH);
- dfs.mkdirs(dest);
- //dfs.mkdirs(result);
- dfs.copyFromLocalFile(src, dest);
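+ /**
+ * Runs the map-kmer-to-read plan. Note that it currently asserts
+ * against the kmer-aggregate expected file, presumably as a
+ * placeholder until a stage-specific expected file exists.
+ */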
+ public void TestMapKmerToRead() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_MAP_KMER_TO_READ, true);
+ Assert.assertTrue(checkResults(EXPECTED_OUTPUT_KMER));
+ }
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
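+ /**
+ * Runs the group-by-read-ID plan with the external group-by; like the
+ * stage above, it reuses the kmer-aggregate expected file for now.
+ */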
+ public void TestGroupByReadID() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+ driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
+ Assert.assertTrue(checkResults(EXPECTED_OUTPUT_KMER));
+ }
- private void cleanUpReEntry() throws IOException {
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- if (lfs.exists(new Path(DUMPED_RESULT))) {
- lfs.delete(new Path(DUMPED_RESULT), true);
- }
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
- dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
- }
- }
-
- private boolean checkResults(String expectedPath) throws Exception {
- File dumped = null;
- String format = conf.get(GenomixJob.OUTPUT_FORMAT);
- if ("text".equalsIgnoreCase(format)) {
- FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
- FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
- dumped = new File(DUMPED_RESULT);
- } else {
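+ /**
+ * Runs the full de Bruijn graph build twice, once with the external
+ * and once with the precluster group-by, cleaning the HDFS output
+ * directory between runs.
+ */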
+ public void TestEndToEnd() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ cleanUpReEntry();
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ Assert.assertTrue(checkResults(EXPECTED_OUTPUT_KMER));
+ cleanUpReEntry();
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ Assert.assertTrue(checkResults(EXPECTED_OUTPUT_KMER));
+ }
- FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
- File filePathTo = new File(CONVERT_RESULT);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
- String partname = "/part-" + i;
- // FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
- // + partname), FileSystem.getLocal(new Configuration()),
- // new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
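+ /**
+ * Per-test setup: clears the local store and result directories, boots
+ * the Hyracks test cluster and a mini HDFS, wires the input/output
+ * paths and kmer length into the job conf, and creates the driver.
+ */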
+ @Before
+ public void setUp() throws Exception {
+ cleanupStores();
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
- Path path = new Path(HDFS_OUTPUT_PATH + partname);
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.getFileStatus(path).getLen() == 0) {
- continue;
- }
- SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
+ FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
+ FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
- // KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
- GenomixJob.DEFAULT_KMERLEN));
-// KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- KmerBytesWritable value = null;
- while (reader.next(key, value)) {
- if (key == null || value == null) {
- break;
- }
- bw.write(key.toString() + "\t" + value.toString());
- System.out.println(key.toString() + "\t" + value.toString());
- bw.newLine();
- }
- reader.close();
- }
- bw.close();
- dumped = new File(CONVERT_RESULT);
- }
+ conf.setInt(GenomixJobConf.KMER_LENGTH, KmerSize);
+ driver = new Driver(
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
+ numPartitionPerMachine);
+ }
- TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
- return true;
- }
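+ /**
+ * Ensures the teststore and build directories exist and are empty.
+ */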
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
- @After
- public void tearDown() throws Exception {
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
- cleanupHDFS();
- }
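+ /**
+ * Loads the Hadoop site configs, starts a MiniDFSCluster, copies the
+ * local test data into HDFS, and dumps the effective configuration to
+ * conf.xml for inspection.
+ */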
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(DATA_INPUT_PATH);
+ Path dest = new Path(HDFS_INPUT_PATH);
+ dfs.mkdirs(dest);
+ // dfs.mkdirs(result);
+ dfs.copyFromLocalFile(src, dest);
+
+ DataOutputStream confOutput = new DataOutputStream(
+ new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
+
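+ /**
+ * Deletes the merged local dump and the HDFS output directory so that
+ * another plan can be run within the same test.
+ */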
+ private void cleanUpReEntry() throws IOException {
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ if (lfs.exists(new Path(DUMPED_RESULT))) {
+ lfs.delete(new Path(DUMPED_RESULT), true);
+ }
+ FileSystem dfs = FileSystem.get(conf);
+ if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
+ dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
+ }
+ }
+
+ private boolean checkResults(String expectedPath) throws Exception {
+ File dumped = null;
+ String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
+ if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
+ FileSystem.getLocal(new Configuration()),
+ new Path(DUMPED_RESULT), false, conf, null);
+ dumped = new File(DUMPED_RESULT);
+ } else {
+
+ FileSystem.getLocal(new Configuration()).mkdirs(
+ new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
+ File filePathTo = new File(CONVERT_RESULT);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+ for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
+ String partname = "/part-" + i;
+ // FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
+ // + partname), FileSystem.getLocal(new Configuration()),
+ // new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname),
+ // false, conf);
+
+ Path path = new Path(HDFS_OUTPUT_PATH + partname);
+ FileSystem dfs = FileSystem.get(conf);
+ if (dfs.getFileStatus(path).getLen() == 0) {
+ continue;
+ }
+ SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path,
+ conf);
+
+ KmerBytesWritable key = new KmerBytesWritable(conf.getInt(
+ GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN));
+ // reader.next(key, value) deserializes into both arguments, so the
+ // value must be a live instance; passing null here would throw a
+ // NullPointerException. This assumes the value is also a
+ // KmerBytesWritable of the configured kmer length.
+ KmerBytesWritable value = new KmerBytesWritable(conf.getInt(
+ GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN));
+ while (reader.next(key, value)) {
+ if (key == null || value == null) {
+ break;
+ }
+ bw.write(key.toString() + "\t" + value.toString());
+ System.out.println(key.toString() + "\t" + value.toString());
+ bw.newLine();
+ }
+ reader.close();
+ }
+ bw.close();
+ dumped = new File(CONVERT_RESULT);
+ }
+
+ TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+ return true;
+ }
+
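+ /**
+ * Stops the Hyracks test cluster and shuts down the mini DFS.
+ */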
+ @After
+ public void tearDown() throws Exception {
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
+ cleanupHDFS();
+ }
+
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
}