finish the debugging
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
index 2aee32d..f7caebb 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
@@ -59,7 +59,7 @@
public void append(KmerBytesWritable kmer) {
setSize((1 + valueCount) * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
- System.arraycopy(kmer.getBytes(), 0, storage,
+ System.arraycopy(kmer.getBytes(), kmer.offset, storage,
offset + HEADER_SIZE + valueCount * KmerBytesWritable.getBytesPerKmer(),
KmerBytesWritable.getBytesPerKmer());
valueCount += 1;
@@ -96,7 +96,7 @@
}
valueCount = 0;
setSize(newSize * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
- for (KmerBytesWritable kmer : uniqueElements) {
+ for (KmerBytesWritable kmer : uniqueElements) { // this point is not efficient
append(kmer);
}
Marshal.putInt(valueCount, storage, offset);
@@ -126,6 +126,7 @@
public void reset() {
valueCount = 0;
+ Marshal.putInt(valueCount, storage, offset);
}
public KmerBytesWritable getPosition(int i) {
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 362c12e..15a6cc9 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -71,7 +71,6 @@
this.kmer.reset(0);
}
-
public PositionListWritable getNodeIdList() {
return nodeIdList;
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java
new file mode 100644
index 0000000..5bcf663
--- /dev/null
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java
@@ -0,0 +1,105 @@
+package edu.uci.ics.genomix.data.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class NodeWritableTest {
+
+ @Test
+ public void testNodeReset() throws IOException {
+
+ NodeWritable outputNode = new NodeWritable();
+ NodeWritable inputNode = new NodeWritable();
+
+ KmerListWritable nextKmerList = new KmerListWritable();
+ KmerListWritable preKmerList = new KmerListWritable();
+ KmerBytesWritable preKmer = new KmerBytesWritable();
+ KmerBytesWritable curKmer = new KmerBytesWritable();
+ KmerBytesWritable nextKmer = new KmerBytesWritable();
+ PositionWritable nodeId = new PositionWritable();
+ PositionListWritable nodeIdList = new PositionListWritable();
+ KmerBytesWritable.setGlobalKmerLength(5);
+
+ nodeId.set((byte)0, (long)1, 0);
+ nodeIdList.append(nodeId);
+ for (int i = 6; i <= 10; i++) {
+ NodeWritable tempNode = new NodeWritable();
+
+ String randomString = generaterRandomString(i);
+ byte[] array = randomString.getBytes();
+
+ curKmer.setByRead(array, 0);
+ preKmer.setAsCopy(curKmer);
+ nextKmer.setAsCopy(curKmer);
+ nextKmer.shiftKmerWithNextChar(array[5]);
+
+ nextKmerList.append(nextKmer);
+
+ outputNode.setNodeIdList(nodeIdList);
+ outputNode.setFFList(nextKmerList);
+
+ tempNode.setNodeIdList(nodeIdList);
+ tempNode.setFFList(nextKmerList);
+
+ inputNode.setAsReference(outputNode.marshalToByteArray(), 0);
+ Assert.assertEquals(tempNode.toString(), inputNode.toString());
+
+ int j = 5;
+ for (; j < array.length - 1; j++) {
+ outputNode.reset();
+ curKmer.setAsCopy(nextKmer);
+
+ nextKmer.shiftKmerWithNextChar(array[j+1]);
+ nextKmerList.reset();
+ nextKmerList.append(nextKmer);
+ preKmerList.reset();
+ preKmerList.append(preKmer);
+ outputNode.setNodeIdList(nodeIdList);
+ outputNode.setFFList(nextKmerList);
+ outputNode.setRRList(preKmerList);
+ tempNode.reset();
+ tempNode.setNodeIdList(nodeIdList);
+ tempNode.setFFList(nextKmerList);
+ tempNode.setRRList(preKmerList);
+ preKmer.setAsCopy(curKmer);
+ inputNode.reset();
+ inputNode.setAsReference(outputNode.marshalToByteArray(), 0);
+ Assert.assertEquals(tempNode.toString(), inputNode.toString());
+ }
+ curKmer.setAsCopy(nextKmer);
+ preKmerList.reset();
+ preKmerList.append(preKmer);
+ outputNode.reset();
+ outputNode.setNodeIdList(nodeIdList);
+ outputNode.setRRList(preKmerList);
+ tempNode.reset();
+ tempNode.setNodeIdList(nodeIdList);
+ tempNode.setRRList(preKmerList);
+ inputNode.reset();
+ inputNode.setAsReference(outputNode.marshalToByteArray(), 0);
+ Assert.assertEquals(tempNode.toString(), inputNode.toString());
+ }
+ }
+
+ public String generaterRandomString(int n) {
+ char[] chars = "ACGT".toCharArray();
+ StringBuilder sb = new StringBuilder();
+ Random random = new Random();
+ for (int i = 0; i < n; i++) {
+ char c = chars[random.nextInt(chars.length)];
+ sb.append(c);
+ }
+ return sb.toString();
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 84f9fe0..fc5ca34 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -135,8 +135,8 @@
writeToFrame(writer);
/*middle kmer*/
- int i = kmerSize;
- for (; i < array.length - 1; i++) {
+ int i = kmerSize + 1;
+ for (; i < array.length; i++) {
outputNode.reset();
setPreKmerByOldCurKmer();
setCurKmerByOldNextKmer();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
index 46fdd0e..a484179 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -48,6 +48,7 @@
return new IAggregatorDescriptor() {
private NodeWritable readNode = new NodeWritable();
+// private KmerBytesWritable readKeyKmer = new KmerBytesWritable();
protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -75,13 +76,14 @@
AggregateState state) throws HyracksDataException {
NodeWritable localUniNode = (NodeWritable) state.state;
localUniNode.reset();
+// readKeyKmer.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 0));
readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
localUniNode.getFFList().appendList(readNode.getFFList());
localUniNode.getFRList().appendList(readNode.getFRList());
localUniNode.getRFList().appendList(readNode.getRFList());
localUniNode.getRRList().appendList(readNode.getRRList());
-
+
// make an empty field
tupleBuilder.addFieldEndOffset();// mark question?
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/assembleKeyIntoNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/assembleKeyIntoNodeOperator.java
new file mode 100644
index 0000000..36a1d1e
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/assembleKeyIntoNodeOperator.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class assembleKeyIntoNodeOperator extends AbstractSingleActivityOperatorDescriptor {
+
+ public assembleKeyIntoNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int kmerSize) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = outRecDesc;
+ this.kmerSize = kmerSize;
+ KmerBytesWritable.setGlobalKmerLength(this.kmerSize);
+ }
+
+ private static final long serialVersionUID = 1L;
+ private final int kmerSize;
+
+ public static final int InputKmerField = 0;
+ public static final int InputtempNodeField = 1;
+ public static final int OutputNodeField = 0;
+
+ public static final RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[1]);
+
+ public class MapReadToNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ public static final int INT_LENGTH = 4;
+ private final IHyracksTaskContext ctx;
+ private final RecordDescriptor inputRecDesc;
+ private final RecordDescriptor outputRecDesc;
+
+ private FrameTupleAccessor accessor;
+ private ByteBuffer writeBuffer;
+ private ArrayTupleBuilder builder;
+ private FrameTupleAppender appender;
+
+ NodeWritable readNode;
+ KmerBytesWritable readKmer;
+
+ public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+ RecordDescriptor outputRecDesc) {
+ this.ctx = ctx;
+ this.inputRecDesc = inputRecDesc;
+ this.outputRecDesc = outputRecDesc;
+
+ readNode = new NodeWritable();
+ readKmer = new KmerBytesWritable();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+ writeBuffer = ctx.allocateFrame();
+ builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(writeBuffer, true);
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ generateNodeFromKmer(i);
+ }
+ }
+
+ private void generateNodeFromKmer(int tIndex) throws HyracksDataException {
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+
+ setKmer(readKmer, offsetPoslist + accessor.getFieldStartOffset(tIndex, InputKmerField));
+ readNode.reset();
+ setNode(readNode, offsetPoslist + accessor.getFieldStartOffset(tIndex, InputtempNodeField));
+
+ outputNode(readNode);
+ }
+
+
+ private void setKmer(KmerBytesWritable kmer, int offset) {
+ ByteBuffer buffer = accessor.getBuffer();
+ kmer.setAsCopy(buffer.array(), offset);
+ }
+
+ private void setNode(NodeWritable node, int offset) {
+ ByteBuffer buffer = accessor.getBuffer();
+ node.setAsCopy(buffer.array(), offset);
+ }
+
+
+ private void outputNode(NodeWritable node) throws HyracksDataException {
+
+ try {
+ builder.reset();
+ builder.addField(node.marshalToByteArray(), 0, node.getSerializedLength());
+
+ if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ appender.reset(writeBuffer, true);
+ if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+ throw new IllegalStateException("Failed to append tuplebuilder to frame");
+ }
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to Add a field to the tupleBuilder.");
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ writer.close();
+ }
+
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new MapReadToNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
+ recordDescriptors[0]);
+ }
+
+}
+
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
index b7b7054..3836b13 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
@@ -17,7 +17,7 @@
import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.assembleKeyIntoNodeOperator;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -33,8 +33,7 @@
*/
private static final long serialVersionUID = 1L;
private final int kmerSize;
- public static final int OutputKmerField = ReadsKeyValueParserFactory.OutputKmerField;
- public static final int outputNodeField = ReadsKeyValueParserFactory.OutputNodeField;
+ public static final int OutputNodeField = assembleKeyIntoNodeOperator.OutputNodeField;
public NodeTextWriterFactory(int k) {
this.kmerSize = k;
@@ -53,9 +52,7 @@
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
- node.setAsReference(tuple.getFieldData(outputNodeField), tuple.getFieldStart(outputNodeField));
- node.getKmer().reset(kmerSize);
- node.getKmer().setAsReference(tuple.getFieldData(OutputKmerField), tuple.getFieldStart(OutputKmerField));
+ node.setAsReference(tuple.getFieldData(OutputNodeField), tuple.getFieldStart(OutputNodeField));
try {
output.write(node.toString().getBytes());
output.writeByte('\n');
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index afc1cf7..5b35eac 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -29,6 +29,7 @@
import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.assembleKeyIntoNodeOperator;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.AggregateKmerAggregateFactory;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.MergeKmerAggregateFactory;
import edu.uci.ics.genomix.hyracks.newgraph.io.NodeTextWriterFactory;
@@ -181,6 +182,16 @@
return kmerCrossAggregator;
}
+ public AbstractOperatorDescriptor generateKmerToFinalNode(JobSpecification jobSpec,
+ AbstractOperatorDescriptor kmerCrossAggregator) {
+
+ AbstractOperatorDescriptor mapToFinalNode = new assembleKeyIntoNodeOperator(jobSpec,
+ assembleKeyIntoNodeOperator.nodeOutputRec, kmerSize);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapToFinalNode, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return mapToFinalNode;
+ }
+
public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
ITupleWriterFactory nodeWriter = null;
@@ -209,10 +220,15 @@
logDebug("Group by Kmer");
AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+ logDebug("Generate final node");
+ lastOperator = generateKmerToFinalNode(jobSpec, lastOperator);
+
+ jobSpec.addRoot(lastOperator);
+
logDebug("Write node to result");
lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
- jobSpec.addRoot(readOperator);//what's this? why we need this? why I can't seet it in the JobGenCheckReader
+ jobSpec.addRoot(lastOperator);
return jobSpec;
}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
index 25915aa..bf87b23 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
@@ -32,7 +32,7 @@
@SuppressWarnings("deprecation")
public class JobRun {
private static final int KmerSize = 5;
- private static final int ReadLength = 6;
+ private static final int ReadLength = 7;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
@@ -52,8 +52,8 @@
@Test
public void TestAll() throws Exception {
- TestReader();
-// TestGroupby();
+// TestReader();
+ TestGroupby();
}
public void TestReader() throws Exception {
@@ -68,7 +68,7 @@
cleanUpReEntry();
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-// dumpResult();
+ dumpResult();
}
@Before
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 51a0d15..fbbc89a 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -46,11 +46,11 @@
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
private static final int KmerSize = 5;
- private static final int ReadLength = 8;
+ private static final int ReadLength = 7;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/test1.txt";
+ private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/test.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";
@@ -75,11 +75,11 @@
@Test
public void TestAll() throws Exception {
-// TestReader();
+ TestReader();
// TestGroupbyKmer();
// TestMapKmerToRead();
// TestGroupByReadID();
- TestEndToEnd();
+// TestEndToEnd();
// TestUnMergedNode();
}
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt
new file mode 100644
index 0000000..bb03d70
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt
@@ -0,0 +1,2 @@
+1 AATAG
+2 CATAC
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
index 3f1cd5c..a720dc4 100644
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
@@ -1 +1 @@
-1 AATAGA
+1 AATAGAA
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 9cdac8f..e0b21bc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -313,7 +313,7 @@
*/
public void processMerges(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
byte neighborToMergeDir, KmerBytesWritable nodeToAdd,
- int kmerSize, KmerBytesWritable kmer){
+ int kmerSize, VKmerBytesWritable kmer){
switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
this.getFFList().remove(nodeToDelete); //set(null);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index 64965e3..fc0cd4b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -95,7 +95,7 @@
/**
* get destination vertex
*/
- public KmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public VKmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
posIterator = value.getFFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
@@ -112,7 +112,7 @@
}
- public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public VKmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
posIterator = value.getRFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
index ecd3c4f..186fa4c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -51,7 +51,7 @@
kmerList.reset();
if(fakeVertex == null){
// fakeVertex = new KmerBytesWritable(kmerSize + 1); // TODO check if merge is correct
- fakeVertex = new KmerBytesWritable();
+ fakeVertex = new VKmerBytesWritable();
String random = generaterRandomString(kmerSize + 1);
fakeVertex.setByRead(random.getBytes(), 0);
}
@@ -187,7 +187,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}