Merge commit '951870ef4642f2003c4240eeef702fc90600fe77' into nanzhang/genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 11d0998..34fb0f6 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -132,6 +132,6 @@
public static int getCountByDataLength(int length) {
if (length % PositionWritable.LENGTH != 0) {
- throw new IllegalArgumentException("Length of positionlist is invalid");
+ throw new IllegalArgumentException("Length of positionlist is invalid: " + length + " is not a multiple of " + PositionWritable.LENGTH);
}
return length / PositionWritable.LENGTH;
diff --git a/genomix/genomix-hadoop/pom.xml b/genomix/genomix-hadoop/pom.xml
index fe83c3c..610092a 100755
--- a/genomix/genomix-hadoop/pom.xml
+++ b/genomix/genomix-hadoop/pom.xml
@@ -156,5 +156,23 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>genomix-hyracks</artifactId>
+ <version>0.2.6-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.6-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.6-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
new file mode 100644
index 0000000..d8d4429
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -0,0 +1,63 @@
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+@SuppressWarnings("deprecation")
+public class TestPathMergeH3 extends GenomixMiniClusterTest {
+ protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+ protected String HDFS_SEQUENCE = "/00-sequence/";
+ protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
+ protected String HDFS_MERGED = "/02-graphmerge/";
+
+ protected String GRAPHBUILD_FILE = "result.graphbuild.txt";
+ protected String PATHMERGE_FILE = "result.mergepath.txt";
+
+ {
+ KMER_LENGTH = 5;
+ READ_LENGTH = 8;
+ HDFS_PATHS = new ArrayList<String>(Arrays.asList(HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MERGED));
+ // we have to specify what kind of keys and values this job has
+ key = new NodeWritable(KMER_LENGTH);
+ value = NullWritable.get();
+ }
+
+ @Test
+ public void TestBuildGraph() throws Exception {
+ cleanUpOutput();
+ copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
+ buildGraph();
+ }
+
+// @Test
+ public void TestMergeOneIteration() throws Exception {
+ cleanUpOutput();
+ copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
+ buildGraph();
+ MergePathsH3Driver h3 = new MergePathsH3Driver();
+ h3.run(HDFS_GRAPHBUILD, HDFS_MERGED, 2, KMER_LENGTH, 1, ACTUAL_ROOT + "conf.xml", null);
+ copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, conf);
+ }
+
+
+
+ public void buildGraph() throws Exception {
+ FileInputFormat.setInputPaths(conf, HDFS_SEQUENCE);
+ FileOutputFormat.setOutputPath(conf, new Path(HDFS_GRAPHBUILD));
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, conf);
+ }
+}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
new file mode 100644
index 0000000..544f5fc
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
@@ -0,0 +1,222 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.junit.After;
+import org.junit.Before;
+
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+
+/*
+ * A base class providing most of the boilerplate for Genomix-based tests
+ */
+@SuppressWarnings("deprecation")
+public class GenomixMiniClusterTest {
+    protected int KMER_LENGTH = 5;
+    protected int READ_LENGTH = 8;
+
+    // subclass should modify this to include the HDFS directories that should be cleaned up
+    protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+    protected String EXPECTED_ROOT = "src/test/resources/expected/";
+    protected String ACTUAL_ROOT = "src/test/resources/actual/";
+
+    protected String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+
+    protected MiniDFSCluster dfsCluster;
+    protected JobConf conf = new JobConf();
+    protected int numberOfNC = 1;
+    protected int numPartitionPerMachine = 1;
+    protected Driver driver;
+
+    // reusable deserialization targets for binary (SequenceFile) results;
+    // subclasses must set these to the job's output key/value types
+    protected Writable key;
+    protected Writable value;
+    protected MiniMRCluster mrCluster;
+
+    @Before
+    public void setUp() throws Exception {
+        cleanupStores();
+        HyracksUtils.init();
+        FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+        FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+        startHDFS();
+
+        conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
+        conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
+        driver = new Driver(HyracksUtils.CC_HOST,
+                HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+    }
+
+    /*
+     * Merge and copy a DFS directory to a local destination, converting to text if necessary.
+     * Text output is concatenated as-is. Binary (SequenceFile) output is read from part-0,
+     * part-1, ... (stopping at the first missing part and skipping empty ones); every record is
+     * written to localDestFile as "key<TAB>value" and also appended to a merged SequenceFile at
+     * localDestFile + ".bin".
+     */
+    protected void copyResultsToLocal(String hdfsSrcDir, String localDestFile, Configuration conf) throws IOException {
+        String fileFormat = conf.get(GenomixJobConf.OUTPUT_FORMAT);
+        if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat)) {
+            // for text files, just concatenate them together
+            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+                    FileSystem.getLocal(new Configuration()), new Path(localDestFile),
+                    false, conf, null);
+            return;
+        }
+        // file is binary: key/value are reused as the targets of SequenceFile.Reader.next()
+        if (key == null || value == null) {
+            throw new IllegalStateException("key and value must be set before copying binary results");
+        }
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.mkdirs(new Path(localDestFile).getParent());
+        FileSystem dfs = FileSystem.get(conf);
+        BufferedWriter bw = new BufferedWriter(new FileWriter(new File(localDestFile)));
+        // created from the first non-empty part so its key/value classes match the data
+        SequenceFile.Writer writer = null;
+        try {
+            for (int i = 0;; i++) {
+                Path path = new Path(hdfsSrcDir, "part-" + i);
+                if (!dfs.exists(path)) {
+                    break;
+                }
+                if (dfs.getFileStatus(path).getLen() == 0) {
+                    continue;
+                }
+                SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
+                try {
+                    if (writer == null) {
+                        writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile + ".bin"),
+                                reader.getKeyClass(), reader.getValueClass());
+                    }
+                    while (reader.next(key, value)) {
+                        bw.write(key.toString() + "\t" + value.toString());
+                        bw.newLine();
+                        writer.append(key, value);
+                    }
+                } finally {
+                    reader.close();
+                }
+            }
+        } finally {
+            try {
+                if (writer != null) {
+                    writer.close();
+                }
+            } finally {
+                bw.close();
+            }
+        }
+    }
+
+    protected boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+        File dumped = new File(actualPath);
+        if (poslistField != null) {
+            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+        } else {
+            TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+        }
+        return true;
+    }
+
+    protected void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    protected void startHDFS() throws IOException {
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        //mrCluster = new MiniMRCluster(4, dfsCluster.getFileSystem().getUri().toString(), 2);
+
+        writeConfXml();
+    }
+
+    /*
+     * Write the current job configuration to ACTUAL_ROOT + "conf.xml", creating ACTUAL_ROOT if needed.
+     */
+    private void writeConfXml() throws IOException {
+        FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(ACTUAL_ROOT + "conf.xml")));
+        try {
+            conf.writeXml(confOutput);
+        } finally {
+            confOutput.close();
+        }
+    }
+
+    protected void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        Path src = new Path(localSrc);
+        Path dest = new Path(hdfsDest);
+        dfs.mkdirs(dest);
+        dfs.copyFromLocalFile(src, dest);
+    }
+
+    /*
+     * Remove the local "actual" folder and any hdfs folders in use by this test.
+     * conf.xml (written into ACTUAL_ROOT by startHDFS) is regenerated afterwards so that
+     * ACTUAL_ROOT + "conf.xml" stays valid for the rest of the test.
+     */
+    public void cleanUpOutput() throws IOException {
+        // local cleanup
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        if (lfs.exists(new Path(ACTUAL_ROOT))) {
+            lfs.delete(new Path(ACTUAL_ROOT), true);
+        }
+        writeConfXml();
+        // dfs cleanup
+        FileSystem dfs = FileSystem.get(conf);
+        for (String path : HDFS_PATHS) {
+            if (dfs.exists(new Path(path))) {
+                dfs.delete(new Path(path), true);
+            }
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // shut HDFS down even if the Hyracks teardown fails
+        try {
+            HyracksUtils.deinit();
+        } finally {
+            cleanupHDFS();
+        }
+    }
+
+    protected void cleanupHDFS() throws Exception {
+        // dfsCluster is null if setUp() failed before startHDFS() created it
+        if (dfsCluster != null) {
+            dfsCluster.shutdown();
+        }
+        //mrCluster.shutdown();
+    }
+}
diff --git a/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt b/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
new file mode 100755
index 0000000..13190dd
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
@@ -0,0 +1,6 @@
+1 AATAGAAG
+2 AATAGAAG
+3 AATAGAAG
+4 AATAGAAG
+5 AATAGAAG
+6 AGAAGAAG
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..3e5bacb
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>fs.default.name</name>
+ <value>hdfs://127.0.0.1:31888</value>
+ </property>
+ <property>
+ <name>hadoop.tmp.dir</name>
+ <value>/tmp/hadoop</value>
+ </property>
+
+
+</configuration>
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..b1b1902
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>dfs.replication</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <name>dfs.block.size</name>
+ <value>65536</value>
+ </property>
+
+</configuration>
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshhold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..525e7d5
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>localhost:29007</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.map.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.reduce.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.max.split.size</name>
+ <value>2048</value>
+ </property>
+
+</configuration>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index fe8224f..3667d43 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -45,19 +45,17 @@
public static final int OutputKmerField = 0;
public static final int OutputPosition = 1;
- private KmerBytesWritable kmer;
- private PositionReference pos;
- private boolean bReversed;
+ private final boolean bReversed;
private final int readLength;
+ private final int kmerSize;
public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
null });
public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
bReversed = bGenerateReversed;
- kmer = new KmerBytesWritable(k);
- pos = new PositionReference();
this.readLength = readlength;
+ this.kmerSize = k;
}
@Override
@@ -69,6 +67,9 @@
return new IKeyValueParser<LongWritable, Text>() {
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionReference pos = new PositionReference();
+
@Override
public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
String[] geneLine = value.toString().split("\\t"); // Read the Real Gene Line
@@ -97,17 +98,16 @@
private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
/** first kmer */
- int k = kmer.getKmerLength();
- if (k >= array.length) {
+ if (kmerSize >= array.length) {
return;
}
kmer.setByRead(array, 0);
InsertToFrame(kmer, readID, 1, writer);
/** middle kmer */
- for (int i = k; i < array.length; i++) {
+ for (int i = kmerSize; i < array.length; i++) {
kmer.shiftKmerWithNextChar(array[i]);
- InsertToFrame(kmer, readID, i - k + 2, writer);
+ InsertToFrame(kmer, readID, i - kmerSize + 2, writer);
}
if (bReversed) {
@@ -115,9 +115,9 @@
kmer.setByReadReverse(array, 0);
InsertToFrame(kmer, readID, -1, writer);
/** middle kmer */
- for (int i = k; i < array.length; i++) {
+ for (int i = kmerSize; i < array.length; i++) {
kmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
- InsertToFrame(kmer, readID, -(i - k + 2), writer);
+ InsertToFrame(kmer, readID, -(i - kmerSize + 2), writer);
}
}
}
@@ -149,8 +149,6 @@
@Override
public void open(IFrameWriter writer) throws HyracksDataException {
- // TODO Auto-generated method stub
-
}
@Override
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
index 0f791a1..eab197e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -18,6 +18,9 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -31,11 +34,12 @@
public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
-
+ private static final Log LOG = LogFactory.getLog(MergeKmerAggregateFactory.class);
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
+ final int frameSize = ctx.getFrameSize();
return new IAggregatorDescriptor() {
private PositionReference position = new PositionReference();
@@ -90,6 +94,9 @@
DataOutput fieldOutput = tupleBuilder.getDataOutput();
ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
try {
+ if (inputVal.getLength() > frameSize/2){
+ LOG.warn("MergeKmer: output data size is too big: " + inputVal.getLength());
+ }
fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
tupleBuilder.addFieldEndOffset();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index 0d51035..974dead 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -4,6 +4,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -22,6 +25,7 @@
private static final long serialVersionUID = 1L;
private final int ValidPosCount;
+ private static final Log LOG = LogFactory.getLog(MergeReadIDAggregateFactory.class);
public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
ValidPosCount = getPositionCount(readLength, kmerLength);
@@ -45,6 +49,7 @@
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
+ final int frameSize = ctx.getFrameSize();
return new IAggregatorDescriptor() {
class PositionArray {
@@ -168,6 +173,7 @@
}
DataOutput fieldOutput = tupleBuilder.getDataOutput();
try {
+ int totalSize = 0;
for (int i = 0; i < ValidPosCount; i++) {
fieldOutput.write(positionArray.forwardStorages[i].getByteArray(),
positionArray.forwardStorages[i].getStartOffset(),
@@ -178,6 +184,20 @@
positionArray.reverseStorages[i].getStartOffset(),
positionArray.reverseStorages[i].getLength());
tupleBuilder.addFieldEndOffset();
+
+ totalSize += positionArray.forwardStorages[i].getLength()
+ + positionArray.reverseStorages[i].getLength();
+ }
+ if (totalSize > frameSize / 2) {
+ int leadbyte = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ int readID = accessor.getBuffer().getInt(
+ leadbyte + accessor.getFieldStartOffset(tIndex, InputReadIDField));
+ LOG.warn("MergeReadID on read:" + readID + " is of size: " + totalSize + ", current frameSize:"
+ + frameSize + "\n Recommendate to enlarge the FrameSize");
+ }
+ if (totalSize > frameSize) {
+ // the aggregated output is larger than a whole frame; report both sizes
+ throw new HyracksDataException("Data is too long: " + totalSize + " bytes exceeds frame size " + frameSize);
}
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index f614846..7ea065d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -33,15 +33,15 @@
*
*/
private static final long serialVersionUID = 1L;
- private KmerBytesWritable kmer;
- private PositionListWritable plist;
-
+
+ private final int kmerSize;
public KMerTextWriterFactory(int k) {
- kmer = new KmerBytesWritable(k);
- plist = new PositionListWritable();
+ kmerSize =k;
}
public class TupleWriter implements ITupleWriter {
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionListWritable plist = new PositionListWritable();
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
try {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
index 2d420c3..1f12bb5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
@@ -50,7 +50,7 @@
public static final int DEFAULT_KMERLEN = 21;
public static final int DEFAULT_READLEN = 124;
- public static final int DEFAULT_FRAME_SIZE = 32768;
+ public static final int DEFAULT_FRAME_SIZE = 128*1024;
public static final int DEFAULT_FRAME_LIMIT = 4096;
public static final int DEFAULT_TABLE_SIZE = 10485767;
public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
index c6a69d4..28e4ff5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -56,13 +56,15 @@
*
*/
private static final long serialVersionUID = 1L;
- private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
- private PositionWritable pos = new PositionWritable();
+
@Override
public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
return new ITupleWriter(){
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionWritable pos = new PositionWritable();
+
@Override
public void open(DataOutput output) throws HyracksDataException {
// TODO Auto-generated method stub
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh b/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh
index c335475..5f89bcc 100644
--- a/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh
@@ -12,11 +12,11 @@
#Clean up temp dir
#rm -rf $NCTMP_DIR2
-mkdir $NCTMP_DIR2
+mkdir -p $NCTMP_DIR2
#Clean up log dir
#rm -rf $NCLOGS_DIR2
-mkdir $NCLOGS_DIR2
+mkdir -p $NCLOGS_DIR2
#Clean up I/O working dir
@@ -35,16 +35,14 @@
#Get node ID
NODEID=`hostname | cut -d '.' -f 1`
-NODEID=${NODEID}2
#Set JAVA_OPTS
export JAVA_OPTS=$NCJAVA_OPTS2
-cd $HYRACKS_HOME
-HYRACKS_HOME=`pwd`
+GENOMIX_HOME=`pwd`
#Enter the temp dir
cd $NCTMP_DIR2
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
+${GENOMIX_HOME}/bin/genomixnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 07cfab4..ec95aa6 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -54,7 +54,7 @@
private JobConf conf = new JobConf();
private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
+ private int numPartitionPerMachine = 2;
private Driver driver;
diff --git a/genomix/genomix-pregelix/data/input/.out.crc b/genomix/genomix-pregelix/data/input/.out.crc
new file mode 100644
index 0000000..422fddb
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/.out.crc
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/test/result.graphbuild.txt.bin b/genomix/genomix-pregelix/data/input/test/result.graphbuild.txt.bin
new file mode 100644
index 0000000..4865aeb
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/test/result.graphbuild.txt.bin
Binary files differ
diff --git a/genomix/genomix-pregelix/data/test/result.graphbuild.bidirectional.txt b/genomix/genomix-pregelix/data/test/result.graphbuild.bidirectional.txt
new file mode 100644
index 0000000..a41ce56
--- /dev/null
+++ b/genomix/genomix-pregelix/data/test/result.graphbuild.bidirectional.txt
@@ -0,0 +1,18 @@
+((2,1) [(2,3)] [] [] [] AATAGA) (null)
+((2,3) [(6,1),(2,4)] [] [] [(2,1)] TAGAA) (null)
+((2,4) [(6,2)] [] [] [(2,3)] AGAAG) (null)
+((4,1) [(4,3)] [] [] [] AATAGA) (null)
+((4,3) [(6,1),(4,4)] [] [] [(4,1)] TAGAA) (null)
+((4,4) [(6,2)] [] [] [(4,3)] AGAAG) (null)
+((6,1) [(6,2)] [] [] [(2,3),(1,3),(3,3),(4,3),(5,3)] AGAAG) (null)
+((6,2) [(6,3)] [] [] [(2,4),(3,4),(1,4),(4,4),(5,4),(6,1)] GAAGA) (null)
+((6,3) [] [] [] [(6,2)] AAGAAG) (null)
+((1,1) [(1,3)] [] [] [] AATAGA) (null)
+((1,3) [(6,1),(1,4)] [] [] [(1,1)] TAGAA) (null)
+((1,4) [(6,2)] [] [] [(1,3)] AGAAG) (null)
+((3,1) [(3,3)] [] [] [] AATAGA) (null)
+((3,3) [(6,1),(3,4)] [] [] [(3,1)] TAGAA) (null)
+((3,4) [(6,2)] [] [] [(3,3)] AGAAG) (null)
+((5,1) [(5,3)] [] [] [] AATAGA) (null)
+((5,3) [(6,1),(5,4)] [] [] [(5,1)] TAGAA) (null)
+((5,4) [(6,2)] [] [] [(5,3)] AGAAG) (null)
diff --git a/genomix/genomix-pregelix/data/test/result.graphbuild.txt.bin b/genomix/genomix-pregelix/data/test/result.graphbuild.txt.bin
new file mode 100644
index 0000000..4865aeb
--- /dev/null
+++ b/genomix/genomix-pregelix/data/test/result.graphbuild.txt.bin
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
index 3fe04ff..0b0055b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -66,8 +66,10 @@
/**
* set the vertex value
*/
- vertexValue.setIncomingList(node.getIncomingList());
- vertexValue.setOutgoingList(node.getOutgoingList());
+ vertexValue.setFFList(node.getFFList());
+ vertexValue.setFRList(node.getFRList());
+ vertexValue.setRFList(node.getRFList());
+ vertexValue.setRRList(node.getRRList());
vertexValue.setMergeChain(node.getKmer());
vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
index 4011838..91634d8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
@@ -69,8 +69,10 @@
/**
* set the vertex value
*/
- vertexValue.setIncomingList(node.getIncomingList());
- vertexValue.setOutgoingList(node.getOutgoingList());
+ vertexValue.setFFList(node.getFFList());
+ vertexValue.setFRList(node.getFRList());
+ vertexValue.setRFList(node.getRFList());
+ vertexValue.setRRList(node.getRRList());
vertexValue.setMergeChain(node.getKmer());
vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
new file mode 100644
index 0000000..fae1970
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -0,0 +1,105 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+/**
+ * One side (incoming or outgoing) of a vertex's adjacency, stored as two position lists: a
+ * "forward" list and a "reverse" list.
+ * <p>
+ * Both lists are owned by this object. The setters copy the argument's contents rather than
+ * keeping the caller's reference, so that a later {@link #readFields(DataInput)} on this object
+ * cannot overwrite a list that belongs to someone else (and vice versa).
+ */
+public class AdjacencyListWritable implements WritableComparable<AdjacencyListWritable> {
+    private final PositionListWritable forwardList;
+    private final PositionListWritable reverseList;
+
+    /** Creates an instance whose forward and reverse lists are both empty. */
+    public AdjacencyListWritable() {
+        forwardList = new PositionListWritable();
+        reverseList = new PositionListWritable();
+    }
+
+    /**
+     * Copies both lists of {@code adjacencyList} into this object.
+     *
+     * @param adjacencyList the source; its lists are copied, not shared
+     */
+    public void set(AdjacencyListWritable adjacencyList) {
+        setForwardList(adjacencyList.getForwardList());
+        setReverseList(adjacencyList.getReverseList());
+    }
+
+    /** Empties both lists. */
+    public void reset() {
+        forwardList.reset();
+        reverseList.reset();
+    }
+
+    /** @return the live forward list (mutations through it affect this object) */
+    public PositionListWritable getForwardList() {
+        return forwardList;
+    }
+
+    /**
+     * Replaces the forward list's contents with a copy of {@code forwardList}.
+     *
+     * @param forwardList positions to copy; {@code null} empties the list
+     */
+    public void setForwardList(PositionListWritable forwardList) {
+        copyInto(this.forwardList, forwardList);
+    }
+
+    /** @return the live reverse list (mutations through it affect this object) */
+    public PositionListWritable getReverseList() {
+        return reverseList;
+    }
+
+    /**
+     * Replaces the reverse list's contents with a copy of {@code reverseList}.
+     *
+     * @param reverseList positions to copy; {@code null} empties the list
+     */
+    public void setReverseList(PositionListWritable reverseList) {
+        copyInto(this.reverseList, reverseList);
+    }
+
+    /** Copies {@code source} into {@code target}; {@code null} is treated as an empty list. */
+    private static void copyInto(PositionListWritable target, PositionListWritable source) {
+        if (source == null) {
+            target.reset();
+        } else if (source != target) { // a list copied onto itself is already up to date
+            target.set(source);
+        }
+    }
+
+    /** Reads the forward list, then the reverse list, in the order {@link #write} emits them. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        forwardList.readFields(in);
+        reverseList.readFields(in);
+    }
+
+    /** Writes the forward list followed by the reverse list. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        forwardList.write(out);
+        reverseList.write(out);
+    }
+
+    /**
+     * No ordering is defined between adjacency lists; every pair of instances compares equal.
+     * NOTE(review): this is inconsistent with {@code equals}; implement a real ordering if these
+     * objects are ever used as sort keys.
+     */
+    @Override
+    public int compareTo(AdjacencyListWritable o) {
+        return 0;
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java
new file mode 100644
index 0000000..8dec857
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+/**
+ * Incoming adjacency of a vertex as two position lists: reverse-forward and reverse-reverse.
+ * <p>
+ * Both lists are owned by this object; the setters copy the argument's contents instead of keeping
+ * the caller's reference, so {@link #readFields(DataInput)} never writes into a caller's list.
+ */
+public class IncomingListWritable implements WritableComparable<IncomingListWritable> {
+    private final PositionListWritable reverseForwardList;
+    private final PositionListWritable reverseReverseList;
+
+    /** Creates an instance with both lists empty. */
+    public IncomingListWritable() {
+        reverseForwardList = new PositionListWritable();
+        reverseReverseList = new PositionListWritable();
+    }
+
+    /** @return the live reverse-forward list (mutations through it affect this object) */
+    public PositionListWritable getReverseForwardList() {
+        return reverseForwardList;
+    }
+
+    /**
+     * Replaces the reverse-forward list's contents with a copy of {@code reverseForwardList}.
+     *
+     * @param reverseForwardList positions to copy; {@code null} empties the list
+     */
+    public void setReverseForwardList(PositionListWritable reverseForwardList) {
+        copyInto(this.reverseForwardList, reverseForwardList);
+    }
+
+    /** @return the live reverse-reverse list (mutations through it affect this object) */
+    public PositionListWritable getReverseReverseList() {
+        return reverseReverseList;
+    }
+
+    /**
+     * Replaces the reverse-reverse list's contents with a copy of {@code reverseReverseList}.
+     *
+     * @param reverseReverseList positions to copy; {@code null} empties the list
+     */
+    public void setReverseReverseList(PositionListWritable reverseReverseList) {
+        copyInto(this.reverseReverseList, reverseReverseList);
+    }
+
+    /** Copies {@code source} into {@code target}; {@code null} is treated as an empty list. */
+    private static void copyInto(PositionListWritable target, PositionListWritable source) {
+        if (source == null) {
+            target.reset();
+        } else if (source != target) { // a list copied onto itself is already up to date
+            target.set(source);
+        }
+    }
+
+    /** Reads the reverse-forward list, then the reverse-reverse list, matching {@link #write}. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        reverseForwardList.readFields(in);
+        reverseReverseList.readFields(in);
+    }
+
+    /** Writes the reverse-forward list followed by the reverse-reverse list. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        reverseForwardList.write(out);
+        reverseReverseList.write(out);
+    }
+
+    /**
+     * No ordering is defined; every pair of instances compares equal.
+     * NOTE(review): inconsistent with {@code equals}; implement a real ordering before using these
+     * objects as sort keys.
+     */
+    @Override
+    public int compareTo(IncomingListWritable o) {
+        return 0;
+    }
+
+    /** @return the sum of the position counts of both incoming lists */
+    public int inDegree() {
+        return reverseReverseList.getCountOfPosition() + reverseForwardList.getCountOfPosition();
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 31d313d..f0a6b58 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -9,7 +9,6 @@
import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
public class MessageWritable implements WritableComparable<MessageWritable> {
@@ -20,7 +19,7 @@
*/
private PositionWritable sourceVertexId;
private KmerBytesWritable chainVertexId;
- private PositionListWritable neighberNode; //incoming or outgoing
+ private AdjacencyListWritable neighberNode; //incoming or outgoing
private byte message;
private byte checkMessage;
@@ -28,12 +27,12 @@
public MessageWritable() {
sourceVertexId = new PositionWritable();
chainVertexId = new KmerBytesWritable(0);
- neighberNode = new PositionListWritable();
+ neighberNode = new AdjacencyListWritable();
message = Message.NON;
checkMessage = (byte) 0;
}
- public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, PositionListWritable neighberNode, byte message) {
+ public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
checkMessage = 0;
if (sourceVertexId != null) {
checkMessage |= CheckMessage.SOURCE;
@@ -79,11 +78,11 @@
}
}
- public PositionListWritable getNeighberNode() {
+ public AdjacencyListWritable getNeighberNode() {
return neighberNode;
}
- public void setNeighberNode(PositionListWritable neighberNode) {
+ public void setNeighberNode(AdjacencyListWritable neighberNode) {
if(neighberNode != null){
checkMessage |= CheckMessage.NEIGHBER;
this.neighberNode.set(neighberNode);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java
new file mode 100644
index 0000000..275954d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+/**
+ * Outgoing adjacency of a vertex as two position lists: forward-forward and forward-reverse.
+ * <p>
+ * Both lists are owned by this object; the setters copy the argument's contents instead of keeping
+ * the caller's reference, so {@link #readFields(DataInput)} never writes into a caller's list.
+ */
+public class OutgoingListWritable implements WritableComparable<OutgoingListWritable> {
+    private final PositionListWritable forwardForwardList;
+    private final PositionListWritable forwardReverseList;
+
+    /** Creates an instance with both lists empty. */
+    public OutgoingListWritable() {
+        forwardForwardList = new PositionListWritable();
+        forwardReverseList = new PositionListWritable();
+    }
+
+    /** @return the live forward-forward list (mutations through it affect this object) */
+    public PositionListWritable getForwardForwardList() {
+        return forwardForwardList;
+    }
+
+    /**
+     * Replaces the forward-forward list's contents with a copy of {@code forwardForwardList}.
+     *
+     * @param forwardForwardList positions to copy; {@code null} empties the list
+     */
+    public void setForwardForwardList(PositionListWritable forwardForwardList) {
+        copyInto(this.forwardForwardList, forwardForwardList);
+    }
+
+    /** @return the live forward-reverse list (mutations through it affect this object) */
+    public PositionListWritable getForwardReverseList() {
+        return forwardReverseList;
+    }
+
+    /**
+     * Replaces the forward-reverse list's contents with a copy of {@code forwardReverseList}.
+     *
+     * @param forwardReverseList positions to copy; {@code null} empties the list
+     */
+    public void setForwardReverseList(PositionListWritable forwardReverseList) {
+        copyInto(this.forwardReverseList, forwardReverseList);
+    }
+
+    /** Copies {@code source} into {@code target}; {@code null} is treated as an empty list. */
+    private static void copyInto(PositionListWritable target, PositionListWritable source) {
+        if (source == null) {
+            target.reset();
+        } else if (source != target) { // a list copied onto itself is already up to date
+            target.set(source);
+        }
+    }
+
+    /** Reads the forward-forward list, then the forward-reverse list, matching {@link #write}. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        forwardForwardList.readFields(in);
+        forwardReverseList.readFields(in);
+    }
+
+    /** Writes the forward-forward list followed by the forward-reverse list. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        forwardForwardList.write(out);
+        forwardReverseList.write(out);
+    }
+
+    /**
+     * No ordering is defined; every pair of instances compares equal.
+     * NOTE(review): inconsistent with {@code equals}; implement a real ordering before using these
+     * objects as sort keys.
+     */
+    @Override
+    public int compareTo(OutgoingListWritable o) {
+        return 0;
+    }
+
+    /** @return the sum of the position counts of both outgoing lists */
+    public int outDegree() {
+        return forwardForwardList.getCountOfPosition() + forwardReverseList.getCountOfPosition();
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index 2554c8e..985c2ac 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -10,44 +10,82 @@
public class ValueStateWritable implements WritableComparable<ValueStateWritable> {
- private PositionListWritable incomingList;
- private PositionListWritable outgoingList;
+ private AdjacencyListWritable incomingList;
+ private AdjacencyListWritable outgoingList;
private byte state;
private KmerBytesWritable mergeChain;
public ValueStateWritable() {
- incomingList = new PositionListWritable();
- outgoingList = new PositionListWritable();
+ incomingList = new AdjacencyListWritable();
+ outgoingList = new AdjacencyListWritable();
state = State.NON_VERTEX;
mergeChain = new KmerBytesWritable(0);
}
- public ValueStateWritable(PositionListWritable incomingList, PositionListWritable outgoingList,
+ public ValueStateWritable(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
+ PositionListWritable reverseForwardList, PositionListWritable reverseReverseList,
byte state, KmerBytesWritable mergeChain) {
- set(incomingList, outgoingList, state, mergeChain);
+ set(forwardForwardList, forwardReverseList,
+ reverseForwardList, reverseReverseList,
+ state, mergeChain);
}
- public void set(PositionListWritable incomingList, PositionListWritable outgoingList,
+ public void set(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
+ PositionListWritable reverseForwardList, PositionListWritable reverseReverseList,
byte state, KmerBytesWritable mergeChain) {
- this.incomingList.set(incomingList);
- this.outgoingList.set(outgoingList);
+ this.incomingList.setForwardList(reverseForwardList);
+ this.incomingList.setReverseList(reverseReverseList);
+ this.outgoingList.setForwardList(forwardForwardList);
+ this.outgoingList.setReverseList(forwardReverseList);
this.state = state;
this.mergeChain.set(mergeChain);
}
- public PositionListWritable getIncomingList() {
+ public PositionListWritable getFFList() {
+ return outgoingList.getForwardList();
+ }
+
+ public PositionListWritable getFRList() {
+ return outgoingList.getReverseList();
+ }
+
+ public PositionListWritable getRFList() {
+ return incomingList.getForwardList();
+ }
+
+ public PositionListWritable getRRList() {
+ return incomingList.getReverseList();
+ }
+
+ public void setFFList(PositionListWritable forwardForwardList){
+ outgoingList.setForwardList(forwardForwardList);
+ }
+
+ public void setFRList(PositionListWritable forwardReverseList){
+ outgoingList.setReverseList(forwardReverseList);
+ }
+
+ public void setRFList(PositionListWritable reverseForwardList){
+ incomingList.setForwardList(reverseForwardList);
+ }
+
+ public void setRRList(PositionListWritable reverseReverseList){
+ incomingList.setReverseList(reverseReverseList);
+ }
+
+ public AdjacencyListWritable getIncomingList() {
return incomingList;
}
- public void setIncomingList(PositionListWritable incomingList) {
+ public void setIncomingList(AdjacencyListWritable incomingList) {
this.incomingList = incomingList;
}
- public PositionListWritable getOutgoingList() {
+ public AdjacencyListWritable getOutgoingList() {
return outgoingList;
}
- public void setOutgoingList(PositionListWritable outgoingList) {
+ public void setOutgoingList(AdjacencyListWritable outgoingList) {
this.outgoingList = outgoingList;
}
@@ -94,14 +132,21 @@
@Override
public String toString() {
- return state + "\t" + getLengthOfMergeChain() + "\t" + mergeChain.toString();
+ StringBuilder sbuilder = new StringBuilder();
+ sbuilder.append('(');
+ sbuilder.append(outgoingList.getForwardList().toString()).append('\t');
+ sbuilder.append(outgoingList.getReverseList().toString()).append('\t');
+ sbuilder.append(incomingList.getForwardList().toString()).append('\t');
+ sbuilder.append(incomingList.getReverseList().toString()).append('\t');
+ sbuilder.append(mergeChain.toString()).append(')');
+ return sbuilder.toString();
}
- public int inDegree() {
- return incomingList.getCountOfPosition();
+ public int inDegree(){
+ return incomingList.getForwardList().getCountOfPosition() + incomingList.getReverseList().getCountOfPosition();
}
-
- public int outDegree() {
- return outgoingList.getCountOfPosition();
+
+ public int outDegree(){
+ return outgoingList.getForwardList().getCountOfPosition() + outgoingList.getReverseList().getCountOfPosition();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
index 6be0eee..c3bb663 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
@@ -75,12 +75,18 @@
* get destination vertex
*/
public PositionWritable getNextDestVertexId(ValueStateWritable value) {
- posIterator = value.getOutgoingList().iterator();
+ if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+ posIterator = value.getFFList().iterator();
+ else // #FRList() > 0
+ posIterator = value.getFRList().iterator();
return posIterator.next();
}
public PositionWritable getPreDestVertexId(ValueStateWritable value) {
- posIterator = value.getIncomingList().iterator();
+ if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
+ posIterator = value.getRFList().iterator();
+ else // #RRList() > 0
+ posIterator = value.getRRList().iterator();
return posIterator.next();
}
@@ -88,7 +94,12 @@
* head send message to all next nodes
*/
public void sendMsgToAllNextNodes(ValueStateWritable value) {
- posIterator = value.getOutgoingList().iterator();
+ posIterator = value.getFFList().iterator(); // FFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFRList().iterator(); // FRList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
@@ -99,7 +110,12 @@
* head send message to all previous nodes
*/
public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
- posIterator = value.getIncomingList().iterator();
+ posIterator = value.getRFList().iterator(); // RFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
index af38072..4fcc09e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -80,12 +80,18 @@
* get destination vertex
*/
public PositionWritable getNextDestVertexId(ValueStateWritable value) {
- posIterator = value.getOutgoingList().iterator();
+ if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+ posIterator = value.getFFList().iterator();
+ else // #FRList() > 0
+ posIterator = value.getFRList().iterator();
return posIterator.next();
}
public PositionWritable getPreDestVertexId(ValueStateWritable value) {
- posIterator = value.getIncomingList().iterator();
+ if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
+ posIterator = value.getRFList().iterator();
+ else // #RRList() > 0
+ posIterator = value.getRRList().iterator();
return posIterator.next();
}
@@ -93,7 +99,12 @@
* head send message to all next nodes
*/
public void sendMsgToAllNextNodes(ValueStateWritable value) {
- posIterator = value.getOutgoingList().iterator();
+ posIterator = value.getFFList().iterator(); // FFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFRList().iterator(); // FRList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
@@ -104,7 +115,12 @@
* head send message to all previous nodes
*/
public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
- posIterator = value.getIncomingList().iterator();
+ posIterator = value.getRFList().iterator(); // RFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
index 6f4354a..8a03aa7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
@@ -87,12 +87,18 @@
* get destination vertex
*/
public PositionWritable getNextDestVertexId(ValueStateWritable value) {
- posIterator = value.getOutgoingList().iterator();
+ if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+ posIterator = value.getFFList().iterator();
+ else // #FRList() > 0
+ posIterator = value.getFRList().iterator();
return posIterator.next();
}
public PositionWritable getPreDestVertexId(ValueStateWritable value) {
- posIterator = value.getIncomingList().iterator();
+ if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
+ posIterator = value.getRFList().iterator();
+ else // #RRList() > 0
+ posIterator = value.getRRList().iterator();
return posIterator.next();
}
@@ -100,7 +106,12 @@
* head send message to all next nodes
*/
public void sendMsgToAllNextNodes(ValueStateWritable value) {
- posIterator = value.getOutgoingList().iterator();
+ posIterator = value.getFFList().iterator(); // FFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFRList().iterator(); // FRList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
@@ -111,7 +122,12 @@
* head send message to all previous nodes
*/
public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
- posIterator = value.getIncomingList().iterator();
+ posIterator = value.getRFList().iterator(); // RFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
new file mode 100644
index 0000000..b81491b
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
@@ -0,0 +1,85 @@
+package edu.uci.ics.genomix.pregelix.sequencefile;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/**
+ * Converts a sequence file of {@code (NodeWritable, NullWritable)} records into a sequence file of
+ * {@code (PositionWritable, ValueStateWritable)} records keyed by node id.
+ */
+public class ConvertNodeToIdValue {
+
+    /**
+     * Copies every node in {@code inFile} to {@code outFile}: the key is the node id; the value
+     * carries the node's FF/FR/RF/RR lists and k-mer, with state {@code State.NON_VERTEX}.
+     * Each node id is also printed to standard output.
+     *
+     * @param inFile  input sequence file with {@code NodeWritable} keys and {@code NullWritable} values
+     * @param outFile output sequence file (written uncompressed)
+     * @throws IOException if either file cannot be opened, read or written
+     */
+    public static void convert(Path inFile, Path outFile)
+            throws IOException {
+        Configuration conf = new Configuration();
+        FileSystem fileSys = FileSystem.get(conf);
+
+        SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+        try {
+            SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile,
+                    PositionWritable.class, ValueStateWritable.class, CompressionType.NONE);
+            try {
+                NodeWritable node = new NodeWritable();
+                NullWritable value = NullWritable.get();
+                PositionWritable outputKey = new PositionWritable();
+                ValueStateWritable outputValue = new ValueStateWritable();
+
+                while (reader.next(node, value)) {
+                    System.out.println(node.getNodeID().toString());
+                    outputKey.set(node.getNodeID());
+                    outputValue.setFFList(node.getFFList());
+                    outputValue.setFRList(node.getFRList());
+                    outputValue.setRFList(node.getRFList());
+                    outputValue.setRRList(node.getRRList());
+                    outputValue.setMergeChain(node.getKmer());
+                    outputValue.setState(State.NON_VERTEX);
+                    writer.append(outputKey, outputValue);
+                }
+            } finally {
+                // always close the writer, even if reading or appending fails
+                writer.close();
+            }
+        } finally {
+            reader.close();
+        }
+    }
+
+    /**
+     * Converts {@code data/test/result.graphbuild.txt.bin} into {@code data/input/out}, emptying
+     * {@code data/input} first.
+     */
+    public static void main(String[] args) throws IOException {
+        Path dir = new Path("data/test");
+        Path outDir = new Path("data/input");
+        File localOutDir = new File("data/input");
+        // cleanDirectory rejects a missing directory (e.g. a fresh checkout), so create it first
+        FileUtils.forceMkdir(localOutDir);
+        FileUtils.cleanDirectory(localOutDir);
+        Path inFile = new Path(dir, "result.graphbuild.txt.bin");
+        Path outFile = new Path(outDir, "out");
+        convert(inFile, outFile);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index f2a61be..1740744 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -10,9 +10,7 @@
* @param vertexValue
*/
public static boolean isPathVertex(ValueStateWritable value) {
- if (value.inDegree() == 1 && value.outDegree() == 1)
- return true;
- return false;
+ return value.inDegree() == 1 && value.outDegree() == 1;
}
/**
@@ -21,9 +19,7 @@
* @param vertexValue
*/
public static boolean isHeadVertex(ValueStateWritable value) {
- if (value.outDegree() > 0 && !isPathVertex(value))
- return true;
- return false;
+ return value.outDegree() > 0 && !isPathVertex(value);
}
/**
@@ -32,9 +28,7 @@
* @param vertexValue
*/
public static boolean isRearVertex(ValueStateWritable value) {
- if (value.inDegree() > 0 && !isPathVertex(value))
- return true;
- return false;
+ return value.inDegree() > 0 && !isPathVertex(value);
}
/**
@@ -42,9 +36,7 @@
*/
public static boolean isCycle(KmerBytesWritable kmer, KmerBytesWritable mergeChain, int kmerSize) {
String chain = mergeChain.toString().substring(1);
- if (chain.contains(kmer.toString()))
- return true;
- return false;
+ return chain.contains(kmer.toString());
/*subKmer.set(vertexId);
for(int istart = 1; istart < mergeChain.getKmerLength() - kmerSize + 1; istart++){
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index f1fcdac..cdc97c1 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -73,8 +73,8 @@
public static void main(String[] args) throws IOException {
genNaiveAlgorithmForMergeGraph();
- genLogAlgorithmForMergeGraph();
- genP3ForMergeGraph();
+ //genLogAlgorithmForMergeGraph();
+ //genP3ForMergeGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index c8aaa84..a8f5a81 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -43,9 +43,9 @@
public class PathMergeSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(PathMergeSmallTestSuite.class.getName());
- public static final String PreFix = "data/PathTestSet"; //"graphbuildresult";
+ public static final String PreFix = "data/input"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "LongPath"};/*, PreFix + File.separator
+ + "test"};/*, PreFix + File.separator
/*+ "CyclePath"};, PreFix + File.separator
+ "SimplePath", PreFix + File.separator
+ "SinglePath", PreFix + File.separator