Passed kmer groupby test: fix kmer field offset and position-count computation, and run an external sort before each pre-clustered group-by
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index e91dd66..b36b82d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -121,7 +121,7 @@
throw new IllegalArgumentException("Position id is beyond 127 at " + readID);
}
tupleBuilder.reset();
- tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
+ tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
pos.set(readID, (byte) posInRead);
tupleBuilder.addField(pos.getByteArray(), pos.getStartOffset(), pos.getLength());
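
The one-character fix above matters because KmerBytesWritable wraps a shared, reusable byte buffer: the kmer's bytes rarely begin at index 0 of that buffer, so copying from offset 0 silently reads a neighboring value's bytes. A minimal self-contained sketch of the failure mode, using a hypothetical KmerRef stand-in for KmerBytesWritable:

```java
import java.util.Arrays;

public class OffsetDemo {
    // Stand-in for KmerBytesWritable: a value that points into a shared buffer.
    static class KmerRef {
        final byte[] buf; final int offset; final int length;
        KmerRef(byte[] buf, int offset, int length) {
            this.buf = buf; this.offset = offset; this.length = length;
        }
        byte[] getBytes() { return buf; }
        int getOffset()   { return offset; }
        int getLength()   { return length; }
    }

    public static void main(String[] args) {
        byte[] shared = { 9, 9, 9, 1, 2, 3 };   // kmer bytes start at index 3
        KmerRef kmer = new KmerRef(shared, 3, 3);

        // What the old code did: assume the field starts at 0.
        byte[] wrong = Arrays.copyOfRange(kmer.getBytes(), 0, kmer.getLength());
        // What the fixed code does: honor the value's own start offset.
        byte[] right = Arrays.copyOfRange(kmer.getBytes(),
                kmer.getOffset(), kmer.getOffset() + kmer.getLength());

        System.out.println(Arrays.toString(wrong)); // [9, 9, 9] -- garbage
        System.out.println(Arrays.toString(right)); // [1, 2, 3] -- the kmer
    }
}
```
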
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
index bfc5ae5..4006fb4 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -47,6 +47,7 @@
@Override
public AggregateState createAggregateStates() {
+ System.out.println("CreateState");
return new AggregateState(new ArrayBackedValueStorage());
}
@@ -55,15 +56,18 @@
AggregateState state) throws HyracksDataException {
ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
inputVal.reset();
- position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor,tIndex,1));
+ position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
inputVal.append(position);
+
+ // append an empty field as a placeholder for the aggregated value
+ tupleBuilder.addFieldEndOffset();
}
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
- position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor,tIndex,1));
+ position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
inputVal.append(position);
}
@@ -80,6 +84,7 @@
ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
try {
fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
index b98db56..0f791a1 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -36,9 +36,9 @@
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
- return new IAggregatorDescriptor (){
+ return new IAggregatorDescriptor() {
- private PositionReference positionReEntry = new PositionReference();
+ private PositionReference position = new PositionReference();
@Override
public AggregateState createAggregateStates() {
@@ -46,31 +46,35 @@
}
@Override
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage)state.state;
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
inputVal.reset();
int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
- for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
- positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
- inputVal.append(positionReEntry);
+ for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
+ 1); offset += PositionReference.LENGTH) {
+ position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+ inputVal.append(position);
}
+ // append an empty placeholder field so the caller sees the expected field count
+ tupleBuilder.addFieldEndOffset();
}
@Override
public void reset() {
// TODO Auto-generated method stub
-
+
}
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage)state.state;
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
- for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
- positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
- inputVal.append(positionReEntry);
+ for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
+ 1); offset += PositionReference.LENGTH) {
+ position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+ inputVal.append(position);
}
}
@@ -88,6 +92,7 @@
try {
fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
tupleBuilder.addFieldEndOffset();
+
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
@@ -96,10 +101,10 @@
@Override
public void close() {
// TODO Auto-generated method stub
-
+
}
-
+
};
-
+
}
}
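
Besides the rename to position, both init() and aggregate() above walk the position-list field in fixed strides of PositionReference.LENGTH bytes, re-pointing one reusable reference at each record rather than allocating per position. A sketch of that stride pattern, assuming a 5-byte record (a 4-byte readID plus a 1-byte posInRead, matching the (readID, posInRead) pairs this patch works with):

```java
import java.nio.ByteBuffer;

public class StrideDemo {
    static final int RECORD_LENGTH = 5;  // assumed: 4-byte readID + 1-byte posInRead

    public static void main(String[] args) {
        // Build a field holding three position records back to back.
        ByteBuffer buf = ByteBuffer.allocate(3 * RECORD_LENGTH);
        for (int readId = 1; readId <= 3; readId++) {
            buf.putInt(readId).put((byte) (readId + 10));
        }
        byte[] field = buf.array();

        // Walk the field record by record, exactly like the aggregator's loop.
        int fieldStart = 0, fieldEnd = field.length;
        for (int offset = fieldStart; offset < fieldEnd; offset += RECORD_LENGTH) {
            int readId = ByteBuffer.wrap(field, offset, 4).getInt();
            byte pos = field[offset + 4];
            System.out.println("(" + readId + "," + pos + ")");
        }
    }
}
```
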
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 981b2de..0a1ef96 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -71,8 +71,8 @@
throw new IllegalArgumentException("Not enough kmer bytes");
}
reEnterKey.setNewReference(tuple.getFieldData(InputKmerField), tuple.getFieldStart(InputKmerField));
- int countOfPos = tuple.getFieldLength(InputPositionListField);
- if (countOfPos % PositionWritable.LENGTH != 0) {
+ int countOfPos = tuple.getFieldLength(InputPositionListField) / PositionWritable.LENGTH;
+ if (tuple.getFieldLength(InputPositionListField) % PositionWritable.LENGTH != 0) {
throw new IllegalArgumentException("Invalid count of position bytes");
}
plist.setNewReference(countOfPos, tuple.getFieldData(InputPositionListField),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index 664a2d2..f614846 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -48,9 +48,11 @@
if (kmer.getLength() > tuple.getFieldLength(KMerSequenceWriterFactory.InputKmerField)) {
throw new IllegalArgumentException("Not enough kmer bytes");
}
- kmer.setNewReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField), tuple.getFieldStart(KMerSequenceWriterFactory.InputKmerField));
- int countOfPos = tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField);
- if (countOfPos % PositionWritable.LENGTH != 0) {
+ kmer.setNewReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField),
+ tuple.getFieldStart(KMerSequenceWriterFactory.InputKmerField));
+ int countOfPos = tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField)
+ / PositionWritable.LENGTH;
+ if (tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField) % PositionWritable.LENGTH != 0) {
throw new IllegalArgumentException("Invalid count of position bytes");
}
plist.setNewReference(countOfPos, tuple.getFieldData(KMerSequenceWriterFactory.InputPositionListField),
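
This writer had the same bug as KMerSequenceWriterFactory above: getFieldLength() returns a byte count, while plist.setNewReference() expects the number of position records, so the old code passed a count PositionWritable.LENGTH times too large. A sketch of the corrected conversion plus the divisibility check, again assuming 5-byte records:

```java
public class PositionCountDemo {
    static final int POSITION_LENGTH = 5;  // assumed fixed record size

    static int countPositions(int fieldLengthInBytes) {
        // A valid position list is a whole number of fixed-size records.
        if (fieldLengthInBytes % POSITION_LENGTH != 0) {
            throw new IllegalArgumentException(
                    "Invalid count of position bytes: " + fieldLengthInBytes);
        }
        return fieldLengthInBytes / POSITION_LENGTH;
    }

    public static void main(String[] args) {
        System.out.println(countPositions(15)); // 3 records
        // countPositions(14) would throw: not a whole number of records
    }
}
```
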
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 2f40337..aa0c05d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -58,6 +58,7 @@
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -67,6 +68,7 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
@@ -132,12 +134,12 @@
tableSize), true);
}
- private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec,
+ private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec, int[] keyFields,
IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
throws HyracksDataException {
- int[] keyFields = new int[] { 0 }; // the id of grouped key
+
Object[] obj = new Object[3];
switch (groupbyType) {
@@ -150,7 +152,9 @@
break;
case PRECLUSTER:
default:
- obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+
+ obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
combineRed);
obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
@@ -185,16 +189,23 @@
public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
AbstractOperatorDescriptor readOperator) throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
+
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ ReadsKeyValueParserFactory.readKmerOutputRec);
+ connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+ jobSpec));
RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
jobSpec.setFrameSize(frameSize);
- Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateKmerAggregateFactory(),
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(),
new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
logDebug("LocalKmerGroupby Operator");
- connectOperators(jobSpec, readOperator, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+ connectOperators(jobSpec, sorter, ncNodeNames, kmerLocalAggregator, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
logDebug("CrossKmerGroupby Operator");
@@ -218,15 +229,21 @@
public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
// (ReadID, {(PosInRead, {OtherPosition...}, Kmer), ...})
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ ReadsKeyValueParserFactory.readKmerOutputRec);
+ connectOperators(jobSpec, mapKmerToRead, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+ jobSpec));
RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
RecordDescriptor readIDFinalRec = new RecordDescriptor(
new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
- Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, readIDCombineRec, readIDFinalRec);
AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
- connectOperators(jobSpec, mapKmerToRead, ncNodeNames, readLocalAggregator, ncNodeNames,
+ connectOperators(jobSpec, sorter, ncNodeNames, readLocalAggregator, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
logDebug("Group by ReadID merger");
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index da74f27..6cb2491 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -30,202 +30,193 @@
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
- private static final int KmerSize = 5;
- private static final String ACTUAL_RESULT_DIR = "actual";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final int KmerSize = 5;
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/text.txt";
- private static final String HDFS_INPUT_PATH = "/webmap";
- private static final String HDFS_OUTPUT_PATH = "/webmap_result";
+ private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/text.txt";
+ private static final String HDFS_INPUT_PATH = "/webmap";
+ private static final String HDFS_OUTPUT_PATH = "/webmap_result";
- private static final String EXPECTED_DIR = "src/test/resources/expected/";
- private static final String EXPECTED_READER_RESULT = EXPECTED_DIR
- + "result_after_initial_read";
- private static final String EXPECTED_OUPUT_KMER = EXPECTED_DIR
- + "result_after_kmerAggregate";
+ private static final String EXPECTED_DIR = "src/test/resources/expected/";
+ private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
+ private static final String EXPECTED_OUPUT_KMER = EXPECTED_DIR + "result_after_kmerAggregate";
- private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR
- + HDFS_OUTPUT_PATH + "/merged.txt";
- private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
- + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
+ private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
+ private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+ private int numPartitionPerMachine = 1;
- private Driver driver;
+ private Driver driver;
- @Test
- public void TestAll() throws Exception {
- //TestReader();
- TestGroupbyKmer();
- // TestMapKmerToRead();
- // TestGroupByReadID();
- // TestEndToEnd();
- }
+ @Test
+ public void TestAll() throws Exception {
+ //TestReader();
+ TestGroupbyKmer();
+ // TestMapKmerToRead();
+ // TestGroupByReadID();
+ // TestEndToEnd();
+ }
- public void TestReader() throws Exception {
- conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
- driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
- Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT));
- }
+ public void TestReader() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT, false));
+ }
- public void TestGroupbyKmer() throws Exception {
- conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
- conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
- driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_KMERHASHTABLE, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER));
- }
+ public void TestGroupbyKmer() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_KMERHASHTABLE, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, true));
+ }
- public void TestMapKmerToRead() throws Exception {
- conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
- driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_MAP_KMER_TO_READ, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER));
- }
+ public void TestMapKmerToRead() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_MAP_KMER_TO_READ, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, false));
+ }
- public void TestGroupByReadID() throws Exception {
- conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
- conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
- driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER));
- }
+ public void TestGroupByReadID() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+ driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, false));
+ }
- public void TestEndToEnd() throws Exception {
- conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
- cleanUpReEntry();
- conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
- driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER));
- cleanUpReEntry();
- conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
- driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER));
- }
+ public void TestEndToEnd() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ cleanUpReEntry();
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, false));
+ cleanUpReEntry();
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, false));
+ }
- @Before
- public void setUp() throws Exception {
- cleanupStores();
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
+ @Before
+ public void setUp() throws Exception {
+ cleanupStores();
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
- FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
- FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
+ FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
+ FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
- conf.setInt(GenomixJobConf.KMER_LENGTH, KmerSize);
- driver = new Driver(
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
- numPartitionPerMachine);
- }
+ conf.setInt(GenomixJobConf.KMER_LENGTH, KmerSize);
+ driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+ }
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_INPUT_PATH);
- Path dest = new Path(HDFS_INPUT_PATH);
- dfs.mkdirs(dest);
- // dfs.mkdirs(result);
- dfs.copyFromLocalFile(src, dest);
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(DATA_INPUT_PATH);
+ Path dest = new Path(HDFS_INPUT_PATH);
+ dfs.mkdirs(dest);
+ // dfs.mkdirs(result);
+ dfs.copyFromLocalFile(src, dest);
- DataOutputStream confOutput = new DataOutputStream(
- new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
- private void cleanUpReEntry() throws IOException {
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- if (lfs.exists(new Path(DUMPED_RESULT))) {
- lfs.delete(new Path(DUMPED_RESULT), true);
- }
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
- dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
- }
- }
+ private void cleanUpReEntry() throws IOException {
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ if (lfs.exists(new Path(DUMPED_RESULT))) {
+ lfs.delete(new Path(DUMPED_RESULT), true);
+ }
+ FileSystem dfs = FileSystem.get(conf);
+ if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
+ dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
+ }
+ }
- private boolean checkResults(String expectedPath) throws Exception {
- File dumped = null;
- String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
- if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
- FileUtil.copyMerge(FileSystem.get(conf),
- new Path(HDFS_OUTPUT_PATH), FileSystem
- .getLocal(new Configuration()), new Path(
- DUMPED_RESULT), false, conf, null);
- dumped = new File(DUMPED_RESULT);
- } else {
+ private boolean checkResults(String expectedPath, boolean checkPos) throws Exception {
+ File dumped = null;
+ String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
+ if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
+ FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
+ dumped = new File(DUMPED_RESULT);
+ } else {
- FileSystem.getLocal(new Configuration()).mkdirs(
- new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
- File filePathTo = new File(CONVERT_RESULT);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
- String partname = "/part-" + i;
- // FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
- // + partname), FileSystem.getLocal(new Configuration()),
- // new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname),
- // false, conf);
+ FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
+ File filePathTo = new File(CONVERT_RESULT);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+ for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
+ String partname = "/part-" + i;
+ // FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
+ // + partname), FileSystem.getLocal(new Configuration()),
+ // new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname),
+ // false, conf);
- Path path = new Path(HDFS_OUTPUT_PATH + partname);
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.getFileStatus(path).getLen() == 0) {
- continue;
- }
- SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path,
- conf);
+ Path path = new Path(HDFS_OUTPUT_PATH + partname);
+ FileSystem dfs = FileSystem.get(conf);
+ if (dfs.getFileStatus(path).getLen() == 0) {
+ continue;
+ }
+ SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
- // KmerBytesWritable key = (KmerBytesWritable)
- // ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(
- GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN));
- // KmerCountValue value = (KmerCountValue)
- // ReflectionUtils.newInstance(reader.getValueClass(), conf);
- KmerBytesWritable value = null;
- while (reader.next(key, value)) {
- if (key == null || value == null) {
- break;
- }
- bw.write(key.toString() + "\t" + value.toString());
- System.out
- .println(key.toString() + "\t" + value.toString());
- bw.newLine();
- }
- reader.close();
- }
- bw.close();
- dumped = new File(CONVERT_RESULT);
- }
+ // KmerBytesWritable key = (KmerBytesWritable)
+ // ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KmerSize));
+ // KmerCountValue value = (KmerCountValue)
+ // ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ KmerBytesWritable value = null;
+ while (reader.next(key, value)) {
+ if (key == null || value == null) {
+ break;
+ }
+ bw.write(key.toString() + "\t" + value.toString());
+ System.out.println(key.toString() + "\t" + value.toString());
+ bw.newLine();
+ }
+ reader.close();
+ }
+ bw.close();
+ dumped = new File(CONVERT_RESULT);
+ }
- TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
- return true;
- }
+ if (checkPos) {
+ TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped);
+ } else {
+ TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+ }
+ return true;
+ }
- @After
- public void tearDown() throws Exception {
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
- cleanupHDFS();
- }
+ @After
+ public void tearDown() throws Exception {
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
+ cleanupHDFS();
+ }
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
index bcfdedd..e97df1b 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
@@ -61,6 +61,38 @@
}
}
+ public static void compareWithUnSortedPosition(File expectedFile, File actualFile) throws Exception {
+ BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
+ BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
+ ArrayList<String> actualLines = new ArrayList<String>();
+ String lineExpected, lineActual;
+ try {
+ while ((lineActual = readerActual.readLine()) != null) {
+ actualLines.add(lineActual);
+ }
+ Collections.sort(actualLines);
+ int num = 1;
+ for (String actualLine : actualLines) {
+ lineExpected = readerExpected.readLine();
+ if (lineExpected == null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
+ }
+ if (!containStrings(lineExpected, actualLine)) {
+ throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
+ + actualLine);
+ }
+ ++num;
+ }
+ lineExpected = readerExpected.readLine();
+ if (lineExpected != null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+ }
+ } finally {
+ readerActual.close();
+ readerExpected.close();
+ }
+ }
+
public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
@@ -130,4 +162,40 @@
return true;
}
+ private static boolean containStrings(String lineExpected, String actualLine) {
+ String keyExp = lineExpected.split("\\t")[0];
+ String keyAct = actualLine.split("\\t")[0];
+ if (!keyAct.equals(keyExp)) {
+ return false;
+ }
+
+ ArrayList<String> posExp = new ArrayList<String>();
+ ArrayList<String> posAct = new ArrayList<String>();
+
+ String valueExp = lineExpected.split("\\t")[1];
+ String[] valuesExp = valueExp.substring(1, valueExp.length() - 1).split(",");
+
+ for (String str : valuesExp) {
+ posExp.add(str);
+ }
+
+ String valueAct = actualLine.split("\\t")[1];
+ String[] valuesAct = valueAct.substring(1, valueAct.length() - 1).split(",");
+
+ for (String str : valuesAct) {
+ posAct.add(str);
+ }
+
+ if (posExp.size() != posAct.size()) {
+ return false;
+ }
+ Collections.sort(posExp);
+ Collections.sort(posAct);
+ for (int i = 0; i < posExp.size(); i++) {
+ if (!posExp.get(i).equals(posAct.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
}
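
compareWithUnSortedPosition() sorts the actual output lines, then uses containStrings() to compare each line's position list up to reordering, since the order in which partitions contribute positions is nondeterministic. A condensed, self-contained sketch of the per-line check (it mirrors the helper's token-level comparison, splitting the bracketed list on commas):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PositionCompareDemo {
    // Lines look like "AATAG\t[(1,0),(2,0)]": the key must match exactly,
    // the position list must match up to reordering.
    static boolean sameUpToPositionOrder(String expected, String actual) {
        String[] exp = expected.split("\t", 2);
        String[] act = actual.split("\t", 2);
        if (!exp[0].equals(act[0])) {
            return false;
        }
        List<String> expPos = splitPositions(exp[1]);
        List<String> actPos = splitPositions(act[1]);
        Collections.sort(expPos);
        Collections.sort(actPos);
        return expPos.equals(actPos);   // covers the size check too
    }

    private static List<String> splitPositions(String bracketed) {
        String inner = bracketed.substring(1, bracketed.length() - 1); // strip [ ]
        return new ArrayList<>(Arrays.asList(inner.split(",")));
    }

    public static void main(String[] args) {
        System.out.println(sameUpToPositionOrder(
                "AATAG\t[(1,0),(2,0)]", "AATAG\t[(2,0),(1,0)]"));  // true
        System.out.println(sameUpToPositionOrder(
                "AATAG\t[(1,0),(2,0)]", "AATAG\t[(1,0),(3,0)]"));  // false
    }
}
```
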
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
new file mode 100644
index 0000000..728c093
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
@@ -0,0 +1,20 @@
+AATAG (1,0)
+AATAG (2,0)
+AATAG (3,0)
+AATAG (4,0)
+AATAG (5,0)
+AGAAG (1,3)
+AGAAG (2,3)
+AGAAG (3,3)
+AGAAG (4,3)
+AGAAG (5,3)
+ATAGA (1,1)
+ATAGA (2,1)
+ATAGA (3,1)
+ATAGA (4,1)
+ATAGA (5,1)
+TAGAA (1,2)
+TAGAA (2,2)
+TAGAA (3,2)
+TAGAA (4,2)
+TAGAA (5,2)
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
new file mode 100644
index 0000000..d5624d7
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
@@ -0,0 +1,4 @@
+AATAG [(1,0),(2,0),(3,0),(4,0),(5,0)]
+AGAAG [(1,3),(2,3),(3,3),(4,3),(5,3)]
+ATAGA [(1,1),(2,1),(3,1),(4,1),(5,1)]
+TAGAA [(1,2),(2,2),(3,2),(4,2),(5,2)]