continue cleaning
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
new file mode 100644
index 0000000..714d77e
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
@@ -0,0 +1,192 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+
+/*
+ * A base class providing most of the boilerplate for Genomix-based tests
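+ *
+ * Subclasses typically register their HDFS directories in HDFS_PATHS, stage input with
+ * copyLocalToDFS, run their job, then call copyResultsToLocal and checkResults.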
+ */
+@SuppressWarnings("deprecation")
+public class GenomixMiniClusterTest {
+ protected int KMER_LENGTH = 5;
+ protected int READ_LENGTH = 8;
+
+ // subclasses should modify this to include the HDFS directories that should be cleaned up
+ protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+ protected static String EXPECTED_ROOT = "src/test/resources/expected/";
+ protected static String ACTUAL_ROOT = "src/test/resources/actual/";
+
+ protected static String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+
+ protected static MiniDFSCluster dfsCluster;
+ protected static MiniMRCluster mrCluster;
+ private static FileSystem dfs;
+ protected static JobConf conf = new JobConf();
+ protected static int numberOfNC = 1;
+ protected static int numPartitionPerMachine = 1;
+ protected static Driver driver;
+
+ @BeforeClass
+ public static void setUpMiniCluster() throws Exception {
+ cleanupStores();
+ startHDFS();
+ HyracksUtils.init();
+ FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+ FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+ driver = new Driver(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+ }
+
+ /*
+ * Merge and copy a DFS directory to a local destination, converting to text if necessary.
+ * Also locally store the binary-formatted result if available.
+ */
+ protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+ Configuration conf) throws IOException {
+ if (resultsAreText) {
+ // for text files, just concatenate them together
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+ new Path(localDestFile), false, conf, null);
+ } else {
+ // file is binary
+// // save the entire binary output dir
+// FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+// new Path(localDestFile + ".bindir"), false, conf);
+
+ // also load the Nodes and write them out as text locally.
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.mkdirs(new Path(localDestFile).getParent());
+ File filePathTo = new File(localDestFile);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+
+ FileStatus[] files = dfs.globStatus(new Path(hdfsSrcDir + "*"));
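+ // peek at the first part file to learn the key/value classes for the merged local SequenceFile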
+ SequenceFile.Reader reader = new SequenceFile.Reader(dfs, files[0].getPath(), conf);
+ String destBinDir = localDestFile.substring(0, localDestFile.lastIndexOf("."));
+// FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+// new Path(destBinDir), false, conf);
+ SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile
+ + ".binmerge"), reader.getKeyClass(), reader.getValueClass());
+
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
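+ // dump every non-empty part file: write a tab-separated text copy and append to the merged binary file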
+ for (FileStatus f : files) {
+ if (f.getLen() == 0) {
+ continue;
+ }
+ reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+ while (reader.next(key, value)) {
+ if (key == null || value == null) {
+ break;
+ }
+ bw.write(key.toString() + "\t" + value.toString());
+ System.out.println(key.toString() + "\t" + value.toString());
+ bw.newLine();
+ writer.append(key, value);
+
+ }
+ reader.close();
+ }
+ writer.close();
+ bw.close();
+ }
+
+ }
+
+ protected static boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
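+ // the TestUtils comparators are expected to throw on any mismatch, so reaching the end means the results agree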
+ File dumped = new File(actualPath);
+ if (poslistField != null) {
+ TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+ } else {
+ TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+ }
+ return true;
+ }
+
+ protected static void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ protected static void startHDFS() throws IOException {
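+ // start a single-node mini DFS cluster and a mini MR cluster, then dump the live configuration to conf.xml for client jobs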
+ conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+ // conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+ conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ dfs = dfsCluster.getFileSystem();
+ mrCluster = new MiniMRCluster(4, dfs.getUri().toString(), 2);
+ System.out.println(dfs.getUri().toString());
+
+ DataOutputStream confOutput = new DataOutputStream(
+ new FileOutputStream(new File(HADOOP_CONF_ROOT + "conf.xml")));
+ conf.writeXml(confOutput);
+ confOutput.close();
+ }
+
+ protected static void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+ Path dest = new Path(hdfsDest);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(new Path(localSrc), dest);
+ }
+
+ /*
+ * Remove any HDFS folders in use by this test (the local "actual" cleanup is currently commented out)
+ */
+ public void cleanUpOutput() throws IOException {
+// // local cleanup
+// FileSystem lfs = FileSystem.getLocal(new Configuration());
+// if (lfs.exists(new Path(ACTUAL_ROOT))) {
+// lfs.delete(new Path(ACTUAL_ROOT), true);
+// }
+ // dfs cleanup
+ for (String path : HDFS_PATHS) {
+ if (dfs.exists(new Path(path))) {
+ dfs.delete(new Path(path), true);
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ HyracksUtils.deinit();
+ cleanupHDFS();
+ }
+
+ protected static void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ mrCluster.shutdown();
+ }
+}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
new file mode 100644
index 0000000..be53c5e
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -0,0 +1,248 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+//import edu.uci.ics.genomix.hadoop.velvetgraphbuilding.GraphBuildingDriver;
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+
+/*
+ * A base class providing most of the boilerplate for Hadoop-based tests
+ */
+@SuppressWarnings("deprecation")
+public class HadoopMiniClusterTest {
+ protected int KMER_LENGTH = 5;
+ protected int READ_LENGTH = 8;
+
+ // subclasses should modify this to include the HDFS directories that should be cleaned up
+ protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+ protected static final String EXPECTED_ROOT = "src/test/resources/expected/";
+ protected static final String ACTUAL_ROOT = "src/test/resources/actual/";
+ protected static final String INPUT_ROOT = "src/test/resources/input/";
+
+ protected static String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+ protected static String HADOOP_CONF = HADOOP_CONF_ROOT + "conf.xml";
+
+ protected static MiniDFSCluster dfsCluster;
+ protected static MiniMRCluster mrCluster;
+ protected static FileSystem dfs;
+ protected static JobConf conf = new JobConf();
+ protected static int numberOfNC = 1;
+ protected static int numPartitionPerMachine = 1;
+
+ @BeforeClass
+ public static void setUpMiniCluster() throws Exception {
+ cleanupStores();
+ startHDFS();
+ FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+ FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+ }
+
+ protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+ Configuration conf) throws IOException {
+ copyResultsToLocal(hdfsSrcDir, localDestFile, resultsAreText, conf, true);
+ }
+
+ public static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+ Configuration conf, boolean ignoreZeroOutputs) throws IOException {
+ copyResultsToLocal(hdfsSrcDir, localDestFile, resultsAreText,
+ conf, ignoreZeroOutputs, dfs);
+ }
+
+ /*
+ * Merge and copy a DFS directory to a local destination, converting to text if necessary.
+ * Also locally store the binary-formatted result if available.
+ */
+ public static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+ Configuration conf, boolean ignoreZeroOutputs, FileSystem dfs) throws IOException {
+ if (resultsAreText) {
+ // for text files, just concatenate them together
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+ new Path(localDestFile), false, conf, null);
+ } else {
+ // file is binary
+ // save the entire binary output dir
+ FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+ new Path(localDestFile + ".bindir"), false, conf);
+
+ // chomp through output files
+ FileStatus[] files = ArrayUtils.addAll(dfs.globStatus(new Path(hdfsSrcDir + "*")), dfs.globStatus(new Path(hdfsSrcDir + "*/*")));
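+ // find the first non-empty part file; its key/value classes are used to set up the merged local SequenceFile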
+ FileStatus validFile = null;
+ for (FileStatus f : files) {
+ if (f.getLen() != 0) {
+ validFile = f;
+ break;
+ }
+ }
+ if (validFile == null) {
+ if (ignoreZeroOutputs) {
+ // just make a dummy output dir
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.mkdirs(new Path(localDestFile).getParent());
+ return;
+ }
+ else {
+ throw new IOException("No non-zero outputs in source directory " + hdfsSrcDir);
+ }
+ }
+
+ // also load the Nodes and write them out as text locally.
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.mkdirs(new Path(localDestFile).getParent());
+ File filePathTo = new File(localDestFile);
+ if (filePathTo.exists() && filePathTo.isDirectory()) {
+ filePathTo = new File(localDestFile + "/data");
+ }
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+ SequenceFile.Reader reader = new SequenceFile.Reader(dfs, validFile.getPath(), conf);
+ SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile
+ + ".binmerge"), reader.getKeyClass(), reader.getValueClass());
+
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+ for (FileStatus f : files) {
+ if (f.getLen() == 0) {
+ continue;
+ }
+ reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+ while (reader.next(key, value)) {
+ if (key == null || value == null) {
+ break;
+ }
+ bw.write(key.toString() + "\t" + value.toString());
+ System.out.println(key.toString() + "\t" + value.toString());
+ bw.newLine();
+ writer.append(key, value);
+
+ }
+ reader.close();
+ }
+ writer.close();
+ bw.close();
+ }
+
+ }
+
+ protected static boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+ File dumped = new File(actualPath);
+ if (poslistField != null) {
+ TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+ } else {
+ TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+ }
+ return true;
+ }
+
+ protected static void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ protected static void startHDFS() throws IOException {
+// conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+ // conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+// conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ dfs = dfsCluster.getFileSystem();
+ mrCluster = new MiniMRCluster(4, dfs.getUri().toString(), 2);
+ System.out.println(dfs.getUri().toString());
+
+ DataOutputStream confOutput = new DataOutputStream(
+ new FileOutputStream(new File(HADOOP_CONF)));
+ conf.writeXml(confOutput);
+ confOutput.close();
+ }
+
+ protected static void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+ Path dest = new Path(hdfsDest);
+ dfs.mkdirs(dest);
+ System.out.println("copying from " + localSrc + " to " + dest);
+ for (File f : new File(localSrc).listFiles()) {
+ dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+ }
+ }
+
+ /*
+ * Remove the local "actual" folder and any hdfs folders in use by this test
+ */
+ public void cleanUpOutput() throws IOException {
+ // local cleanup
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ if (lfs.exists(new Path(ACTUAL_ROOT))) {
+ lfs.delete(new Path(ACTUAL_ROOT), true);
+ }
+ // dfs cleanup
+ for (String path : HDFS_PATHS) {
+ if (dfs.exists(new Path(path))) {
+ dfs.delete(new Path(path), true);
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ cleanupHDFS();
+ }
+
+ protected static void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ mrCluster.shutdown();
+ }
+
+// public void buildGraph() throws IOException {
+// JobConf buildConf = new JobConf(conf); // use a separate conf so we don't interfere with other jobs
+// FileInputFormat.setInputPaths(buildConf, SEQUENCE);
+// FileOutputFormat.setOutputPath(buildConf, new Path(INPUT_GRAPH));
+//
+// GraphBuildingDriver tldriver = new GraphBuildingDriver();
+// tldriver.run(SEQUENCE, INPUT_GRAPH, 2, kmerByteSize, READ_LENGTH, false, true, HADOOP_CONF_ROOT + "conf.xml");
+//
+// boolean resultsAreText = true;
+// copyResultsToLocal(INPUT_GRAPH, ACTUAL_ROOT + INPUT_GRAPH, resultsAreText, buildConf);
+// }
+//
+// private void prepareGraph() throws IOException {
+// if (regenerateGraph) {
+// copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
+// buildGraph();
+// copyLocalToDFS(ACTUAL_ROOT + INPUT_GRAPH + readsFile + ".binmerge", INPUT_GRAPH);
+// } else {
+// copyLocalToDFS(EXPECTED_ROOT + INPUT_GRAPH + readsFile + ".binmerge", INPUT_GRAPH);
+// }
+// }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
new file mode 100644
index 0000000..0457de9
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
+import edu.uci.ics.hyracks.data.std.api.IComparable;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+import edu.uci.ics.hyracks.data.std.api.INumeric;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+
+public final class KmerPointable extends AbstractPointable implements IHashable, IComparable, INumeric {
+ public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isFixedLength() {
+ return false;
+ }
+
+ @Override
+ public int getFixedLength() {
+ return -1;
+ }
+ };
+
+ public static final IPointableFactory FACTORY = new IPointableFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IPointable createPointable() {
+ return new KmerPointable();
+ }
+
+ @Override
+ public ITypeTraits getTypeTraits() {
+ return TYPE_TRAITS;
+ }
+ };
+
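+ // The numeric views below read the kmer bytes in reverse: the last byte of the range is the most significant.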
+ public static short getShortReverse(byte[] bytes, int offset, int length) {
+ if (length < 2) {
+ return (short) (bytes[offset] & 0xff);
+ }
+ return (short) (((bytes[offset + length - 1] & 0xff) << 8) + (bytes[offset + length - 2] & 0xff));
+ }
+
+ public static int getIntReverse(byte[] bytes, int offset, int length) {
+ int shortValue = getShortReverse(bytes, offset, length) & 0xffff;
+
+ if (length < 3) {
+ return shortValue;
+ }
+ if (length == 3) {
+ return (((bytes[offset + 2] & 0xff) << 16) + ((bytes[offset + 1] & 0xff) << 8) + ((bytes[offset] & 0xff)));
+ }
+ return ((bytes[offset + length - 1] & 0xff) << 24) + ((bytes[offset + length - 2] & 0xff) << 16)
+ + ((bytes[offset + length - 3] & 0xff) << 8) + ((bytes[offset + length - 4] & 0xff) << 0);
+ }
+
+ public static long getLongReverse(byte[] bytes, int offset, int length) {
+ if (length < 8) {
+ return ((long) getIntReverse(bytes, offset, length)) & 0x0ffffffffL;
+ }
+ return (((long) (bytes[offset + length - 1] & 0xff)) << 56)
+ + (((long) (bytes[offset + length - 2] & 0xff)) << 48)
+ + (((long) (bytes[offset + length - 3] & 0xff)) << 40)
+ + (((long) (bytes[offset + length - 4] & 0xff)) << 32)
+ + (((long) (bytes[offset + length - 5] & 0xff)) << 24)
+ + (((long) (bytes[offset + length - 6] & 0xff)) << 16)
+ + (((long) (bytes[offset + length - 7] & 0xff)) << 8) + (((long) (bytes[offset + length - 8] & 0xff)));
+ }
+
+ @Override
+ public int compareTo(IPointable pointer) {
+ return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());
+ }
+
+ @Override
+ public int compareTo(byte[] bytes, int offset, int length) {
+
+ if (this.length != length) {
+ return this.length - length;
+ }
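+ // compare unsigned bytes from the most significant (last) byte down, consistent with the reversed numeric encoding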
+ for (int i = length - 1; i >= 0; i--) {
+ int cmp = (this.bytes[this.start + i] & 0xff) - (bytes[offset + i] & 0xff);
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ return 0;
+ }
+
+ @Override
+ public int hash() {
+ int hash = KmerHashPartitioncomputerFactory.hashBytes(bytes, start, length);
+ return hash;
+ }
+
+ @Override
+ public byte byteValue() {
+ return bytes[start + length - 1];
+ }
+
+ @Override
+ public short shortValue() {
+ return getShortReverse(bytes, start, length);
+ }
+
+ @Override
+ public int intValue() {
+ return getIntReverse(bytes, start, length);
+ }
+
+ @Override
+ public long longValue() {
+ return getLongReverse(bytes, start, length);
+ }
+
+ @Override
+ public float floatValue() {
+ return Float.intBitsToFloat(intValue());
+ }
+
+ @Override
+ public double doubleValue() {
+ return Double.longBitsToDouble(longValue());
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
new file mode 100644
index 0000000..60c0682
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
+
+public class NodeReference extends NodeWritable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public NodeReference(int kmerSize) {
+ super(kmerSize);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
new file mode 100644
index 0000000..47a3047
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.oldtype.PositionListWritable;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+
+public class PositionListReference extends PositionListWritable implements IValueReference {
+
+ public PositionListReference(int countByDataLength, byte[] byteArray, int startOffset) {
+ super(countByDataLength, byteArray, startOffset);
+ }
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
new file mode 100644
index 0000000..f066dc7
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+
+public class PositionReference extends PositionWritable implements IValueReference {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
new file mode 100644
index 0000000..de56b83
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+
+@SuppressWarnings("deprecation")
+public class GenomixJobConf extends JobConf {
+
+ public static final String JOB_NAME = "genomix";
+
+ /** Kmers length */
+ public static final String KMER_LENGTH = "genomix.kmerlen";
+ /** Read length */
+ public static final String READ_LENGTH = "genomix.readlen";
+ /** Frame Size */
+ public static final String FRAME_SIZE = "genomix.framesize";
+ /** Frame limit, needed by Hyracks */
+ public static final String FRAME_LIMIT = "genomix.framelimit";
+ /** Table size, needed by Hyracks */
+ public static final String TABLE_SIZE = "genomix.tablesize";
+ /** Groupby types */
+ public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+ /** Graph output format */
+ public static final String OUTPUT_FORMAT = "genomix.graph.output";
+ /** Whether to generate the reversed kmer sequence */
+ public static final String REVERSED_KMER = "genomix.kmer.reversed";
+
+ /** Configurations used by the hybrid groupby function in the graph build phase */
+ public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+ public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+ public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+
+ public static final int DEFAULT_KMERLEN = 21;
+ public static final int DEFAULT_READLEN = 124;
+ public static final int DEFAULT_FRAME_SIZE = 128 * 1024;
+ public static final int DEFAULT_FRAME_LIMIT = 4096;
+ public static final int DEFAULT_TABLE_SIZE = 10485767;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+ public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+
+ public static final boolean DEFAULT_REVERSED = true;
+
+ public static final String JOB_PLAN_GRAPHBUILD = "graphbuild";
+ public static final String JOB_PLAN_GRAPHSTAT = "graphstat";
+
+ public static final String GROUPBY_TYPE_HYBRID = "hybrid";
+ public static final String GROUPBY_TYPE_EXTERNAL = "external";
+ public static final String GROUPBY_TYPE_PRECLUSTER = "precluster";
+ public static final String OUTPUT_FORMAT_BINARY = "binary";
+ public static final String OUTPUT_FORMAT_TEXT = "text";
+
+ public GenomixJobConf() throws IOException {
+ super(new Configuration());
+ }
+
+ public GenomixJobConf(Configuration conf) throws IOException {
+ super(conf);
+ }
+
+ /**
+ * Set the kmer length
+ *
+ * @param kmerlength
+ * the desired kmer length
+ */
+ final public void setKmerLength(int kmerlength) {
+ setInt(KMER_LENGTH, kmerlength);
+ }
+
+ final public void setFrameSize(int frameSize) {
+ setInt(FRAME_SIZE, frameSize);
+ }
+
+ final public void setFrameLimit(int frameLimit) {
+ setInt(FRAME_LIMIT, frameLimit);
+ }
+
+ final public void setTableSize(int tableSize) {
+ setInt(TABLE_SIZE, tableSize);
+ }
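+
+ // Typical client usage (values shown are illustrative):
+ //   GenomixJobConf job = new GenomixJobConf();   // throws IOException
+ //   job.setKmerLength(21);
+ //   job.setFrameSize(GenomixJobConf.DEFAULT_FRAME_SIZE);
+ //   job.setTableSize(GenomixJobConf.DEFAULT_TABLE_SIZE);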
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
new file mode 100644
index 0000000..c8cb701
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+public abstract class JobGen implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ protected final ConfFactory confFactory;
+ protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+
+ public JobGen(GenomixJobConf job) throws HyracksDataException {
+ this.confFactory = new ConfFactory(job);
+ }
+
+ public abstract JobSpecification generateJob() throws HyracksException;
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
new file mode 100644
index 0000000..7571653
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+@SuppressWarnings("deprecation")
+public class JobGenBrujinGraph extends JobGen {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public enum GroupbyType {
+ EXTERNAL,
+ PRECLUSTER,
+ HYBRIDHASH,
+ }
+
+ public enum OutputFormat {
+ TEXT,
+ BINARY,
+ }
+
+ protected ConfFactory hadoopJobConfFactory;
+ protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+ protected String[] ncNodeNames;
+ protected String[] readSchedule;
+
+ protected int readLength;
+ protected int kmerSize;
+ protected int frameLimits;
+ protected int frameSize;
+ protected int tableSize;
+ protected GroupbyType groupbyType;
+ protected OutputFormat outputFormat;
+ protected boolean bGenerateReversedKmer;
+
+ protected void logDebug(String status) {
+ LOG.debug(status + " nc nodes:" + ncNodeNames.length);
+ }
+
+ public JobGenBrujinGraph(GenomixJobConf job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job);
+ String[] nodes = new String[ncMap.size()];
+ ncMap.keySet().toArray(nodes);
+ ncNodeNames = new String[nodes.length * numPartitionPerMachine];
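+ // repeat every NC name numPartitionPerMachine times so each machine hosts that many partitions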
+ for (int i = 0; i < numPartitionPerMachine; i++) {
+ System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+ }
+ initJobConfiguration(scheduler);
+ }
+
+ private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggeragater, IAggregatorDescriptorFactory merger,
+ ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+ IPointableFactory pointable, RecordDescriptor outRed) {
+ return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
+ aggeragater, merger, outRed, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
+ tableSize), true);
+ }
+
+ private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+ ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+ IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
+ throws HyracksDataException {
+
+ Object[] obj = new Object[3];
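+ // obj[0]: local (per-partition) group-by operator, obj[1]: repartitioning connector, obj[2]: final merging group-by operator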
+
+ switch (groupbyType) {
+ case EXTERNAL:
+ obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+ combineRed);
+ obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
+ obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
+ finalRec);
+ break;
+ case PRECLUSTER:
+ default:
+
+ obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
+ combineRed);
+ obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
+ obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
+ finalRec);
+ jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ break;
+ }
+ return obj;
+ }
+
+ public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+ try {
+ InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+ .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+
+ return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
+ hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
+ kmerSize, bGenerateReversedKmer));
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+ IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+ jobSpec.connect(conn, preOp, 0, nextOp, 0);
+ }
+
+ public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readOperator) throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
+
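+ // sort by kmer, aggregate locally, then repartition by kmer hash and merge across partitions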
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ ReadsKeyValueParserFactory.readKmerOutputRec);
+ connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+ jobSpec));
+
+ RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+ jobSpec.setFrameSize(frameSize);
+
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(),
+ new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
+ new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
+ AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+ logDebug("LocalKmerGroupby Operator");
+ connectOperators(jobSpec, sorter, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+
+ logDebug("CrossKmerGroupby Operator");
+ IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
+ AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+ connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
+ return kmerCrossAggregator;
+ }
+
+ public AbstractOperatorDescriptor generateMapperFromKmerToRead(JobSpecification jobSpec,
+ AbstractOperatorDescriptor kmerCrossAggregator) {
+ // Map (Kmer, {(ReadID,PosInRead),...}) into
+ // (ReadID,PosInRead,{OtherPosition,...},Kmer)
+
+ AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec,
+ MapKmerPositionToReadOperator.readIDOutputRec, readLength, kmerSize);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return mapKmerToRead;
+ }
+
+ public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
+ AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
+ // (ReadID, {(PosInRead, {OtherPosition...}, Kmer) ...})
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ MapKmerPositionToReadOperator.readIDOutputRec);
+ connectOperators(jobSpec, mapKmerToRead, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+ jobSpec));
+
+ RecordDescriptor readIDFinalRec = new RecordDescriptor(
+ new ISerializerDeserializer[1 + 2 * MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
+ new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(), null,
+ IntegerPointable.FACTORY, AggregateReadIDAggregateFactory.readIDAggregateRec, readIDFinalRec);
+ AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+ connectOperators(jobSpec, sorter, ncNodeNames, readLocalAggregator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+
+ logDebug("Group by ReadID merger");
+ IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
+ AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+ connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
+ return readCrossAggregator;
+ }
+
+ public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readCrossAggregator) {
+ // Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList,
+ // OutgoingList, Kmer)
+
+ AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
+ MapReadToNodeOperator.nodeOutputRec, kmerSize, true);
+ connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return mapEachReadToNode;
+ }
+
+ public AbstractOperatorDescriptor generateKmerWritorOperator(JobSpecification jobSpec,
+ AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
+ // Output Kmer
+ ITupleWriterFactory kmerWriter = null;
+ switch (outputFormat) {
+ case TEXT:
+ kmerWriter = new KMerTextWriterFactory(kmerSize);
+ break;
+ case BINARY:
+ default:
+ kmerWriter = new KMerSequenceWriterFactory(hadoopJobConfFactory.getConf());
+ break;
+ }
+ logDebug("WriteOperator");
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), kmerWriter);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return writeKmerOperator;
+ }
+
+ public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
+ AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
+ ITupleWriterFactory nodeWriter = null;
+ switch (outputFormat) {
+ case TEXT:
+ nodeWriter = new NodeTextWriterFactory(kmerSize);
+ break;
+ case BINARY:
+ default:
+ nodeWriter = new NodeSequenceWriterFactory(hadoopJobConfFactory.getConf());
+ break;
+ }
+ logDebug("WriteOperator");
+ // Output Node
+ HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), nodeWriter);
+ connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return writeNodeOperator;
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ // logDebug("Write kmer to result");
+ // generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+ logDebug("Map Kmer to Read Operator");
+ lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+ logDebug("Group by Read Operator");
+ lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
+
+ logDebug("Generate final node");
+ lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
+ logDebug("Write node to result");
+ lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
+
+ jobSpec.addRoot(lastOperator);
+ return jobSpec;
+ }
+
+ protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
+ Configuration conf = confFactory.getConf();
+ readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
+ kmerSize = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
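+ // keep k odd: an even k would allow palindromic kmers that equal their own reverse complement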
+ if (kmerSize % 2 == 0) {
+ kmerSize--;
+ conf.setInt(GenomixJobConf.KMER_LENGTH, kmerSize);
+ }
+ frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
+ tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
+ frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
+
+ bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
+
+ String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
+ groupbyType = GroupbyType.EXTERNAL;
+ } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
+ groupbyType = GroupbyType.PRECLUSTER;
+ } else {
+ groupbyType = GroupbyType.HYBRIDHASH;
+ }
+
+ String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+ if (output.equalsIgnoreCase("text")) {
+ outputFormat = OutputFormat.TEXT;
+ } else {
+ outputFormat = OutputFormat.BINARY;
+ }
+ try {
+ hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
+ InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+ .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+ readSchedule = scheduler.getLocationConstraints(splits);
+ } catch (IOException ex) {
+ throw new HyracksDataException(ex);
+ }
+
+ LOG.info("Genomix Graph Build Configuration");
+ LOG.info("Kmer:" + kmerSize);
+ LOG.info("Groupby type:" + type);
+ LOG.info("Output format:" + output);
+ LOG.info("Frame limit" + frameLimits);
+ LOG.info("Frame kmerByteSize" + frameSize);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
new file mode 100644
index 0000000..b4b1e73
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenCheckReader extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenCheckReader(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Write kmer to result");
+ generateRootByWriteKmerReader(jobSpec, readOperator);
+
+ return jobSpec;
+ }
+
+ public AbstractSingleActivityOperatorDescriptor generateRootByWriteKmerReader(JobSpecification jobSpec,
+ HDFSReadOperatorDescriptor readOperator) throws HyracksException {
+ // Output Kmer
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new ITupleWriter() {
+
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionWritable pos = new PositionWritable();
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ try {
+ if (kmer.getLength() > tuple
+ .getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
+ throw new IllegalArgumentException("Not enough kmer bytes");
+ }
+ kmer.setNewReference(
+ tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField),
+ tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
+ pos.setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputPosition),
+ tuple.getFieldStart(ReadsKeyValueParserFactory.OutputPosition));
+
+ output.write(kmer.toString().getBytes());
+ output.writeByte('\t');
+ output.write(pos.toString().getBytes());
+ output.writeByte('\n');
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+
+ }
+
+ };
+ }
+
+ });
+ connectOperators(jobSpec, readOperator, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ jobSpec.addRoot(writeKmerOperator);
+ return writeKmerOperator;
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
new file mode 100644
index 0000000..5202ba2
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenCreateKmerInfo extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenCreateKmerInfo(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ logDebug("Write kmer to result");
+ lastOperator = generateKmerWritorOperator(jobSpec, lastOperator);
+ jobSpec.addRoot(lastOperator);
+
+ return jobSpec;
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
new file mode 100644
index 0000000..1e78b79
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
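+/**
+ * Job generator: read k-mers from HDFS, group by k-mer, map each k-mer
+ * position back to its read, group by read ID, and write the per-read
+ * aggregation result as text.
+ */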
+public class JobGenGroupbyReadID extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenGroupbyReadID(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ //logDebug("Write kmer to result");
+ //generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+ logDebug("Map Kmer to Read Operator");
+ lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+ logDebug("Group by Read Operator");
+ lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
+
+ logDebug("Write node to result");
+ lastOperator = generateRootByWriteReadIDAggregationResult(jobSpec, lastOperator);
+ jobSpec.addRoot(lastOperator);
+ return jobSpec;
+ }
+
+ public AbstractOperatorDescriptor generateRootByWriteReadIDAggregationResult(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readCrossAggregator) throws HyracksException {
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new ITupleWriter() {
+
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionListWritable plist = new PositionListWritable();
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
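+                            // field 0 holds the read ID; each remaining field packs one or more
+                            // (position list, optional k-mer) entries that are decoded below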
+ int readId = Marshal.getInt(tuple.getFieldData(0), tuple.getFieldStart(0));
+ try {
+ output.write((Integer.toString(readId) + "\t").getBytes());
+ for (int i = 1; i < tuple.getFieldCount(); i++) {
+ int fieldOffset = tuple.getFieldStart(i);
+ while (fieldOffset < tuple.getFieldStart(i) + tuple.getFieldLength(i)) {
+ byte[] buffer = tuple.getFieldData(i);
+ // read poslist
+ int posCount = PositionListWritable.getCountByDataLength(Marshal.getInt(
+ buffer, fieldOffset));
+ fieldOffset += 4;
+ plist.setNewReference(posCount, buffer, fieldOffset);
+ fieldOffset += plist.getLength();
+
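+                                    // odd field indices yield positive positions; even indices are negated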
+ int posInRead = (i + 1) / 2;
+ if (i % 2 == 0) {
+ posInRead = -posInRead;
+ }
+ String kmerString = "";
+ if (posInRead > 0) {
+ int kmerbytes = Marshal.getInt(buffer, fieldOffset);
+ if (kmer.getLength() != kmerbytes) {
+                                                throw new IllegalArgumentException("kmer length is invalid");
+ }
+ fieldOffset += 4;
+ kmer.setNewReference(buffer, fieldOffset);
+ fieldOffset += kmer.getLength();
+ kmerString = kmer.toString();
+ }
+
+ output.write(Integer.toString(posInRead).getBytes());
+ output.writeByte('\t');
+ output.write(plist.toString().getBytes());
+ output.writeByte('\t');
+ output.write(kmerString.getBytes());
+ output.writeByte('\t');
+ }
+ }
+ output.writeByte('\n');
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+
+ }
+
+ };
+ }
+
+ });
+ connectOperators(jobSpec, readCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+
+ return writeKmerOperator;
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
new file mode 100644
index 0000000..8e727959
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
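+/**
+ * Job generator: read k-mers from HDFS, group by k-mer, map each k-mer
+ * position to its read ID, and write the mapper output as tab-separated text.
+ */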
+public class JobGenMapKmerToRead extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenMapKmerToRead(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ }
+
+ public AbstractOperatorDescriptor generateRootByWriteMapperFromKmerToReadID(JobSpecification jobSpec,
+ AbstractOperatorDescriptor mapper) throws HyracksException {
+        // Output the k-mer-to-read mapping as text
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new ITupleWriter() {
+
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionListWritable plist = new PositionListWritable();
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
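+                            // emit one tab-separated line per tuple: readID, posInRead, position list, k-mer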
+ try {
+ int readID = Marshal.getInt(
+ tuple.getFieldData(MapKmerPositionToReadOperator.OutputReadIDField),
+ tuple.getFieldStart(MapKmerPositionToReadOperator.OutputReadIDField));
+ byte posInRead = tuple
+ .getFieldData(MapKmerPositionToReadOperator.OutputPosInReadField)[tuple
+ .getFieldStart(MapKmerPositionToReadOperator.OutputPosInReadField)];
+ int posCount = PositionListWritable.getCountByDataLength(tuple
+ .getFieldLength(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
+ plist.setNewReference(
+ posCount,
+ tuple.getFieldData(MapKmerPositionToReadOperator.OutputOtherReadIDListField),
+ tuple.getFieldStart(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
+
+ String kmerString = "";
+ if (posInRead > 0) {
+ if (kmer.getLength() > tuple
+ .getFieldLength(MapKmerPositionToReadOperator.OutputKmerField)) {
+ throw new IllegalArgumentException("Not enough kmer bytes");
+ }
+ kmer.setNewReference(
+ tuple.getFieldData(MapKmerPositionToReadOperator.OutputKmerField),
+ tuple.getFieldStart(MapKmerPositionToReadOperator.OutputKmerField));
+ kmerString = kmer.toString();
+ }
+
+ output.write(Integer.toString(readID).getBytes());
+ output.writeByte('\t');
+ output.write(Integer.toString(posInRead).getBytes());
+ output.writeByte('\t');
+ output.write(plist.toString().getBytes());
+ output.writeByte('\t');
+ output.write(kmerString.getBytes());
+ output.writeByte('\n');
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+
+ }
+
+ };
+ }
+
+ });
+ connectOperators(jobSpec, mapper, ncNodeNames, writeKmerOperator, ncNodeNames, new OneToOneConnectorDescriptor(
+ jobSpec));
+ jobSpec.addRoot(writeKmerOperator);
+ return writeKmerOperator;
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ logDebug("Map Kmer to Read Operator");
+ lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+ generateRootByWriteMapperFromKmerToReadID(jobSpec, lastOperator);
+
+ return jobSpec;
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java
new file mode 100644
index 0000000..21b6385
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
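+/**
+ * Variant of JobGenBrujinGraph that builds MapReadToNodeOperator with its
+ * final boolean flag set to false (assumed, from the class name, to keep the
+ * nodes of each read un-merged).
+ */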
+public class JobGenUnMerged extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenUnMerged(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ }
+
+ @Override
+ public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readCrossAggregator) {
+ AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
+ MapReadToNodeOperator.nodeOutputRec, kmerSize, false);
+ connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return mapEachReadToNode;
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java
index 2dad500..8dec857 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java
@@ -6,7 +6,7 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
public class IncomingListWritable implements WritableComparable<IncomingListWritable>{
private PositionListWritable reverseForwardList;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
index 6b32b51..c42bf32 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -6,7 +6,7 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -59,7 +59,8 @@
checkMessage = 0;
if (sourceVertexId != null) {
checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+
+                this.sourceVertexId.set((byte) 0, sourceVertexId.getReadId(), sourceVertexId.getPosId());
}
if (chainVertexId != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
@@ -91,7 +92,7 @@
public void setSourceVertexId(PositionWritable sourceVertexId) {
if (sourceVertexId != null) {
checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+            this.sourceVertexId.set((byte) 0, sourceVertexId.getReadId(), sourceVertexId.getPosId());
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java
index af14f00..275954d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java
@@ -6,7 +6,7 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
public class OutgoingListWritable implements WritableComparable<OutgoingListWritable>{
private PositionListWritable forwardForwardList;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 039f5b4..50b1518 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -4,7 +4,7 @@
import java.util.Iterator;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;