Fix BubbleAddVertex and pass test
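
BubbleAddVertex previously keyed vertices by (readID, posInRead) through
PositionWritable; it now keys them by k-mer through KmerBytesWritable and
builds adjacency lists with KmerListWritable, matching the other
graph-cleaning vertices. The patch also adds the MergeBubble.txt test
reads, re-enables the BubbleAddGraph job generator, adds
BubbleAddSmallTestSuite, drops the remaining edu.uci.ics.genomix.oldtype
imports, and removes the ConvertNodeToIdValue helper that still depended
on the old PositionWritable types.

Rough illustration of the new id convention (a sketch only, based on the
KmerBytesWritable/KmerListWritable calls that appear in this patch; the
class name KmerIdSketch is made up for the example):

    import edu.uci.ics.genomix.type.KmerBytesWritable;
    import edu.uci.ics.genomix.type.KmerListWritable;

    public class KmerIdSketch {
        public static void main(String[] args) {
            int kmerSize = 3;
            // Vertex ids are k-mers now, not (readID, posInRead) pairs.
            KmerBytesWritable bubbleId = new KmerBytesWritable(kmerSize);
            bubbleId.setByRead("GTA".getBytes(), 0); // decode "GTA" into a 3-mer

            // Neighbor lists hold k-mers of the same size.
            KmerListWritable frList = new KmerListWritable(kmerSize);
            frList.append(bubbleId);

            System.out.println(bubbleId); // toString() yields the k-mer, e.g. GTA
        }
    }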
diff --git a/genomix/genomix-hadoop/data/webmap/MergeBubble.txt b/genomix/genomix-hadoop/data/webmap/MergeBubble.txt
new file mode 100644
index 0000000..087f43e
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/MergeBubble.txt
@@ -0,0 +1,2 @@
+1 AATAGAA
+2 AATACAA
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
index c35ad7f..a15ceeb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -6,7 +6,6 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
import edu.uci.ics.genomix.type.KmerListWritable;
public class AdjacencyListWritable implements WritableComparable<AdjacencyListWritable>{
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
index ebb4f74..f5aa3a1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
@@ -4,11 +4,10 @@
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
@@ -47,7 +46,7 @@
* Remove tip or single node when l > constant
*/
public class BubbleAddVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BubbleAddVertex.kmerSize";
public static int kmerSize = -1;
@@ -64,40 +63,42 @@
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 2){
- getVertexValue().getFFList().append(2, (byte)1);
+ if(getVertexId().toString().equals("ATA")){
+ KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ vertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getFRList().append(vertexId);
- //add tip vertex
+ //add bubble vertex
@SuppressWarnings("rawtypes")
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- PositionWritable vertexId = new PositionWritable();
- VertexValueWritable vertexValue = new VertexValueWritable();
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
/**
* set the src vertex id
*/
- vertexId.set(2, (byte)1);
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- byte[] array = { 'T', 'A', 'G', 'C', 'C', 'A', 'G'}; //TAGCCAG
- KmerBytesWritable kmer = new KmerBytesWritable(array.length);
- kmer.setByRead(array, 0);
- vertexValue.setKmer(kmer);
- PositionListWritable plist = new PositionListWritable();
- plist.append(new PositionWritable(1, (byte)2));
- vertexValue.setRRList(plist);
- PositionListWritable plist2 = new PositionListWritable();
- plist2.append(new PositionWritable(1, (byte)4));
- vertexValue.setFFList(plist2);
+ KmerListWritable kmerFRList = new KmerListWritable(kmerSize);
+ kmerFRList.append(getVertexId());
+ vertexValue.setFRList(kmerFRList);
+ KmerBytesWritable otherVertexId = new KmerBytesWritable(kmerSize);
+ otherVertexId.setByRead("AGA".getBytes(), 0);
+ KmerListWritable kmerRFList = new KmerListWritable(kmerSize);
+ kmerRFList.append(otherVertexId);
+ vertexValue.setRFList(kmerRFList);
+ vertexValue.setKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
+ }
+ else if(getVertexId().toString().equals("AGA")){
+ KmerBytesWritable bubbleVertexId = new KmerBytesWritable(kmerSize);
+ bubbleVertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getRFList().append(bubbleVertexId);
}
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 4)
- getVertexValue().getRRList().append(2, (byte)1);
}
voteToHalt();
}
@@ -111,7 +112,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index b782294..d630b5f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -7,6 +7,7 @@
import org.apache.hadoop.io.NullWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -51,7 +52,7 @@
* Naive Algorithm for path merge graph
*/
public class BubbleMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MergeBubbleMessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MergeBubbleMessageWritable> {
public static final String KMER_SIZE = "BubbleMergeVertex.kmerSize";
public static final String ITERATIONS = "BubbleMergeVertex.iteration";
public static int kmerSize = -1;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
deleted file mode 100644
index fa5ae19..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package edu.uci.ics.genomix.pregelix.sequencefile;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-
-
-public class ConvertNodeToIdValue {
-
- public static void convert(Path inFile, Path outFile)
- throws IOException {
- Configuration conf = new Configuration();
- FileSystem fileSys = FileSystem.get(conf);
-
- SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, PositionWritable.class,
- VertexValueWritable.class, CompressionType.NONE);
- NodeWritable node = new NodeWritable();
- NullWritable value = NullWritable.get();
- PositionWritable outputKey = new PositionWritable();
- VertexValueWritable outputValue = new VertexValueWritable();
-
- while(reader.next(node, value)) {
- System.out.println(node.getNodeID().toString());
- outputKey.set(node.getNodeID());
- outputValue.setFFList(node.getFFList());
- outputValue.setFRList(node.getFRList());
- outputValue.setRFList(node.getRFList());
- outputValue.setRRList(node.getRRList());
- outputValue.setKmer(node.getKmer());
- outputValue.setState(State.IS_HEAD);
- writer.append(outputKey, outputValue);
- }
- writer.close();
- reader.close();
- }
-
- public static void main(String[] args) throws IOException {
- Path dir = new Path("data/test");
- Path outDir = new Path("data/input");
- FileUtils.cleanDirectory(new File("data/input"));
- Path inFile = new Path(dir, "result.graphbuild.txt.bin");
- Path outFile = new Path(outDir, "out");
- convert(inFile,outFile);
- }
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index fbe47ce..ff5f404 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -195,23 +195,23 @@
generateBridgeRemoveGraphJob("BridgeRemoveGraph", outputBase
+ "BridgeRemoveGraph.xml");
}
-//
-// private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BubbleAddVertex.class);
-// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBubbleAddGraph() throws IOException {
-// generateBubbleAddGraphJob("BubbleAddGraph", outputBase
-// + "BubbleAddGraph.xml");
-// }
+
+ private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(BubbleAddVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genBubbleAddGraph() throws IOException {
+ generateBubbleAddGraphJob("BubbleAddGraph", outputBase
+ + "BubbleAddGraph.xml");
+ }
//
// private static void generateBubbleMergeGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -247,6 +247,7 @@
genBridgeAddGraph();
genTipRemoveGraph();
genBridgeRemoveGraph();
+ genBubbleAddGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java
new file mode 100644
index 0000000..a9c2774
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.pregelix.JobRun;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+@SuppressWarnings("deprecation")
+public class BubbleAddSmallTestSuite extends TestSuite {
+ private static final Logger LOGGER = Logger.getLogger(BubbleAddSmallTestSuite.class.getName());
+
+ public static final String PreFix = "data/PathMergeTestSet";
+ public static final String[] TestDir = { PreFix + File.separator
+ + "5"};
+ private static final String ACTUAL_RESULT_DIR = "data/actual/bubbleadd";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+ private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+ private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+ private static final String PATH_TO_ONLY = "src/test/resources/only_bubbleadd.txt";
+
+ public static final String HDFS_INPUTPATH = "/PathTestSet";
+
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
+
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+
+ public void setUp() throws Exception {
+ ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+ ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+ cleanupStores();
+ PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+ LOGGER.info("Hyracks mini-cluster started");
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+ }
+
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+
+ for (String testDir : TestDir) {
+ File src = new File(testDir);
+ Path dest = new Path(HDFS_INPUTPATH + File.separator + src.getName());
+ dfs.mkdirs(dest);
+ //src.listFiles()
+ //src.listFiles((FilenameFilter)(new WildcardFileFilter("part*")))
+ for (File f : src.listFiles()) {
+ dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+ }
+ }
+
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
+
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ /**
+ * cleanup hdfs cluster
+ */
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ public void tearDown() throws Exception {
+ PregelixHyracksIntegrationUtil.deinit();
+ LOGGER.info("Hyracks mini-cluster shut down");
+ cleanupHDFS();
+ }
+
+ public static Test suite() throws Exception {
+ List<String> onlys = getFileList(PATH_TO_ONLY);
+ File testData = new File(PATH_TO_JOBS);
+ File[] queries = testData.listFiles();
+ BubbleAddSmallTestSuite testSuite = new BubbleAddSmallTestSuite();
+ testSuite.setUp();
+ boolean onlyEnabled = false;
+ FileSystem dfs = FileSystem.get(testSuite.conf);
+
+ if (onlys.size() > 0) {
+ onlyEnabled = true;
+ }
+
+ for (File qFile : queries) {
+ if (qFile.isFile()) {
+ if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+ continue;
+ } else {
+ for (String testPathStr : TestDir) {
+ File testDir = new File(testPathStr);
+ String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "bin" + File.separator + testDir.getName();
+ String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "txt" + File.separator + testDir.getName();
+ testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
+ .getAbsolutePath().toString(), dfs,
+ HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+ }
+ }
+ }
+ }
+ return testSuite;
+ }
+
+ /**
+ * Runs the tests and collects their result in a TestResult.
+ */
+ @Override
+ public void run(TestResult result) {
+ try {
+ int testCount = countTestCases();
+ for (int i = 0; i < testCount; i++) {
+ // cleanupStores();
+ Test each = this.testAt(i);
+ if (result.shouldStop())
+ break;
+ runTest(each, result);
+ }
+ tearDown();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+ String s = null;
+ List<String> ignores = new ArrayList<String>();
+ while ((s = reader.readLine()) != null) {
+ ignores.add(s);
+ }
+ reader.close();
+ return ignores;
+ }
+
+ private static String jobExtToResExt(String fname) {
+ int dot = fname.lastIndexOf('.');
+ return fname.substring(0, dot);
+ }
+
+ private static boolean isInList(List<String> onlys, String name) {
+ for (String only : onlys)
+ if (name.indexOf(only) >= 0)
+ return true;
+ return false;
+ }
+
+}