Merge branch 'jianfeng/genomix' into jianfeng/genomix-reverse
Conflicts:
genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index c0b00a7..65931a2 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -13,21 +13,22 @@
*/
private static final long serialVersionUID = 1L;
private PositionWritable nodeID;
- private PositionListWritable incomingList;
- private PositionListWritable outgoingList;
+ private PositionListWritable forwardForwardList;
+ private PositionListWritable forwardReverseList;
+ private PositionListWritable reverseForwardList;
+ private PositionListWritable reverseReverseList;
private KmerBytesWritable kmer;
public NodeWritable() {
- nodeID = new PositionWritable();
- incomingList = new PositionListWritable();
- outgoingList = new PositionListWritable();
- kmer = new KmerBytesWritable();
+ this(21);
}
public NodeWritable(int kmerSize) {
- nodeID = new PositionWritable();
- incomingList = new PositionListWritable();
- outgoingList = new PositionListWritable();
+ nodeID = new PositionWritable(0,(byte) 0);
+ forwardForwardList = new PositionListWritable();
+ forwardReverseList = new PositionListWritable();
+ reverseForwardList = new PositionListWritable();
+ reverseReverseList = new PositionListWritable();
kmer = new KmerBytesWritable(kmerSize);
}
@@ -39,32 +40,33 @@
nodeID.set(readID, posInRead);
}
- public void setIncomingList(PositionListWritable incoming) {
- incomingList.set(incoming);
- }
-
-
- public void setOutgoingList(PositionListWritable outgoing) {
- outgoingList.set(outgoing);
- }
-
public void setKmer(KmerBytesWritable right) {
this.kmer.set(right);
}
public void reset(int kmerSize) {
nodeID.set(0, (byte) 0);
- incomingList.reset();
- outgoingList.reset();
+ forwardForwardList.reset();
+ forwardReverseList.reset();
+ reverseForwardList.reset();
+ reverseReverseList.reset();
kmer.reset(kmerSize);
}
- public PositionListWritable getIncomingList() {
- return incomingList;
+ public PositionListWritable getFFList() {
+ return forwardForwardList;
+ }
+
+ public PositionListWritable getFRList() {
+ return forwardReverseList;
}
- public PositionListWritable getOutgoingList() {
- return outgoingList;
+ public PositionListWritable getRFList() {
+ return reverseForwardList;
+ }
+
+ public PositionListWritable getRRList() {
+ return reverseReverseList;
}
public PositionWritable getNodeID() {
@@ -79,36 +81,44 @@
return kmer.getKmerLength();
}
- public void mergeNext(NodeWritable nextNode, int initialKmerSize) {
- this.outgoingList.set(nextNode.outgoingList);
+ public void mergeForwadNext(NodeWritable nextNode, int initialKmerSize) {
+ this.forwardForwardList.set(nextNode.forwardForwardList);
+ this.forwardReverseList.set(nextNode.forwardReverseList);
kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
}
- public void mergePre(NodeWritable preNode, int initialKmerSize){
- this.incomingList.set(preNode.incomingList);
+ public void mergeForwardPre(NodeWritable preNode, int initialKmerSize){
+ this.reverseForwardList.set(preNode.reverseForwardList);
+ this.reverseReverseList.set(preNode.reverseReverseList);
kmer.mergePreKmer(initialKmerSize, preNode.getKmer());
}
public void set(NodeWritable node) {
this.nodeID.set(node.getNodeID().getReadID(), node.getNodeID().getPosInRead());
- this.incomingList.set(node.getIncomingList());
- this.outgoingList.set(node.getOutgoingList());
+ this.forwardForwardList.set(node.forwardForwardList);
+ this.forwardReverseList.set(node.forwardReverseList);
+ this.reverseForwardList.set(node.reverseForwardList);
+ this.reverseReverseList.set(node.reverseReverseList);
this.kmer.set(node.kmer);
}
@Override
public void readFields(DataInput in) throws IOException {
this.nodeID.readFields(in);
- this.incomingList.readFields(in);
- this.outgoingList.readFields(in);
+ this.forwardForwardList.readFields(in);
+ this.forwardReverseList.readFields(in);
+ this.reverseForwardList.readFields(in);
+ this.reverseReverseList.readFields(in);
this.kmer.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
this.nodeID.write(out);
- this.incomingList.write(out);
- this.outgoingList.write(out);
+ this.forwardForwardList.write(out);
+ this.forwardReverseList.write(out);
+ this.reverseForwardList.write(out);
+ this.reverseReverseList.write(out);
this.kmer.write(out);
}
@@ -127,17 +137,27 @@
StringBuilder sbuilder = new StringBuilder();
sbuilder.append('(');
sbuilder.append(nodeID.toString()).append('\t');
- sbuilder.append(incomingList.toString()).append('\t');
- sbuilder.append(outgoingList.toString()).append('\t');
+ sbuilder.append(forwardForwardList.toString()).append('\t');
+ sbuilder.append(forwardReverseList.toString()).append('\t');
+ sbuilder.append(reverseForwardList.toString()).append('\t');
+ sbuilder.append(reverseReverseList.toString()).append('\t');
sbuilder.append(kmer.toString()).append(')');
return sbuilder.toString();
}
+ public int inDegree(){
+ return reverseReverseList.getCountOfPosition() + reverseForwardList.getCountOfPosition();
+ }
+
+ public int outDegree(){
+ return forwardForwardList.getCountOfPosition() + forwardReverseList.getCountOfPosition();
+ }
+
/*
* Return if this node is a "path" compressible node, that is, it has an in-degree and out-degree of 1
*/
public boolean isPathNode() {
- return incomingList.getCountOfPosition() == 1 && outgoingList.getCountOfPosition() == 1;
+ return inDegree() == 1 && outDegree() == 1;
}
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java
deleted file mode 100644
index 55c6ca7..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package edu.uci.ics.genomix.type;
-
-public class NoteWritableFactory {
- private NodeWritable node;
- private KmerBytesWritableFactory kmerBytesWritableFactory;
- private int kmerSize = 55;
-
- public NoteWritableFactory() {
- node = new NodeWritable();
- kmerBytesWritableFactory = new KmerBytesWritableFactory(kmerSize);
- }
-
- public NodeWritable append(final NodeWritable orignalNode, final KmerBytesWritable appendKmer){
- KmerBytesWritable preKmer = orignalNode.getKmer();
- node.setKmer(kmerBytesWritableFactory.mergeTwoKmer(preKmer,appendKmer));
- return node;
- }
-
- public NodeWritable append(final NodeWritable orignalNode, final NodeWritable appendNode) {
- KmerBytesWritable nextKmer = kmerBytesWritableFactory.getSubKmerFromChain(kmerSize - 2, appendNode.getKmer().kmerlength - kmerSize + 2,
- appendNode.getKmer());
- return append(orignalNode, nextKmer);
- }
-
- public NodeWritable prepend(final NodeWritable orignalNode, final KmerBytesWritable prependKmer){
- KmerBytesWritable nextKmer = orignalNode.getKmer();
- node.setKmer(kmerBytesWritableFactory.mergeTwoKmer(prependKmer,nextKmer));
- return node;
- }
-
- public NodeWritable prepend(final NodeWritable orignalNode, final NodeWritable prependNode) {
- KmerBytesWritable prependKmer = kmerBytesWritableFactory.getSubKmerFromChain(kmerSize - 2, orignalNode.getKmer().kmerlength - kmerSize + 2,
- orignalNode.getKmer());
- return prepend(orignalNode, prependKmer);
- }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index f77c844..10821b0 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -42,7 +42,7 @@
public void set(PositionWritable pos) {
set(pos.getReadID(), pos.getPosInRead());
}
-
+
public void set(int readID, byte posInRead) {
Marshal.putInt(readID, storage, offset);
storage[offset + INTBYTES] = posInRead;
@@ -67,11 +67,11 @@
public int getLength() {
return LENGTH;
}
-
- public boolean isSameReadID(PositionWritable other){
+
+ public boolean isSameReadID(PositionWritable other) {
return getReadID() == other.getReadID();
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
in.readFully(storage, offset, LENGTH);
@@ -138,7 +138,7 @@
return diff;
}
}
-
+
static { // register this comparator
WritableComparator.define(PositionWritable.class, new Comparator());
}
diff --git a/genomix/genomix-hadoop/pom.xml b/genomix/genomix-hadoop/pom.xml
index fe83c3c..610092a 100755
--- a/genomix/genomix-hadoop/pom.xml
+++ b/genomix/genomix-hadoop/pom.xml
@@ -156,5 +156,23 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>genomix-hyracks</artifactId>
+ <version>0.2.6-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.6-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.6-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
new file mode 100644
index 0000000..b405161
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -0,0 +1,215 @@
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+
+@SuppressWarnings("deprecation")
+public class TestPathMergeH3 {
+ private static final int KMER_LENGTH = 5;
+ private static final int READ_LENGTH = 8;
+
+ private static final String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+ private static final String HDFS_SEQUENCE = "/00-sequence/";
+ private static final String HDFS_GRAPHBUILD = "/01-graphbuild/";
+ private static final String HDFS_MERGED = "/02-graphmerge/";
+
+ private static final String EXPECTED_ROOT = "src/test/resources/expected/";
+ private static final String ACTUAL_ROOT = "src/test/resources/actual/";
+ private static final String GRAPHBUILD_FILE = "result.graphbuild.txt";
+ private static final String PATHMERGE_FILE = "result.mergepath.txt";
+
+ private static final String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+
+ private MiniDFSCluster dfsCluster;
+
+ private static JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+ private int numPartitionPerMachine = 1;
+
+ private Driver driver;
+
+ @Test
+ public void TestBuildGraph() throws Exception {
+ copySequenceToDFS();
+ buildGraph();
+ }
+
+// @Test
+ public void TestMergeOneIteration() throws Exception {
+ copySequenceToDFS();
+ buildGraph();
+ MergePathsH3Driver h3 = new MergePathsH3Driver();
+ h3.run(HDFS_GRAPHBUILD, HDFS_MERGED, 2, KMER_LENGTH, 1, null, conf);
+ copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, conf);
+ }
+
+
+
+ public void buildGraph() throws Exception {
+ FileInputFormat.setInputPaths(conf, HDFS_SEQUENCE);
+ FileOutputFormat.setOutputPath(conf, new Path(HDFS_GRAPHBUILD));
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, conf);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ cleanupStores();
+ HyracksUtils.init();
+ FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+ FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+ startHDFS();
+
+ conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
+ conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
+ driver = new Driver(HyracksUtils.CC_HOST,
+ HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+ }
+
+ /*
+ * Merge and copy a DFS directory to a local destination, converting to text if necessary. Also locally store the binary-formatted result if available.
+ */
+ private static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, Configuration conf) throws IOException {
+ String fileFormat = conf.get(GenomixJobConf.OUTPUT_FORMAT);
+ if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat)) {
+ // for text files, just concatenate them together
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+ FileSystem.getLocal(new Configuration()), new Path(localDestFile),
+ false, conf, null);
+ } else {
+ // file is binary
+ // merge and store the binary format
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+ FileSystem.getLocal(new Configuration()), new Path(localDestFile + ".bin"),
+ false, conf, null);
+ // load the Node's and write them out as text locally
+ FileSystem.getLocal(new Configuration()).mkdirs(new Path(localDestFile).getParent());
+ File filePathTo = new File(localDestFile);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+ for (int i=0; i < java.lang.Integer.MAX_VALUE; i++) {
+ Path path = new Path(hdfsSrcDir + "part-" + i);
+ FileSystem dfs = FileSystem.get(conf);
+ if (!dfs.exists(path)) {
+ break;
+ }
+ if (dfs.getFileStatus(path).getLen() == 0) {
+ continue;
+ }
+ SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
+ NodeWritable key = new NodeWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH));
+ NullWritable value = NullWritable.get();
+ while (reader.next(key, value)) {
+ if (key == null || value == null) {
+ break;
+ }
+ bw.write(key.toString() + "\t" + value.toString());
+ System.out.println(key.toString() + "\t" + value.toString());
+ bw.newLine();
+ }
+ reader.close();
+ }
+ bw.close();
+ }
+
+ }
+
+ private boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+ File dumped = new File(actualPath);
+ if (poslistField != null) {
+ TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+ } else {
+ TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+ }
+ return true;
+ }
+
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+ conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+ conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_ROOT + "conf.xml")));
+ conf.writeXml(confOutput);
+ confOutput.close();
+ }
+
+ private void copySequenceToDFS() throws IOException {
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(LOCAL_SEQUENCE_FILE);
+ Path dest = new Path(HDFS_SEQUENCE);
+ dfs.mkdirs(dest);
+ // dfs.mkdirs(result);
+ dfs.copyFromLocalFile(src, dest);
+ }
+
+ @BeforeClass
+ public static void cleanUpEntry() throws IOException {
+ // local cleanup
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ if (lfs.exists(new Path(ACTUAL_ROOT))) {
+ lfs.delete(new Path(ACTUAL_ROOT), true);
+ }
+ // dfs cleanup
+ FileSystem dfs = FileSystem.get(conf);
+ String[] paths = {HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MERGED};
+ for (String path : paths) {
+ if (dfs.exists(new Path(path))) {
+ dfs.delete(new Path(path), true);
+ }
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ HyracksUtils.deinit();
+ cleanupHDFS();
+ }
+
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
+}
diff --git a/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt b/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
new file mode 100755
index 0000000..13190dd
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
@@ -0,0 +1,6 @@
+1 AATAGAAG
+2 AATAGAAG
+3 AATAGAAG
+4 AATAGAAG
+5 AATAGAAG
+6 AGAAGAAG
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..3e5bacb
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>fs.default.name</name>
+ <value>hdfs://127.0.0.1:31888</value>
+ </property>
+ <property>
+ <name>hadoop.tmp.dir</name>
+ <value>/tmp/hadoop</value>
+ </property>
+
+
+</configuration>
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..b1b1902
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>dfs.replication</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <name>dfs.block.size</name>
+ <value>65536</value>
+ </property>
+
+</configuration>
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshhold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..525e7d5
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>localhost:29007</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.map.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.reduce.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.max.split.size</name>
+ <value>2048</value>
+ </property>
+
+</configuration>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
index 0b0dcf2..f98c684 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -13,5 +13,7 @@
super(kmerSize);
// TODO Auto-generated constructor stub
}
+
+
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index 4ea5a84..8609519 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -24,11 +24,15 @@
public class MapKmerPositionToReadOperator extends AbstractSingleActivityOperatorDescriptor {
- public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc) {
+ public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc, int readlength,
+ int kmerSize) {
super(spec, 1, 1);
recordDescriptors[0] = recDesc;
+ LAST_POSID = readlength - kmerSize + 1;
}
+ private final int LAST_POSID;
+
private static final long serialVersionUID = 1L;
public static final int InputKmerField = 0;
public static final int InputPosListField = 1;
@@ -93,6 +97,10 @@
}
}
+ private boolean isStart(byte posInRead) {
+ return posInRead == 1 || posInRead == -LAST_POSID;
+ }
+
private void scanPosition(int tIndex, ArrayBackedValueStorage zeroPositionCollection2,
ArrayBackedValueStorage noneZeroPositionCollection2) {
zeroPositionCollection2.reset();
@@ -102,7 +110,7 @@
+ accessor.getFieldStartOffset(tIndex, InputPosListField);
for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
positionEntry.setNewReference(data, offsetPoslist + i);
- if (positionEntry.getPosInRead() == 0) {
+ if (isStart(positionEntry.getPosInRead())) {
zeroPositionCollection2.append(positionEntry);
} else {
noneZeroPositionCollection2.append(positionEntry);
@@ -118,7 +126,7 @@
+ accessor.getFieldStartOffset(tIndex, InputPosListField);
for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
positionEntry.setNewReference(data, offsetPoslist + i);
- if (positionEntry.getPosInRead() != 0) {
+ if (!isStart(positionEntry.getPosInRead())) {
appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
} else {
appendNodeToBuilder(tIndex, positionEntry, noneZeroPositionCollection, builder2);
@@ -139,11 +147,16 @@
writePosToFieldAndSkipSameReadID(pos, builder2.getDataOutput(), posList2);
builder2.addFieldEndOffset();
}
- // set kmer, may not useful
- byte[] data = accessor.getBuffer().array();
- int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(tIndex, InputKmerField);
- builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
+ // set kmer; may not be useful:
+ // the reversed ID doesn't need to output the kmer
+ if (pos.getPosInRead() > 0) {
+ byte[] data = accessor.getBuffer().array();
+ int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputKmerField);
+ builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
+ } else {
+ builder2.addFieldEndOffset();
+ }
if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
FrameUtils.flushFrame(writeBuffer, writer);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index cddbe4d..98d21af 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -40,12 +40,13 @@
public static final int OutputNodeIDField = 0;
public static final int OutputCountOfKmerField = 1;
- public static final int OutputIncomingField = 2;
- public static final int OutputOutgoingField = 3;
- public static final int OutputKmerBytesField = 4;
+ public static final int OutputForwardForwardField = 2;
+ public static final int OutputForwardReverseField = 3;
+ public static final int OutputReverseForwardField = 4;
+ public static final int OutputReverseReverseField = 5;
+ public static final int OutputKmerBytesField = 6;
- public static final RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
- null, null, null, null });
+ public static final RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[7]);
/**
* (ReadID, Storage[posInRead]={len, PositionList, len, Kmer})
@@ -57,6 +58,8 @@
private final RecordDescriptor inputRecDesc;
private final RecordDescriptor outputRecDesc;
+ private final int LAST_POSITION_ID;
+
private FrameTupleAccessor accessor;
private ByteBuffer writeBuffer;
private ArrayTupleBuilder builder;
@@ -66,6 +69,8 @@
private NodeReference nextNodeEntry;
private NodeReference nextNextNodeEntry;
+ private PositionListWritable cachePositionList;
+
public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
RecordDescriptor outputRecDesc) {
this.ctx = ctx;
@@ -74,6 +79,8 @@
curNodeEntry = new NodeReference(kmerSize);
nextNodeEntry = new NodeReference(kmerSize);
nextNextNodeEntry = new NodeReference(0);
+ cachePositionList = new PositionListWritable();
+ LAST_POSITION_ID = (inputRecDesc.getFieldCount() - InputInfoFieldStart) / 2;
}
@Override
@@ -100,104 +107,159 @@
int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
int readID = accessor.getBuffer().getInt(
offsetPoslist + accessor.getFieldStartOffset(tIndex, InputReadIDField));
- resetNode(curNodeEntry, readID, (byte) 0,
- offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart), true);
+ if ((accessor.getFieldCount() - InputInfoFieldStart) % 2 != 0) {
+ throw new IllegalArgumentException("field length is odd");
+ }
- for (int i = InputInfoFieldStart + 1; i < accessor.getFieldCount(); i++) {
- resetNode(nextNodeEntry, readID, (byte) (i - InputInfoFieldStart),
- offsetPoslist + accessor.getFieldStartOffset(tIndex, i), true);
- NodeReference pNextNext = null;
- if (i + 1 < accessor.getFieldCount()) {
- resetNode(nextNextNodeEntry, readID, (byte) (i - InputInfoFieldStart + 1),
- offsetPoslist + accessor.getFieldStartOffset(tIndex, i + 1), false);
- pNextNext = nextNextNodeEntry;
- }
+ resetNode(curNodeEntry, readID, (byte) (1));
+ setForwardIncomingList(curNodeEntry,
+ offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart));
+ setKmer(curNodeEntry.getKmer(), offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart));
+ if (curNodeEntry.getNodeID().getPosInRead() == LAST_POSITION_ID) {
+ setReverseIncomingList(curNodeEntry,
+ offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart + 1));
+ }
- if (nextNodeEntry.getOutgoingList().getCountOfPosition() == 0) {
- if (pNextNext == null || pNextNext.getOutgoingList().getCountOfPosition() == 0) {
- curNodeEntry.mergeNext(nextNodeEntry, kmerSize);
- } else {
- curNodeEntry.getOutgoingList().reset();
- curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
- outputNode(curNodeEntry);
+ // next Node
+ readNodesInfo(tIndex, readID, curNodeEntry, nextNodeEntry, InputInfoFieldStart);
- nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
- curNodeEntry.set(nextNodeEntry);
- }
- } else { // nextNode entry outgoing > 0
- curNodeEntry.getOutgoingList().set(nextNodeEntry.getOutgoingList());
- curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
- nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
+ for (int i = InputInfoFieldStart + 2; i < accessor.getFieldCount(); i += 2) {
+ readNodesInfo(tIndex, readID, nextNodeEntry, nextNextNodeEntry, i);
+
+ if (curNodeEntry.inDegree() > 1 || curNodeEntry.outDegree() > 0 || nextNodeEntry.inDegree() > 0
+ || nextNodeEntry.outDegree() > 0 || nextNextNodeEntry.inDegree() > 0
+ || nextNextNodeEntry.outDegree() > 0) {
+ connect(curNodeEntry, nextNodeEntry);
outputNode(curNodeEntry);
curNodeEntry.set(nextNodeEntry);
- curNodeEntry.getOutgoingList().reset();
+ nextNodeEntry.set(nextNextNodeEntry);
+ continue;
}
+ curNodeEntry.mergeForwadNext(nextNodeEntry, kmerSize);
+ nextNodeEntry.set(nextNextNodeEntry);
}
outputNode(curNodeEntry);
}
- private void resetNode(NodeReference node, int readID, byte posInRead, int offset, boolean isInitial) {
+ private void readNodesInfo(int tIndex, int readID, NodeReference curNode, NodeReference nextNode, int curFieldID) {
+ // nextNext node
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ if (curFieldID + 2 < accessor.getFieldCount()) {
+ setForwardOutgoingList(curNode, offsetPoslist + accessor.getFieldStartOffset(tIndex, curFieldID + 2));
+ resetNode(nextNode, readID, (byte) (1 + (curFieldID + 2 - InputInfoFieldStart) / 2));
+ setKmer(nextNode.getKmer(), offsetPoslist + accessor.getFieldStartOffset(tIndex, curFieldID + 2));
+ setReverseOutgoingList(nextNode, offsetPoslist + accessor.getFieldStartOffset(tIndex, curFieldID + 1));
+ if (nextNode.getNodeID().getPosInRead() == LAST_POSITION_ID) {
+ setReverseIncomingList(nextNode,
+ offsetPoslist + accessor.getFieldStartOffset(tIndex, curFieldID + 3));
+ }
+ } else {
+ resetNode(nextNode, readID, (byte) 0);
+ }
+ }
+
+ private void setKmer(KmerBytesWritable kmer, int offset) {
+ ByteBuffer buffer = accessor.getBuffer();
+ int length = buffer.getInt(offset);
+ offset += INT_LENGTH + length;
+ length = buffer.getInt(offset);
+ if (kmer.getLength() != length) {
+ throw new IllegalArgumentException("kmer size is invalid");
+ }
+ offset += INT_LENGTH;
+ kmer.set(buffer.array(), offset);
+ }
+
+ private void connect(NodeReference curNode, NodeReference nextNode) {
+ curNode.getFFList().append(nextNode.getNodeID());
+ nextNode.getRRList().append(curNode.getNodeID());
+ }
+
+ private void setCachList(int offset) {
+ ByteBuffer buffer = accessor.getBuffer();
+ int count = PositionListWritable.getCountByDataLength(buffer.getInt(offset));
+ cachePositionList.set(count, buffer.array(), offset + INT_LENGTH);
+ }
+
+ private void resetNode(NodeReference node, int readID, byte posInRead) {
node.reset(kmerSize);
node.setNodeID(readID, posInRead);
+ }
- ByteBuffer buffer = accessor.getBuffer();
- int lengthPos = buffer.getInt(offset);
- int countPosition = PositionListWritable.getCountByDataLength(lengthPos);
- offset += INT_LENGTH;
- if (posInRead == 0) {
- setPositionList(node.getIncomingList(), countPosition, buffer.array(), offset, true);
- // minus 1 position of the incoming list to get the correct predecessor
- for (PositionWritable pos : node.getIncomingList()) {
- if (pos.getPosInRead() == 0) {
- for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
- System.out.println(ste);
- }
- throw new IllegalArgumentException("The incoming position list contain invalid posInRead");
+ private void setReverseOutgoingList(NodeReference node, int offset) {
+ setCachList(offset);
+ for (PositionWritable pos : cachePositionList) {
+ if (pos.getPosInRead() > 0) {
+ node.getRFList().append(pos);
+ } else {
+ node.getRRList().append(pos.getReadID(), (byte) -pos.getPosInRead());
+ }
+ }
+ }
+
+ private void setReverseIncomingList(NodeReference node, int offset) {
+ setCachList(offset);
+ for (PositionWritable pos : cachePositionList) {
+ if (pos.getPosInRead() > 0) {
+ if (pos.getPosInRead() > 1) {
+ node.getFRList().append(pos.getReadID(), (byte) (pos.getPosInRead() - 1));
+ } else {
+ throw new IllegalArgumentException("Invalid position");
}
- pos.set(pos.getReadID(), (byte) (pos.getPosInRead() - 1));
+ } else {
+ if (pos.getPosInRead() > -LAST_POSITION_ID) {
+ node.getFFList().append(pos.getReadID(), (byte) -(pos.getPosInRead() - 1));
+ }
}
- } else {
- setPositionList(node.getOutgoingList(), countPosition, buffer.array(), offset, isInitial);
- }
- offset += lengthPos;
- int lengthKmer = buffer.getInt(offset);
- if (node.getKmer().getLength() != lengthKmer) {
- for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
- System.out.println(ste);
- }
- throw new IllegalStateException("Size of Kmer is invalid ");
- }
- setKmer(node.getKmer(), buffer.array(), offset + INT_LENGTH, isInitial);
- }
-
- private void setKmer(KmerBytesWritable kmer, byte[] array, int offset, boolean isInitial) {
- if (isInitial) {
- kmer.set(array, offset);
- } else {
- kmer.setNewReference(array, offset);
}
}
- private void setPositionList(PositionListWritable positionListWritable, int count, byte[] array, int offset,
- boolean isInitial) {
- if (isInitial) {
- positionListWritable.set(count, array, offset);
- } else {
- positionListWritable.setNewReference(count, array, offset);
+ private void setForwardOutgoingList(NodeReference node, int offset) {
+ setCachList(offset);
+ for (PositionWritable pos : cachePositionList) {
+ if (pos.getPosInRead() > 0) {
+ node.getFFList().append(pos);
+ } else {
+ node.getFRList().append(pos.getReadID(), (byte) -pos.getPosInRead());
+ }
+ }
+ }
+
+ private void setForwardIncomingList(NodeReference node, int offset) {
+ setCachList(offset);
+ for (PositionWritable pos : cachePositionList) {
+ if (pos.getPosInRead() > 0) {
+ if (pos.getPosInRead() > 1) {
+ node.getRRList().append(pos.getReadID(), (byte) (pos.getPosInRead() - 1));
+ } else {
+ throw new IllegalArgumentException("position id is invalid");
+ }
+ } else {
+ if (pos.getPosInRead() > -LAST_POSITION_ID) {
+ node.getRFList().append(pos.getReadID(), (byte) -(pos.getPosInRead() - 1));
+ }
+ }
}
}
private void outputNode(NodeReference node) throws HyracksDataException {
+ if (node.getNodeID().getPosInRead() == 0) {
+ return;
+ }
try {
builder.reset();
builder.addField(node.getNodeID().getByteArray(), node.getNodeID().getStartOffset(), node.getNodeID()
.getLength());
builder.getDataOutput().writeInt(node.getCount());
builder.addFieldEndOffset();
- builder.addField(node.getIncomingList().getByteArray(), node.getIncomingList().getStartOffset(), node
- .getIncomingList().getLength());
- builder.addField(node.getOutgoingList().getByteArray(), node.getOutgoingList().getStartOffset(), node
- .getOutgoingList().getLength());
+ builder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList()
+ .getLength());
+ builder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList()
+ .getLength());
+ builder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList()
+ .getLength());
+ builder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList()
+ .getLength());
builder.addField(node.getKmer().getBytes(), node.getKmer().getOffset(), node.getKmer().getLength());
if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
@@ -230,7 +292,6 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- // TODO Auto-generated method stub
return new MapReadToNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
recordDescriptors[0]);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index 85aee5b..3667d43 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -41,10 +41,9 @@
public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
-
+
public static final int OutputKmerField = 0;
public static final int OutputPosition = 1;
-
private final boolean bReversed;
private final int readLength;
@@ -81,7 +80,7 @@
try {
readID = Integer.parseInt(geneLine[0]);
} catch (NumberFormatException e) {
- LOG.warn("Invalid data " );
+ LOG.warn("Invalid data ");
return;
}
@@ -89,8 +88,8 @@
Matcher geneMatcher = genePattern.matcher(geneLine[1]);
boolean isValid = geneMatcher.matches();
if (isValid) {
- if (geneLine[1].length() != readLength){
- LOG.warn("Invalid readlength at: " + readID );
+ if (geneLine[1].length() != readLength) {
+ LOG.warn("Invalid readlength at: " + readID);
return;
}
SplitReads(readID, geneLine[1].getBytes(), writer);
@@ -103,29 +102,29 @@
return;
}
kmer.setByRead(array, 0);
- InsertToFrame(kmer, readID, 0, writer);
+ InsertToFrame(kmer, readID, 1, writer);
/** middle kmer */
for (int i = kmerSize; i < array.length; i++) {
kmer.shiftKmerWithNextChar(array[i]);
- InsertToFrame(kmer, readID, i - kmerSize + 1, writer);
+ InsertToFrame(kmer, readID, i - kmerSize + 2, writer);
}
if (bReversed) {
/** first kmer */
kmer.setByReadReverse(array, 0);
- InsertToFrame(kmer, -readID, array.length - kmerSize, writer);
+ InsertToFrame(kmer, readID, -1, writer);
/** middle kmer */
for (int i = kmerSize; i < array.length; i++) {
kmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
- InsertToFrame(kmer, -readID, array.length - i - 1, writer);
+ InsertToFrame(kmer, readID, -(i - kmerSize + 2), writer);
}
}
}
private void InsertToFrame(KmerBytesWritable kmer, int readID, int posInRead, IFrameWriter writer) {
try {
- if (posInRead > 127) {
+ if (Math.abs(posInRead) > 127) {
throw new IllegalArgumentException("Position id is beyond 127 at " + readID);
}
tupleBuilder.reset();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
index 57ba91a..d0fc24b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
@@ -75,7 +75,9 @@
try {
out.writeByte(posInRead);
writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
- writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+ if (posInRead > 0) {
+ writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+ }
} catch (IOException e) {
throw new HyracksDataException("Failed to write into temporary storage");
}
@@ -108,7 +110,9 @@
try {
out.writeByte(posInRead);
writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
- writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+ if (posInRead > 0) {
+ writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+ }
} catch (IOException e) {
throw new HyracksDataException("Failed to write into temporary storage");
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index 8c8c7fd..0d51035 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -4,8 +4,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import edu.uci.ics.genomix.data.KmerUtil;
-import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -24,11 +22,9 @@
private static final long serialVersionUID = 1L;
private final int ValidPosCount;
- private final int kmerLength;
public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
ValidPosCount = getPositionCount(readLength, kmerLength);
- this.kmerLength = kmerLength;
}
public static int getPositionCount(int readLength, int kmerLength) {
@@ -52,17 +48,24 @@
return new IAggregatorDescriptor() {
class PositionArray {
- public ArrayBackedValueStorage[] storages;
+ public ArrayBackedValueStorage[] forwardStorages;
+ public ArrayBackedValueStorage[] reverseStorages;
public int count;
- public PositionArray(ArrayBackedValueStorage[] storages2, int i) {
- storages = storages2;
- count = i;
+ public PositionArray() {
+ forwardStorages = new ArrayBackedValueStorage[ValidPosCount];
+ reverseStorages = new ArrayBackedValueStorage[ValidPosCount];
+ for (int i = 0; i < ValidPosCount; i++) {
+ forwardStorages[i] = new ArrayBackedValueStorage();
+ reverseStorages[i] = new ArrayBackedValueStorage();
+ }
+ count = 0;
}
public void reset() {
- for (ArrayBackedValueStorage each : storages) {
- each.reset();
+ for (int i = 0; i < ValidPosCount; i++) {
+ forwardStorages[i].reset();
+ reverseStorages[i].reset();
}
count = 0;
}
@@ -70,11 +73,8 @@
@Override
public AggregateState createAggregateStates() {
- ArrayBackedValueStorage[] storages = new ArrayBackedValueStorage[ValidPosCount];
- for (int i = 0; i < storages.length; i++) {
- storages[i] = new ArrayBackedValueStorage();
- }
- return new AggregateState(new PositionArray(storages, 0));
+
+ return new AggregateState(new PositionArray());
}
@Override
@@ -86,35 +86,38 @@
pushIntoStorage(accessor, tIndex, positionArray);
// make fake fields
- for (int i = 0; i < ValidPosCount; i++) {
+ for (int i = 0; i < ValidPosCount * 2; i++) {
tupleBuilder.addFieldEndOffset();
}
}
private void pushIntoStorage(IFrameTupleAccessor accessor, int tIndex, PositionArray positionArray)
throws HyracksDataException {
- ArrayBackedValueStorage[] storages = positionArray.storages;
int leadbyte = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
int fieldOffset = leadbyte + accessor.getFieldStartOffset(tIndex, InputPositionListField);
ByteBuffer fieldBuffer = accessor.getBuffer();
while (fieldOffset < leadbyte + accessor.getFieldEndOffset(tIndex, InputPositionListField)) {
byte posInRead = fieldBuffer.get(fieldOffset);
- if (storages[posInRead].getLength() > 0) {
+
+ ArrayBackedValueStorage[] storage = positionArray.forwardStorages;
+ boolean hasKmer = true;
+ if (posInRead < 0) {
+ storage = positionArray.reverseStorages;
+ posInRead = (byte) -posInRead;
+ hasKmer = false;
+ }
+ if (storage[posInRead - 1].getLength() > 0) {
throw new IllegalArgumentException("Reentering into an exist storage");
}
fieldOffset += BYTE_SIZE;
// read poslist
- int lengthPosList = fieldBuffer.getInt(fieldOffset);
- PositionListWritable.getCountByDataLength(lengthPosList);
- fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+ fieldOffset += writeBytesToStorage(storage[posInRead - 1], fieldBuffer, fieldOffset);
// read Kmer
- lengthPosList = fieldBuffer.getInt(fieldOffset);
- if (lengthPosList != KmerUtil.getByteNumFromK(kmerLength)){
- throw new IllegalStateException("Size of Kmer is invalid ");
+ if (hasKmer) {
+ fieldOffset += writeBytesToStorage(storage[posInRead - 1], fieldBuffer, fieldOffset);
}
- fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
positionArray.count += 1;
}
@@ -159,15 +162,21 @@
public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
PositionArray positionArray = (PositionArray) state.state;
- ArrayBackedValueStorage[] storages = positionArray.storages;
- if (positionArray.count != storages.length) {
+
+ if (positionArray.count != ValidPosCount * 2) {
throw new IllegalStateException("Final aggregate position number is invalid");
}
DataOutput fieldOutput = tupleBuilder.getDataOutput();
try {
- for (int i = 0; i < storages.length; i++) {
- fieldOutput.write(storages[i].getByteArray(), storages[i].getStartOffset(),
- storages[i].getLength());
+ for (int i = 0; i < ValidPosCount; i++) {
+ fieldOutput.write(positionArray.forwardStorages[i].getByteArray(),
+ positionArray.forwardStorages[i].getStartOffset(),
+ positionArray.forwardStorages[i].getLength());
+ tupleBuilder.addFieldEndOffset();
+
+ fieldOutput.write(positionArray.reverseStorages[i].getByteArray(),
+ positionArray.reverseStorages[i].getStartOffset(),
+ positionArray.reverseStorages[i].getLength());
tupleBuilder.addFieldEndOffset();
}
} catch (IOException e) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 7c70aa9..00409ef 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -32,8 +32,11 @@
public static final int InputNodeIDField = MapReadToNodeOperator.OutputNodeIDField;
public static final int InputCountOfKmerField = MapReadToNodeOperator.OutputCountOfKmerField;
- public static final int InputIncomingField = MapReadToNodeOperator.OutputIncomingField;
- public static final int InputOutgoingField = MapReadToNodeOperator.OutputOutgoingField;
+ public static final int InputFFField = MapReadToNodeOperator.OutputForwardForwardField;
+ public static final int InputFRField = MapReadToNodeOperator.OutputForwardReverseField;
+ public static final int InputRFField = MapReadToNodeOperator.OutputReverseForwardField;
+ public static final int InputRRField = MapReadToNodeOperator.OutputReverseReverseField;
+
public static final int InputKmerBytesField = MapReadToNodeOperator.OutputKmerBytesField;
private ConfFactory confFactory;
@@ -68,11 +71,15 @@
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
node.getNodeID().setNewReference(tuple.getFieldData(InputNodeIDField),
tuple.getFieldStart(InputNodeIDField));
- node.getIncomingList().setNewReference(tuple.getFieldLength(InputIncomingField) / PositionWritable.LENGTH,
- tuple.getFieldData(InputIncomingField), tuple.getFieldStart(InputIncomingField));
- node.getOutgoingList().setNewReference(tuple.getFieldLength(InputOutgoingField) / PositionWritable.LENGTH,
- tuple.getFieldData(InputOutgoingField), tuple.getFieldStart(InputOutgoingField));
-
+ node.getFFList().setNewReference(tuple.getFieldLength(InputFFField) / PositionWritable.LENGTH,
+ tuple.getFieldData(InputFFField), tuple.getFieldStart(InputFFField));
+ node.getFRList().setNewReference(tuple.getFieldLength(InputFRField) / PositionWritable.LENGTH,
+ tuple.getFieldData(InputFRField), tuple.getFieldStart(InputFRField));
+ node.getRFList().setNewReference(tuple.getFieldLength(InputRFField) / PositionWritable.LENGTH,
+ tuple.getFieldData(InputRFField), tuple.getFieldStart(InputRFField));
+ node.getRRList().setNewReference(tuple.getFieldLength(InputRRField) / PositionWritable.LENGTH,
+ tuple.getFieldData(InputRRField), tuple.getFieldStart(InputRRField));
+
node.getKmer().setNewReference(
Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
index c24760f..cec702e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
@@ -5,7 +5,7 @@
import edu.uci.ics.genomix.data.Marshal;
import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -39,16 +39,22 @@
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
node.getNodeID().setNewReference(tuple.getFieldData(NodeSequenceWriterFactory.InputNodeIDField),
tuple.getFieldStart(NodeSequenceWriterFactory.InputNodeIDField));
- node.getIncomingList().setNewReference(
- PositionListWritable.getCountByDataLength(tuple
- .getFieldLength(NodeSequenceWriterFactory.InputIncomingField)),
- tuple.getFieldData(NodeSequenceWriterFactory.InputIncomingField),
- tuple.getFieldStart(NodeSequenceWriterFactory.InputIncomingField));
- node.getOutgoingList().setNewReference(
- PositionListWritable.getCountByDataLength(tuple
- .getFieldLength(NodeSequenceWriterFactory.InputOutgoingField)),
- tuple.getFieldData(NodeSequenceWriterFactory.InputOutgoingField),
- tuple.getFieldStart(NodeSequenceWriterFactory.InputOutgoingField));
+ node.getFFList().setNewReference(
+ tuple.getFieldLength(NodeSequenceWriterFactory.InputFFField) / PositionWritable.LENGTH,
+ tuple.getFieldData(NodeSequenceWriterFactory.InputFFField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputFFField));
+ node.getFRList().setNewReference(
+ tuple.getFieldLength(NodeSequenceWriterFactory.InputFRField) / PositionWritable.LENGTH,
+ tuple.getFieldData(NodeSequenceWriterFactory.InputFRField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputFRField));
+ node.getRFList().setNewReference(
+ tuple.getFieldLength(NodeSequenceWriterFactory.InputRFField) / PositionWritable.LENGTH,
+ tuple.getFieldData(NodeSequenceWriterFactory.InputRFField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputRFField));
+ node.getRRList().setNewReference(
+ tuple.getFieldLength(NodeSequenceWriterFactory.InputRRField) / PositionWritable.LENGTH,
+ tuple.getFieldData(NodeSequenceWriterFactory.InputRRField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputRRField));
node.getKmer().setNewReference(
Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
index fe9afdf..2d420c3 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
@@ -59,7 +59,7 @@
public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
- public static final boolean DEFAULT_REVERSED = false;
+ public static final boolean DEFAULT_REVERSED = true;
public static final String JOB_PLAN_GRAPHBUILD = "graphbuild";
public static final String JOB_PLAN_GRAPHSTAT = "graphstat";
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 02d9f9c..94d619a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -220,7 +220,7 @@
// (ReadID,PosInRead,{OtherPosition,...},Kmer)
AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec,
- MapKmerPositionToReadOperator.readIDOutputRec);
+ MapKmerPositionToReadOperator.readIDOutputRec, readLength, kmerSize);
connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
return mapKmerToRead;
@@ -237,7 +237,7 @@
jobSpec));
RecordDescriptor readIDFinalRec = new RecordDescriptor(
- new ISerializerDeserializer[1 + MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
+ new ISerializerDeserializer[1 + 2 * MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
new ReadIDNormarlizedComputeFactory(), ReadIDPointable.FACTORY,
@@ -283,7 +283,6 @@
hadoopJobConfFactory.getConf(), kmerWriter);
connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
- // jobSpec.addRoot(writeKmerOperator);
return writeKmerOperator;
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
index fb0a3de..201be03 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -99,19 +99,27 @@
plist.setNewReference(posCount, buffer, fieldOffset);
fieldOffset += plist.getLength();
- int kmerbytes = Marshal.getInt(buffer, fieldOffset);
- if (kmer.getLength() != kmerbytes) {
- throw new IllegalArgumentException("kmerlength is invalid");
+ int posInRead = (i + 1) / 2;
+ if (i % 2 == 0) {
+ posInRead = -posInRead;
}
- fieldOffset += 4;
- kmer.setNewReference(buffer, fieldOffset);
- fieldOffset += kmer.getLength();
+ String kmerString = "";
+ if (posInRead > 0) {
+ int kmerbytes = Marshal.getInt(buffer, fieldOffset);
+ if (kmer.getLength() != kmerbytes) {
+ throw new IllegalArgumentException("kmerlength is invalid");
+ }
+ fieldOffset += 4;
+ kmer.setNewReference(buffer, fieldOffset);
+ fieldOffset += kmer.getLength();
+ kmerString = kmer.toString();
+ }
- output.write(Integer.toString(i - 1).getBytes());
+ output.write(Integer.toString(posInRead).getBytes());
output.writeByte('\t');
output.write(plist.toString().getBytes());
output.writeByte('\t');
- output.write(kmer.toString().getBytes());
+ output.write(kmerString.getBytes());
output.writeByte('\t');
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index ed79781..2872a2d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -76,13 +76,18 @@
tuple.getFieldData(MapKmerPositionToReadOperator.OutputOtherReadIDListField),
tuple.getFieldStart(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
- if (kmer.getLength() > tuple
- .getFieldLength(MapKmerPositionToReadOperator.OutputKmerField)) {
- throw new IllegalArgumentException("Not enough kmer bytes");
+
+ String kmerString = "";
+ if (posInRead > 0) {
+ if (kmer.getLength() > tuple
+ .getFieldLength(MapKmerPositionToReadOperator.OutputKmerField)) {
+ throw new IllegalArgumentException("Not enough kmer bytes");
+ }
+ kmer.setNewReference(
+ tuple.getFieldData(MapKmerPositionToReadOperator.OutputKmerField),
+ tuple.getFieldStart(MapKmerPositionToReadOperator.OutputKmerField));
+ kmerString = kmer.toString();
}
- kmer.setNewReference(
- tuple.getFieldData(MapKmerPositionToReadOperator.OutputKmerField),
- tuple.getFieldStart(MapKmerPositionToReadOperator.OutputKmerField));
output.write(Integer.toString(readID).getBytes());
output.writeByte('\t');
@@ -90,7 +95,7 @@
output.writeByte('\t');
output.write(plist.toString().getBytes());
output.writeByte('\t');
- output.write(kmer.toString().getBytes());
+ output.write(kmerString.getBytes());
output.writeByte('\n');
} catch (IOException e) {
throw new HyracksDataException(e);
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 745f62f..ec95aa6 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -94,7 +94,7 @@
conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
- Assert.assertEquals(true, checkResults(EXPECTED_GROUPBYREADID, new int [] {2,5,8,11}));
+ Assert.assertEquals(true, checkResults(EXPECTED_GROUPBYREADID, new int [] {2,5,8,11,14,17,20,23}));
}
public void TestEndToEnd() throws Exception {
@@ -103,7 +103,7 @@
cleanUpReEntry();
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_NODE, new int[] {1,2}));
+ Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_NODE, new int[] {1,2,3,4}));
}
@Before
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
index 13190dd..01c49e5 100755
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
@@ -1,6 +1,6 @@
1 AATAGAAG
-2 AATAGAAG
+2 AATAGCTT
3 AATAGAAG
-4 AATAGAAG
+4 AATAGCTT
5 AATAGAAG
6 AGAAGAAG
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode b/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
index 2988303..9334b95 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
@@ -1,16 +1,14 @@
-((1,0) [] [(1,2)] AATAGA)
-((1,2) [(1,0)] [(6,0),(1,3)] TAGAA)
-((1,3) [(1,2)] [] AGAAG)
-((2,0) [] [(2,2)] AATAGA)
-((2,2) [(2,0)] [(6,0),(2,3)] TAGAA)
-((2,3) [(2,2)] [] AGAAG)
-((3,0) [] [(3,2)] AATAGA)
-((3,2) [(3,0)] [(6,0),(3,3)] TAGAA)
-((3,3) [(3,2)] [] AGAAG)
-((4,0) [] [(4,2)] AATAGA)
-((4,2) [(4,0)] [(6,0),(4,3)] TAGAA)
-((4,3) [(4,2)] [] AGAAG)
-((5,0) [] [(5,2)] AATAGA)
-((5,2) [(5,0)] [(6,0),(5,3)] TAGAA)
-((5,3) [(5,2)] [] AGAAG)
-((6,0) [(1,2),(2,2),(3,2),(5,2),(4,2)] [] AGAAGAAG)
+((1,1) [(1,3)] [] [] [] AATAGA)
+((1,3) [(6,1),(1,4)] [] [] [(1,1)] TAGAA)
+((1,4) [(6,2)] [] [] [(1,3)] AGAAG)
+((2,1) [] [] [] [] AATAGCTT)
+((3,1) [(3,3)] [] [] [] AATAGA)
+((3,3) [(6,1),(3,4)] [] [] [(3,1)] TAGAA)
+((3,4) [(6,2)] [] [] [(3,3)] AGAAG)
+((4,1) [] [] [] [] AATAGCTT)
+((5,1) [(5,3)] [] [] [] AATAGA)
+((5,3) [(6,1),(5,4)] [] [] [(5,1)] TAGAA)
+((5,4) [(6,2)] [] [] [(5,3)] AGAAG)
+((6,1) [(6,2)] [] [] [(5,3),(3,3),(1,3)] AGAAG)
+((6,2) [(6,3)] [] [] [(1,4),(3,4),(5,4),(6,1)] GAAGA)
+((6,3) [] [] [] [(6,2)] AAGAAG)
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
index 1091d2e..3502e95 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
@@ -1,24 +1,48 @@
-AAGAA (6,2)
-AATAG (1,0)
-AATAG (2,0)
-AATAG (3,0)
-AATAG (4,0)
-AATAG (5,0)
-AGAAG (1,3)
-AGAAG (2,3)
-AGAAG (3,3)
-AGAAG (4,3)
-AGAAG (5,3)
-AGAAG (6,0)
-AGAAG (6,3)
-ATAGA (1,1)
-ATAGA (2,1)
-ATAGA (3,1)
-ATAGA (4,1)
-ATAGA (5,1)
-GAAGA (6,1)
-TAGAA (1,2)
-TAGAA (2,2)
-TAGAA (3,2)
-TAGAA (4,2)
-TAGAA (5,2)
+AAGAA (6,3)
+AAGCT (2,-4)
+AAGCT (4,-4)
+AATAG (1,1)
+AATAG (2,1)
+AATAG (3,1)
+AATAG (4,1)
+AATAG (5,1)
+AGAAG (1,4)
+AGAAG (3,4)
+AGAAG (5,4)
+AGAAG (6,1)
+AGAAG (6,4)
+AGCTA (2,-3)
+AGCTA (4,-3)
+AGCTT (2,4)
+AGCTT (4,4)
+ATAGA (1,2)
+ATAGA (3,2)
+ATAGA (5,2)
+ATAGC (2,2)
+ATAGC (4,2)
+CTATT (1,-1)
+CTATT (2,-1)
+CTATT (3,-1)
+CTATT (4,-1)
+CTATT (5,-1)
+CTTCT (1,-4)
+CTTCT (3,-4)
+CTTCT (5,-4)
+CTTCT (6,-1)
+CTTCT (6,-4)
+GAAGA (6,2)
+GCTAT (2,-2)
+GCTAT (4,-2)
+TAGAA (1,3)
+TAGAA (3,3)
+TAGAA (5,3)
+TAGCT (2,3)
+TAGCT (4,3)
+TCTAT (1,-2)
+TCTAT (3,-2)
+TCTAT (5,-2)
+TCTTC (6,-2)
+TTCTA (1,-3)
+TTCTA (3,-3)
+TTCTA (5,-3)
+TTCTT (6,-3)
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
index 2585102..b52a848 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
@@ -1,24 +1,48 @@
-1 0 [] AATAG
-1 1 [] ATAGA
-1 2 [] TAGAA
-1 3 [(6,0)] AGAAG
-2 0 [] AATAG
-2 1 [] ATAGA
-2 2 [] TAGAA
-2 3 [(6,0)] AGAAG
-3 0 [] AATAG
-3 1 [] ATAGA
-3 2 [] TAGAA
-3 3 [(6,0)] AGAAG
-4 0 [] AATAG
-4 1 [] ATAGA
-4 2 [] TAGAA
-4 3 [(6,0)] AGAAG
-5 0 [] AATAG
-5 1 [] ATAGA
-5 2 [] TAGAA
-5 3 [(6,0)] AGAAG
-6 0 [(1,3),(2,3),(3,3),(5,3),(4,3)] AGAAG
-6 1 [] GAAGA
-6 2 [] AAGAA
-6 3 [] AGAAG
+1 -1 []
+1 -2 []
+1 -3 []
+1 -4 [(6,-1)]
+1 1 [] AATAG
+1 2 [] ATAGA
+1 3 [] TAGAA
+1 4 [(6,1)] AGAAG
+2 -1 []
+2 -2 []
+2 -3 []
+2 -4 []
+2 1 [] AATAG
+2 2 [] ATAGC
+2 3 [] TAGCT
+2 4 [] AGCTT
+3 -1 []
+3 -2 []
+3 -3 []
+3 -4 [(6,-1)]
+3 1 [] AATAG
+3 2 [] ATAGA
+3 3 [] TAGAA
+3 4 [(6,1)] AGAAG
+4 -1 []
+4 -2 []
+4 -3 []
+4 -4 []
+4 1 [] AATAG
+4 2 [] ATAGC
+4 3 [] TAGCT
+4 4 [] AGCTT
+5 -1 []
+5 -2 []
+5 -3 []
+5 -4 [(6,-1)]
+5 1 [] AATAG
+5 2 [] ATAGA
+5 3 [] TAGAA
+5 4 [(6,1)] AGAAG
+6 -1 [(1,-4),(3,-4),(5,-4)]
+6 -2 []
+6 -3 []
+6 -4 []
+6 1 [(1,4),(3,4),(5,4)] AGAAG
+6 2 [] GAAGA
+6 3 [] AAGAA
+6 4 [] AGAAG
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
index 499200a..9035f33 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
@@ -1,6 +1,18 @@
-AAGAA [(6,2)]
-AATAG [(1,0),(2,0),(3,0),(4,0),(5,0)]
-AGAAG [(1,3),(2,3),(3,3),(4,3),(5,3),(6,0),(6,3)]
-ATAGA [(1,1),(2,1),(3,1),(4,1),(5,1)]
-GAAGA [(6,1)]
-TAGAA [(1,2),(2,2),(3,2),(4,2),(5,2)]
\ No newline at end of file
+AAGAA [(6,3)]
+AAGCT [(2,-4),(4,-4)]
+AATAG [(1,1),(2,1),(3,1),(4,1),(5,1)]
+AGAAG [(1,4),(3,4),(5,4),(6,1),(6,4)]
+AGCTA [(2,-3),(4,-3)]
+AGCTT [(2,4),(4,4)]
+ATAGA [(1,2),(3,2),(5,2)]
+ATAGC [(2,2),(4,2)]
+CTATT [(1,-1),(2,-1),(3,-1),(4,-1),(5,-1)]
+CTTCT [(1,-4),(3,-4),(5,-4),(6,-1),(6,-4)]
+GAAGA [(6,2)]
+GCTAT [(2,-2),(4,-2)]
+TAGAA [(1,3),(3,3),(5,3)]
+TAGCT [(2,3),(4,3)]
+TCTAT [(1,-2),(3,-2),(5,-2)]
+TCTTC [(6,-2)]
+TTCTA [(1,-3),(3,-3),(5,-3)]
+TTCTT [(6,-3)]
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
index 7d2e065..2cc283d 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
@@ -1,6 +1,6 @@
-1 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
-2 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
-3 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
-4 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
-5 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
-6 0 [(1,3),(2,3),(3,3),(5,3),(4,3)] AGAAG 1 [] GAAGA 2 [] AAGAA 3 [] AGAAG
+1 1 [] AATAG -1 [] 2 [] ATAGA -2 [] 3 [] TAGAA -3 [] 4 [(6,1)] AGAAG -4 [(6,-1)]
+2 1 [] AATAG -1 [] 2 [] ATAGC -2 [] 3 [] TAGCT -3 [] 4 [] AGCTT -4 []
+3 1 [] AATAG -1 [] 2 [] ATAGA -2 [] 3 [] TAGAA -3 [] 4 [(6,1)] AGAAG -4 [(6,-1)]
+4 1 [] AATAG -1 [] 2 [] ATAGC -2 [] 3 [] TAGCT -3 [] 4 [] AGCTT -4 []
+5 1 [] AATAG -1 [] 2 [] ATAGA -2 [] 3 [] TAGAA -3 [] 4 [(6,1)] AGAAG -4 [(6,-1)]
+6 1 [(3,4),(1,4),(5,4)] AGAAG -1 [(1,-4),(3,-4),(5,-4)] 2 [] GAAGA -2 [] 3 [] AAGAA -3 [] 4 [] AGAAG -4 []