Merge branch 'anbangx/fullstack_bidirection' of https://code.google.com/p/hyracks into anbangx/fullstack_bidirection
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 11d0998..34fb0f6 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -132,6 +132,9 @@
public static int getCountByDataLength(int length) {
if (length % PositionWritable.LENGTH != 0) {
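+ // Debugging aid: print the call stack so the caller passing a malformed list can be traced.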
+ for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
+ System.out.println(ste);
+ }
throw new IllegalArgumentException("Length of positionlist is invalid");
}
return length / PositionWritable.LENGTH;
diff --git a/genomix/genomix-hadoop/pom.xml b/genomix/genomix-hadoop/pom.xml
index fe83c3c..610092a 100755
--- a/genomix/genomix-hadoop/pom.xml
+++ b/genomix/genomix-hadoop/pom.xml
@@ -156,5 +156,23 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
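+ <!-- New: the H3 path-merge test drives the Hyracks graph builder, so it needs these modules. -->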
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>genomix-hyracks</artifactId>
+ <version>0.2.6-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.6-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.6-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
new file mode 100644
index 0000000..b405161
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -0,0 +1,215 @@
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+
+@SuppressWarnings("deprecation")
+public class TestPathMergeH3 {
+ private static final int KMER_LENGTH = 5;
+ private static final int READ_LENGTH = 8;
+
+ private static final String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+ private static final String HDFS_SEQUENCE = "/00-sequence/";
+ private static final String HDFS_GRAPHBUILD = "/01-graphbuild/";
+ private static final String HDFS_MERGED = "/02-graphmerge/";
+
+ private static final String EXPECTED_ROOT = "src/test/resources/expected/";
+ private static final String ACTUAL_ROOT = "src/test/resources/actual/";
+ private static final String GRAPHBUILD_FILE = "result.graphbuild.txt";
+ private static final String PATHMERGE_FILE = "result.mergepath.txt";
+
+ private static final String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+
+ private MiniDFSCluster dfsCluster;
+
+ private static JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+ private int numPartitionPerMachine = 1;
+
+ private Driver driver;
+
+ @Test
+ public void TestBuildGraph() throws Exception {
+ copySequenceToDFS();
+ buildGraph();
+ }
+
+// @Test
+ public void TestMergeOneIteration() throws Exception {
+ copySequenceToDFS();
+ buildGraph();
+ MergePathsH3Driver h3 = new MergePathsH3Driver();
+ h3.run(HDFS_GRAPHBUILD, HDFS_MERGED, 2, KMER_LENGTH, 1, null, conf);
+ copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, conf);
+ }
+
+ public void buildGraph() throws Exception {
+ FileInputFormat.setInputPaths(conf, HDFS_SEQUENCE);
+ FileOutputFormat.setOutputPath(conf, new Path(HDFS_GRAPHBUILD));
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, conf);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ cleanupStores();
+ HyracksUtils.init();
+ FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+ FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+ startHDFS();
+
+ conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
+ conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
+ driver = new Driver(HyracksUtils.CC_HOST,
+ HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+ }
+
+ /*
+ * Merge and copy a DFS directory to a local destination, converting to text if necessary. Also locally store the binary-formatted result if available.
+ */
+ private static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, Configuration conf) throws IOException {
+ String fileFormat = conf.get(GenomixJobConf.OUTPUT_FORMAT);
+ if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat)) {
+ // for text files, just concatenate them together
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+ FileSystem.getLocal(new Configuration()), new Path(localDestFile),
+ false, conf, null);
+ } else {
+ // file is binary
+ // merge and store the binary format
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+ FileSystem.getLocal(new Configuration()), new Path(localDestFile + ".bin"),
+ false, conf, null);
+ // load the Node's and write them out as text locally
+ FileSystem.getLocal(new Configuration()).mkdirs(new Path(localDestFile).getParent());
+ File filePathTo = new File(localDestFile);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
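+ // Read the part-* files in index order, stopping at the first missing index.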
+ for (int i = 0; i < Integer.MAX_VALUE; i++) {
+ Path path = new Path(hdfsSrcDir + "part-" + i);
+ FileSystem dfs = FileSystem.get(conf);
+ if (!dfs.exists(path)) {
+ break;
+ }
+ if (dfs.getFileStatus(path).getLen() == 0) {
+ continue;
+ }
+ SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
+ NodeWritable key = new NodeWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH));
+ NullWritable value = NullWritable.get();
+ while (reader.next(key, value)) {
+ if (key == null || value == null) {
+ break;
+ }
+ bw.write(key.toString());
+ System.out.println(key.toString());
+ bw.newLine();
+ }
+ reader.close();
+ }
+ bw.close();
+ }
+
+ }
+
+ private boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+ File dumped = new File(actualPath);
+ if (poslistField != null) {
+ TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+ } else {
+ TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+ }
+ return true;
+ }
+
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+ conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+ conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_ROOT + "conf.xml")));
+ conf.writeXml(confOutput);
+ confOutput.close();
+ }
+
+ private void copySequenceToDFS() throws IOException {
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(LOCAL_SEQUENCE_FILE);
+ Path dest = new Path(HDFS_SEQUENCE);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+ }
+
+ @BeforeClass
+ public static void cleanUpEntry() throws IOException {
+ // local cleanup
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ if (lfs.exists(new Path(ACTUAL_ROOT))) {
+ lfs.delete(new Path(ACTUAL_ROOT), true);
+ }
+ // dfs cleanup
+ FileSystem dfs = FileSystem.get(conf);
+ String[] paths = {HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MERGED};
+ for (String path : paths) {
+ if (dfs.exists(new Path(path))) {
+ dfs.delete(new Path(path), true);
+ }
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ HyracksUtils.deinit();
+ cleanupHDFS();
+ }
+
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
+}
diff --git a/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt b/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
new file mode 100755
index 0000000..13190dd
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
@@ -0,0 +1,6 @@
+1 AATAGAAG
+2 AATAGAAG
+3 AATAGAAG
+4 AATAGAAG
+5 AATAGAAG
+6 AGAAGAAG
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..3e5bacb
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>fs.default.name</name>
+ <value>hdfs://127.0.0.1:31888</value>
+ </property>
+ <property>
+ <name>hadoop.tmp.dir</name>
+ <value>/tmp/hadoop</value>
+ </property>
+
+
+</configuration>
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..b1b1902
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>dfs.replication</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <name>dfs.block.size</name>
+ <value>65536</value>
+ </property>
+
+</configuration>
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..525e7d5
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>localhost:29007</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.map.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.reduce.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.max.split.size</name>
+ <value>2048</value>
+ </property>
+
+</configuration>
diff --git a/genomix/genomix-hyracks/pom.xml b/genomix/genomix-hyracks/pom.xml
index ebc49f2..8377df6 100644
--- a/genomix/genomix-hyracks/pom.xml
+++ b/genomix/genomix-hyracks/pom.xml
@@ -35,7 +35,7 @@
<configuration>
<programs>
<program>
- <mainClass>edu.uci.ics.genomix.driver.Driver</mainClass>
+ <mainClass>edu.uci.ics.genomix.hyracks.driver.Driver</mainClass>
<name>genomix</name>
</program>
<program>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index f625143..8609519 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -137,8 +137,10 @@
private void appendNodeToBuilder(int tIndex, PositionReference pos, ArrayBackedValueStorage posList2,
ArrayTupleBuilder builder2) {
try {
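+ // Reset first: resetting only after a successful append could leave stale fields behind.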
+ builder2.reset();
builder2.addField(pos.getByteArray(), pos.getStartOffset(), PositionReference.INTBYTES);
builder2.addField(pos.getByteArray(), pos.getStartOffset() + PositionReference.INTBYTES, 1);
+
if (posList2 == null) {
builder2.addFieldEndOffset();
} else {
@@ -163,7 +165,6 @@
throw new IllegalStateException();
}
}
- builder2.reset();
} catch (HyracksDataException e) {
throw new IllegalStateException(
"Failed to Add a field to the tuple by copying the data bytes from a byte array.");
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index a775d7e..98d21af 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -247,6 +247,7 @@
return;
}
try {
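+ // Reset first so the builder is clean even if the previous call exited early.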
+ builder.reset();
builder.addField(node.getNodeID().getByteArray(), node.getNodeID().getStartOffset(), node.getNodeID()
.getLength());
builder.getDataOutput().writeInt(node.getCount());
@@ -268,7 +269,6 @@
throw new IllegalStateException("Failed to append tuplebuilder to frame");
}
}
- builder.reset();
} catch (IOException e) {
throw new IllegalStateException("Failed to Add a field to the tupleBuilder.");
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index fe8224f..3667d43 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -45,19 +45,17 @@
public static final int OutputKmerField = 0;
public static final int OutputPosition = 1;
- private KmerBytesWritable kmer;
- private PositionReference pos;
- private boolean bReversed;
+ private final boolean bReversed;
private final int readLength;
+ private final int kmerSize;
public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
null });
public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
bReversed = bGenerateReversed;
- kmer = new KmerBytesWritable(k);
- pos = new PositionReference();
this.readLength = readlength;
+ this.kmerSize = k;
}
@Override
@@ -69,6 +67,9 @@
return new IKeyValueParser<LongWritable, Text>() {
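+ // Per-parser instances: the factory is serialized to each task, so Writable buffers must not live on it.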
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionReference pos = new PositionReference();
+
@Override
public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
String[] geneLine = value.toString().split("\\t"); // Read the Real Gene Line
@@ -97,17 +98,16 @@
private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
/** first kmer */
- int k = kmer.getKmerLength();
- if (k >= array.length) {
+ if (kmerSize >= array.length) {
return;
}
kmer.setByRead(array, 0);
InsertToFrame(kmer, readID, 1, writer);
/** middle kmer */
- for (int i = k; i < array.length; i++) {
+ for (int i = kmerSize; i < array.length; i++) {
kmer.shiftKmerWithNextChar(array[i]);
- InsertToFrame(kmer, readID, i - k + 2, writer);
+ InsertToFrame(kmer, readID, i - kmerSize + 2, writer);
}
if (bReversed) {
@@ -115,9 +115,9 @@
kmer.setByReadReverse(array, 0);
InsertToFrame(kmer, readID, -1, writer);
/** middle kmer */
- for (int i = k; i < array.length; i++) {
+ for (int i = kmerSize; i < array.length; i++) {
kmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
- InsertToFrame(kmer, readID, -(i - k + 2), writer);
+ InsertToFrame(kmer, readID, -(i - kmerSize + 2), writer);
}
}
}
@@ -149,8 +149,6 @@
@Override
public void open(IFrameWriter writer) throws HyracksDataException {
- // TODO Auto-generated method stub
-
}
@Override
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
index 0f791a1..eab197e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -18,6 +18,9 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -31,11 +34,12 @@
public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
-
+ private static final Log LOG = LogFactory.getLog(MergeKmerAggregateFactory.class);
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
+ final int frameSize = ctx.getFrameSize();
return new IAggregatorDescriptor() {
private PositionReference position = new PositionReference();
@@ -90,6 +94,9 @@
DataOutput fieldOutput = tupleBuilder.getDataOutput();
ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
try {
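+ // Warn when an aggregated position list exceeds half a frame; it is at risk of overflowing downstream.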
+ if (inputVal.getLength() > frameSize / 2) {
+ LOG.warn("MergeKmer: output data size is too big: " + inputVal.getLength());
+ }
fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
tupleBuilder.addFieldEndOffset();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index 0d51035..974dead 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -4,6 +4,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -22,6 +25,7 @@
private static final long serialVersionUID = 1L;
private final int ValidPosCount;
+ private static final Log LOG = LogFactory.getLog(MergeReadIDAggregateFactory.class);
public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
ValidPosCount = getPositionCount(readLength, kmerLength);
@@ -45,6 +49,7 @@
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
+ final int frameSize = ctx.getFrameSize();
return new IAggregatorDescriptor() {
class PositionArray {
@@ -168,6 +173,7 @@
}
DataOutput fieldOutput = tupleBuilder.getDataOutput();
try {
+ int totalSize = 0;
for (int i = 0; i < ValidPosCount; i++) {
fieldOutput.write(positionArray.forwardStorages[i].getByteArray(),
positionArray.forwardStorages[i].getStartOffset(),
@@ -178,6 +184,22 @@
positionArray.reverseStorages[i].getStartOffset(),
positionArray.reverseStorages[i].getLength());
tupleBuilder.addFieldEndOffset();
+
+ totalSize += positionArray.forwardStorages[i].getLength()
+ + positionArray.reverseStorages[i].getLength();
+ }
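+ // Oversized tuples are a symptom of a too-small frame size; warn early with the offending read ID.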
+ if (totalSize > frameSize / 2) {
+ int leadbyte = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ int readID = accessor.getBuffer().getInt(
+ leadbyte + accessor.getFieldStartOffset(tIndex, InputReadIDField));
+ LOG.warn("MergeReadID on read:" + readID + " is of size: " + totalSize + ", current frameSize:"
+ + frameSize + "\n Consider enlarging the frame size");
+ }
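+ // A tuple larger than a whole frame can never be written; dump the stack and fail fast.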
+ if (totalSize > frameSize) {
+ for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
+ System.out.println(ste);
+ }
+ throw new HyracksDataException("Aggregated tuple of size " + totalSize + " exceeds the frame size " + frameSize);
}
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index f614846..7ea065d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -33,15 +33,15 @@
*
*/
private static final long serialVersionUID = 1L;
- private KmerBytesWritable kmer;
- private PositionListWritable plist;
-
+
+ private final int kmerSize;
public KMerTextWriterFactory(int k) {
- kmer = new KmerBytesWritable(k);
- plist = new PositionListWritable();
+ kmerSize = k;
}
public class TupleWriter implements ITupleWriter {
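+ // Buffers are created per TupleWriter; the enclosing factory is serializable and must stay stateless.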
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionListWritable plist = new PositionListWritable();
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
try {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 9fb2d04..00409ef 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -4,6 +4,7 @@
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
@@ -59,7 +60,7 @@
@Override
public void open(DataOutput output) throws HyracksDataException {
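+ // SequenceFile requires a concrete value class; NullWritable stands in for "no value".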
try {
- writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, NodeWritable.class, null,
+ writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, NodeWritable.class, NullWritable.class,
CompressionType.NONE, null);
} catch (IOException e) {
throw new HyracksDataException(e);
@@ -85,7 +86,7 @@
tuple.getFieldData(InputKmerBytesField), tuple.getFieldStart(InputKmerBytesField));
try {
- writer.append(node, null);
+ writer.append(node, NullWritable.get());
} catch (IOException e) {
throw new HyracksDataException(e);
}
@@ -93,8 +94,6 @@
@Override
public void close(DataOutput output) throws HyracksDataException {
- // TODO Auto-generated method stub
-
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
index 2d420c3..1f12bb5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
@@ -50,7 +50,7 @@
public static final int DEFAULT_KMERLEN = 21;
public static final int DEFAULT_READLEN = 124;
- public static final int DEFAULT_FRAME_SIZE = 32768;
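+ // 128 KB frames: per-read position lists can overflow the old 32 KB default (see MergeReadIDAggregateFactory).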
+ public static final int DEFAULT_FRAME_SIZE = 128 * 1024;
public static final int DEFAULT_FRAME_LIMIT = 4096;
public static final int DEFAULT_TABLE_SIZE = 10485767;
public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
index c6a69d4..28e4ff5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -56,13 +56,15 @@
*
*/
private static final long serialVersionUID = 1L;
- private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
- private PositionWritable pos = new PositionWritable();
+
@Override
public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
return new ITupleWriter(){
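+ // Writable buffers move into the writer so each task gets its own instances after deserialization.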
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionWritable pos = new PositionWritable();
+
@Override
public void open(DataOutput output) throws HyracksDataException {
// TODO Auto-generated method stub
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index 968123e..2872a2d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -76,10 +76,11 @@
tuple.getFieldData(MapKmerPositionToReadOperator.OutputOtherReadIDListField),
tuple.getFieldStart(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
+
String kmerString = "";
if (posInRead > 0) {
if (kmer.getLength() > tuple
- .getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
+ .getFieldLength(MapKmerPositionToReadOperator.OutputKmerField)) {
throw new IllegalArgumentException("Not enough kmer bytes");
}
kmer.setNewReference(
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh b/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh
index c335475..5f89bcc 100644
--- a/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh
@@ -12,11 +12,11 @@
#Clean up temp dir
#rm -rf $NCTMP_DIR2
-mkdir $NCTMP_DIR2
+mkdir -p $NCTMP_DIR2
#Clean up log dir
#rm -rf $NCLOGS_DIR2
-mkdir $NCLOGS_DIR2
+mkdir -p $NCLOGS_DIR2
#Clean up I/O working dir
@@ -35,16 +35,14 @@
#Get node ID
NODEID=`hostname | cut -d '.' -f 1`
-NODEID=${NODEID}2
#Set JAVA_OPTS
export JAVA_OPTS=$NCJAVA_OPTS2
-cd $HYRACKS_HOME
-HYRACKS_HOME=`pwd`
+GENOMIX_HOME=`pwd`
#Enter the temp dir
cd $NCTMP_DIR2
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
+${GENOMIX_HOME}/bin/genomixnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 68258d7..ec95aa6 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -15,6 +15,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
@@ -26,7 +27,7 @@
import edu.uci.ics.genomix.hyracks.driver.Driver;
import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
@@ -53,16 +54,16 @@
private JobConf conf = new JobConf();
private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
+ private int numPartitionPerMachine = 2;
private Driver driver;
@Test
public void TestAll() throws Exception {
-// TestReader();
-// TestGroupbyKmer();
-// TestMapKmerToRead();
-// TestGroupByReadID();
+ TestReader();
+ TestGroupbyKmer();
+ TestMapKmerToRead();
+ TestGroupByReadID();
TestEndToEnd();
}
@@ -97,7 +98,8 @@
}
public void TestEndToEnd() throws Exception {
- conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ //conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
cleanUpReEntry();
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
@@ -187,18 +189,14 @@
}
SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
- // KmerBytesWritable key = (KmerBytesWritable)
- // ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KmerSize));
- // KmerCountValue value = (KmerCountValue)
- // ReflectionUtils.newInstance(reader.getValueClass(), conf);
- KmerBytesWritable value = null;
- while (reader.next(key, value)) {
- if (key == null || value == null) {
+ NodeWritable node = new NodeWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KmerSize));
+ NullWritable value = NullWritable.get();
+ while (reader.next(node, value)) {
+ if (node == null) {
break;
}
- bw.write(key.toString() + "\t" + value.toString());
- System.out.println(key.toString() + "\t" + value.toString());
+ bw.write(node.toString());
+ System.out.println(node.toString());
bw.newLine();
}
reader.close();
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
index 6e6a504..d22bd0c 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
@@ -164,6 +164,9 @@
}
private static boolean containStrings(String lineExpected, String actualLine, int[] poslistField) {
+// if (lineExpected.equals(actualLine)){
+// return true;
+// }
String[] fieldsExp = lineExpected.split("\\\t");
String[] fieldsAct = actualLine.split("\\\t");
if (fieldsAct.length != fieldsExp.length) {