reset maxIteration
diff --git a/genomix/genomix-pregelix/data/sequencefile/BridgePath b/genomix/genomix-pregelix/data/PathTestSet/BridgePath/BridgePath
similarity index 100%
rename from genomix/genomix-pregelix/data/sequencefile/BridgePath
rename to genomix/genomix-pregelix/data/PathTestSet/BridgePath/BridgePath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathTestSet/CyclePath/part-0 b/genomix/genomix-pregelix/data/PathTestSet/CyclePath/part-0
new file mode 100755
index 0000000..b986370
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathTestSet/CyclePath/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathTestSet/CyclePath/part-1 b/genomix/genomix-pregelix/data/PathTestSet/CyclePath/part-1
new file mode 100755
index 0000000..729417d
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathTestSet/CyclePath/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/sequencefile/LongPath b/genomix/genomix-pregelix/data/PathTestSet/LongPath/LongPath
similarity index 100%
rename from genomix/genomix-pregelix/data/sequencefile/LongPath
rename to genomix/genomix-pregelix/data/PathTestSet/LongPath/LongPath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/sequencefile/Path b/genomix/genomix-pregelix/data/PathTestSet/Path/Path
similarity index 100%
rename from genomix/genomix-pregelix/data/sequencefile/Path
rename to genomix/genomix-pregelix/data/PathTestSet/Path/Path
Binary files differ
diff --git a/genomix/genomix-pregelix/data/sequencefile/CyclePath b/genomix/genomix-pregelix/data/PathTestSet/RingPath/CyclePath
similarity index 100%
rename from genomix/genomix-pregelix/data/sequencefile/CyclePath
rename to genomix/genomix-pregelix/data/PathTestSet/RingPath/CyclePath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/sequencefile/SimplePath b/genomix/genomix-pregelix/data/PathTestSet/SimplePath/SimplePath
similarity index 100%
rename from genomix/genomix-pregelix/data/sequencefile/SimplePath
rename to genomix/genomix-pregelix/data/PathTestSet/SimplePath/SimplePath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/sequencefile/SinglePath b/genomix/genomix-pregelix/data/PathTestSet/SinglePath/SinglePath
similarity index 100%
rename from genomix/genomix-pregelix/data/sequencefile/SinglePath
rename to genomix/genomix-pregelix/data/PathTestSet/SinglePath/SinglePath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathTestSet/ThreeKmer/part-0 b/genomix/genomix-pregelix/data/PathTestSet/ThreeKmer/part-0
new file mode 100755
index 0000000..b02d917
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathTestSet/ThreeKmer/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathTestSet/ThreeKmer/part-1 b/genomix/genomix-pregelix/data/PathTestSet/ThreeKmer/part-1
new file mode 100755
index 0000000..37e9986
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathTestSet/ThreeKmer/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/sequencefile/TreePath b/genomix/genomix-pregelix/data/PathTestSet/TreePath/TreePath
similarity index 100%
rename from genomix/genomix-pregelix/data/sequencefile/TreePath
rename to genomix/genomix-pregelix/data/PathTestSet/TreePath/TreePath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathTestSet/TwoKmer/part-0 b/genomix/genomix-pregelix/data/PathTestSet/TwoKmer/part-0
new file mode 100755
index 0000000..7973858
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathTestSet/TwoKmer/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathTestSet/TwoKmer/part-1 b/genomix/genomix-pregelix/data/PathTestSet/TwoKmer/part-1
new file mode 100755
index 0000000..02d893d
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathTestSet/TwoKmer/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/graph/mergeTest/TwoKmer b/genomix/genomix-pregelix/graph/mergeTest/TwoKmer
index 86790c6..8c2a74b 100644
--- a/genomix/genomix-pregelix/graph/mergeTest/TwoKmer
+++ b/genomix/genomix-pregelix/graph/mergeTest/TwoKmer
@@ -1,2 +1 @@
-ACACT |G 1
-CACTG A| 1
+ACACTG
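
The new expected output for TwoKmer is the single merged chain: the two 5-mers above overlap by k - 1 bases, so merging appends only the last base of the right k-mer. A plain-String sketch of that rule (an illustration, not the VKmerBytesWritable code path):

    public class KmerMergeSketch {
        // Merge two k-mers whose suffix/prefix overlap by k - 1 bases.
        static String mergeOverlapping(String left, String right) {
            int k = left.length();
            if (!left.substring(1).equals(right.substring(0, k - 1)))
                throw new IllegalArgumentException("k-mers do not overlap");
            return left + right.charAt(k - 1);
        }

        public static void main(String[] args) {
            // ACACT and CACTG share the 4-base overlap CACT
            System.out.println(mergeOverlapping("ACACT", "CACTG")); // ACACTG
        }
    }
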
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
index 285976a..47db449 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -36,7 +36,7 @@
@Override
public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
InterruptedException {
- if(vertex.getVertexValue().getLengthOfMergeChain() != NaiveAlgorithmForPathMergeVertex.kmerSize)
+ //if(vertex.getVertexValue().getLengthOfMergeChain() != NaiveAlgorithmForPathMergeVertex.kmerSize)
getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/Graph.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/Graph.java
index df09a64..298864e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/Graph.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/Graph.java
@@ -50,6 +50,6 @@
public static void main(String[] args) throws Exception
{
Graph g = new Graph();
- g.start("Path");
+ g.start("result.txt.txt");
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index 057ab08..e4317f0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -19,7 +19,7 @@
public ValueStateWritable() {
state = State.NON_VERTEX;
- mergeChain = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ mergeChain = new VKmerBytesWritable(-1);
}
public ValueStateWritable(byte adjMap, byte state, VKmerBytesWritable mergeChain) {
@@ -87,7 +87,7 @@
@Override
public String toString() {
- if(mergeChain.getKmerLength() == -1)
+ if(mergeChain.getKmerLength() == -1 || mergeChain.getKmerLength() == 0)
return GeneCode.getSymbolFromBitMap(adjMap);
return GeneCode.getSymbolFromBitMap(adjMap) + "\t" +
getLengthOfMergeChain() + "\t" +
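
The widened guard treats both sentinels as "no merge chain": -1 (presumably what the VKmerBytesWritable(-1) built by the changed constructor reports) and 0 (a chain cleared elsewhere, e.g. the new setMergeChain(null) path in TwoStepLogAlgorithmForPathMergeVertex). A trivial sketch of the predicate with plain ints:

    public class EmptyChainSketch {
        // A merge chain is printable only for a positive k-mer length.
        static boolean hasChain(int kmerLength) {
            return kmerLength != -1 && kmerLength != 0;
        }

        public static void main(String[] args) {
            System.out.println(hasChain(-1)); // false: freshly constructed value
            System.out.println(hasChain(0));  // false: cleared chain
            System.out.println(hasChain(6));  // true:  merged chain such as ACACTG
        }
    }
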
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
index d75ea48..6db02c7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -71,7 +71,7 @@
if(kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
}
public void findDestination(){
destVertexId.set(msg.getSourceVertexId());
@@ -171,12 +171,12 @@
//head node sends message to path node
else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
while (msgIterator.hasNext()){
- if(getSuperstep() == 3 && GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap()))
- voteToHalt();
- else{
- msg = msgIterator.next();
- sendMsgToPathVertex();
- }
+ //if(getSuperstep() == 3 && GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap()))
+ // voteToHalt();
+ //else{
+ msg = msgIterator.next();
+ sendMsgToPathVertex();
+ //}
}
}
//path node sends message back to head node
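
Both hunks serve the commit's point: the superstep-3 early halt for rear vertices is disabled, and the ITERATIONS fallback rises from 100 to 1000000, so the merge effectively runs to convergence unless a job sets ITERATIONS explicitly. A minimal sketch of the Hadoop lookup pattern, with a hypothetical key string since the real ITERATIONS constant lives in the vertex class:

    import org.apache.hadoop.conf.Configuration;

    public class IterationDefaultSketch {
        static final String ITERATIONS = "pathmerge.iterations"; // hypothetical key

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // returns the job's value if one was set, else the default
            int maxIteration = conf.getInt(ITERATIONS, 1000000);
            System.out.println("maxIteration = " + maxIteration);
        }
    }
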
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
index b613ec1..0ac4345 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
@@ -67,7 +67,7 @@
if(kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 20);
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
}
/**
* get destination vertex
@@ -206,7 +206,7 @@
/**
* set vertexValue's state chainVertexId, value
*/
- public void setVertexValueAttributes(){
+ public boolean setVertexValueAttributes(){
if(msg.getMessage() == Message.END){
if(getVertexValue().getState() != State.START_VERTEX)
getVertexValue().setState(State.END_VERTEX);
@@ -220,10 +220,19 @@
chainVertexId.set(getVertexValue().getMergeChain());
lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain() - kmerSize + 1, msg.getChainVertexId()));
chainVertexId.set(kmerFactory.mergeTwoKmer(chainVertexId, lastKmer));
- getVertexValue().setMergeChain(chainVertexId);
+ if(GraphVertexOperation.isCycle(getVertexId(), chainVertexId)){
+ getVertexValue().setMergeChain(null);
+ getVertexValue().setAdjMap(GraphVertexOperation.reverseAdjMap(getVertexValue().getAdjMap(),
+ chainVertexId.getGeneCodeAtPosition(kmerSize)));
+ getVertexValue().setState(State.CYCLE);
+ return false;
+ }
+ else
+ getVertexValue().setMergeChain(chainVertexId);
byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(), msg.getAdjMap());
getVertexValue().setAdjMap(tmpVertexValue);
+ return true;
}
/**
* send message to self
@@ -280,8 +289,10 @@
else{
if(msgIterator.hasNext()){
msg = msgIterator.next();
- mergeChainVertex(msgIterator);
- sendMsgToPathVertex(getVertexValue().getMergeChain(), getVertexValue().getAdjMap());
+ if(mergeChainVertex(msgIterator))
+ sendMsgToPathVertex(getVertexValue().getMergeChain(), getVertexValue().getAdjMap());
+ else
+ voteToHalt();
}
if(getVertexValue().getState() == State.END_VERTEX || getVertexValue().getState() == State.FINAL_DELETE){
voteToHalt();
@@ -296,7 +307,7 @@
* path response message to head
*/
public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(msgIterator.hasNext()){
+ if(msgIterator.hasNext()){
msg = msgIterator.next();
responseMsgToHeadVertex();
}
@@ -310,8 +321,8 @@
/**
* merge chainVertex and store in vertexVal.chainVertexId
*/
- public void mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- setVertexValueAttributes();
+ public boolean mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ return setVertexValueAttributes();
}
@Override
public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
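
The new boolean return threads cycle detection through the merge path: setVertexValueAttributes (and so mergeChainVertex) now reports whether the merge stayed acyclic, and the caller either forwards the merged chain or votes to halt. A plain-String sketch of that decision, mirroring the GraphVertexOperation.isCycle check added further down:

    public class CycleHandlingSketch {
        // Returns true when merging may continue, false when the chain
        // revisits the vertex's own k-mer and the vertex should halt.
        static boolean mergeChain(String vertexKmer, String mergedChain) {
            // drop the first base, then look for the vertex's own k-mer
            if (mergedChain.substring(1).contains(vertexKmer))
                return false; // cycle: caller sets State.CYCLE and votes to halt
            return true;      // no cycle: caller keeps the chain and messages on
        }

        public static void main(String[] args) {
            System.out.println(mergeChain("ACACT", "ACACTGACACT")); // false: cycle
            System.out.println(mergeChain("ACACT", "ACACTG"));      // true
        }
    }
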
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
index 55ddcff..84d846e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
@@ -23,9 +23,9 @@
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
- Path p = new Path("output");
+ Path p = new Path("graphbuildresult/CyclePath2_result");
//Path p2 = new Path("data/result");
- Path outFile = new Path("output2");
+ Path outFile = new Path("here");
SequenceFile.Reader reader;
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
outFile, KmerBytesWritable.class, KmerCountValue.class,
@@ -33,7 +33,7 @@
KmerBytesWritable key = new KmerBytesWritable(kmerSize);
KmerCountValue value = new KmerCountValue();
- File dir = new File("output");
+ File dir = new File("graphbuildresult/CyclePath2_result");
for(File child : dir.listFiles()){
String name = child.getAbsolutePath();
Path inFile = new Path(p, name);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index 249c293..026bba2 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -1,9 +1,12 @@
package edu.uci.ics.genomix.pregelix.sequencefile;
import java.io.BufferedWriter;
+import java.io.File;
import java.io.FileWriter;
+import java.io.FilenameFilter;
import java.io.IOException;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -16,14 +19,16 @@
public class GenerateTextFile {
- public static void generateFromPathmergeResult() throws IOException{
- BufferedWriter bw = new BufferedWriter(new FileWriter("output2"));
+ public static void generateFromPathmergeResult(int kmerSize, String strSrcDir, String outputDir) throws IOException{
Configuration conf = new Configuration();
- FileSystem fileSys = FileSystem.get(conf);
- for(int i = 0; i < 2; i++){
- Path path = new Path("output/part-" + i);
- SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(55);
+ FileSystem fileSys = FileSystem.getLocal(conf);
+
+ fileSys.create(new Path(outputDir)).close();
+ BufferedWriter bw = new BufferedWriter(new FileWriter(outputDir));
+ File srcPath = new File(strSrcDir);
+ for(File f : srcPath.listFiles((FilenameFilter)(new WildcardFileFilter("part*")))){
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(f.getAbsolutePath()), conf);
+ KmerBytesWritable key = new KmerBytesWritable(kmerSize);
ValueStateWritable value = new ValueStateWritable();
while(reader.next(key, value)){
@@ -111,7 +116,7 @@
* @throws IOException
*/
public static void main(String[] args) throws IOException {
- generateFromPathmergeResult();
+ //generateFromPathmergeResult();
//generateFromGraphbuildResult();
//generateSpecificLengthChainFromPathmergeResult(68);
//generateSpecificLengthChainFromLogPathmergeResult(68);
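
generateFromPathmergeResult is now parameterized by k-mer size, source directory, and output file, and discovers reducer outputs by wildcard instead of assuming exactly two fixed part files. A self-contained sketch of that listing idiom, assuming commons-io on the classpath and a hypothetical directory name:

    import java.io.File;
    import java.io.FilenameFilter;

    import org.apache.commons.io.filefilter.WildcardFileFilter;

    public class PartFileListingSketch {
        public static void main(String[] args) {
            File srcDir = new File("graphbuildresult"); // hypothetical directory
            // WildcardFileFilter implements FilenameFilter; the cast disambiguates
            File[] parts = srcDir.listFiles((FilenameFilter) new WildcardFileFilter("part*"));
            if (parts != null)
                for (File part : parts)
                    System.out.println(part.getAbsolutePath());
        }
    }
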
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/testcase/GenerateTestInput.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/testcase/GenerateTestInput.java
index fc26f66..50f1679 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/testcase/GenerateTestInput.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/testcase/GenerateTestInput.java
@@ -56,7 +56,7 @@
OutputStreamWriter writer;
try {
writer = new OutputStreamWriter(new FileOutputStream("graph/55/SinglePath_55"));
- writer.write(simplePath(55,500,1));
+ writer.write(simplePath(55,320,1));
writer.close();
/*writer = new OutputStreamWriter(new FileOutputStream("graph/55/SimplePath_55"));
writer.write(simplePath(55,60,3));
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
index 4bb40c9..4257f89 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
@@ -9,7 +9,7 @@
public static final byte TODELETE = 4;
public static final byte FINAL_VERTEX = 5;
public static final byte FINAL_DELETE = 6;
- public static final byte KILL_SELF = 7;
+ public static final byte CYCLE = 7;
public final static class STATE_CONTENT{
@@ -37,8 +37,8 @@
case FINAL_DELETE:
r = "FINAL_DELETE";
break;
- case KILL_SELF:
- r = "KILL_SELF";
+ case CYCLE:
+ r = "CYCLE";
break;
}
return r;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java
index 47f2c63..f086df4 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java
@@ -47,5 +47,19 @@
public static byte updateRightNeighber(byte oldVertexValue, byte newVertexValue){
return (byte) ((byte)(oldVertexValue & 0xF0) | (byte) (newVertexValue & 0x0F));
}
-
+ /**
+ * check whether mergeChain loops back to contain vertexId (a cycle)
+ */
+ public static boolean isCycle(KmerBytesWritable vertexId, VKmerBytesWritable mergeChain){
+ String chain = mergeChain.toString().substring(1);
+ if(chain.contains(vertexId.toString()))
+ return true;
+ return false;
+ }
+ /**
+ * reverse neighbor: keep the predecessor bits, replace the successor bits
+ */
+ public static byte reverseAdjMap(byte oldAdjMap, byte geneCode){
+ return (byte) ((oldAdjMap & 0xF0) | (GeneCode.getBitMapFromGeneCode(geneCode) & 0x0F));
+ }
}
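
Both helpers assume the adjacency encoding used by updateRightNeighber above: one byte per vertex, predecessor edges in the high nibble, successor edges in the low nibble, one bit per base. reverseAdjMap keeps the predecessors and points the successor side at a single base. A sketch with GeneCode.getBitMapFromGeneCode stubbed as a one-hot encoding (the encoding is an assumption):

    public class AdjMapSketch {
        // assumed one-hot gene-code bitmap: A=0, C=1, G=2, T=3
        static byte getBitMapFromGeneCode(byte code) {
            return (byte) (1 << code);
        }

        static byte reverseAdjMap(byte oldAdjMap, byte geneCode) {
            // keep the high nibble (predecessors), replace the low nibble (successors)
            return (byte) ((oldAdjMap & 0xF0) | (getBitMapFromGeneCode(geneCode) & 0x0F));
        }

        public static void main(String[] args) {
            byte adj = (byte) 0xA3;                  // arbitrary example map
            byte out = reverseAdjMap(adj, (byte) 2); // point the out-edge at G
            System.out.printf("%02X -> %02X%n", adj & 0xFF, out & 0xFF); // A3 -> A4
        }
    }
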
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index cda48f7..ca033e6 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -23,28 +23,9 @@
public class JobGenerator {
- private static String outputBase = "src/test/resources/jobs/";
- private static String HDFS_INPUTPATH = "/webmap";
- private static String HDFS_OUTPUTPAH = "/result";
+ public static String outputBase = "src/test/resources/jobs/";
- private static void generateLoadGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(LoadGraphVertex.class);
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genLoadGraph() throws IOException {
- generateLoadGraphJob("LoadGraph", outputBase + "LoadGraph.xml");
- }
-
- private static void generateMergeGraphJob(String jobName, String outputPath) throws IOException {
+ private static void generateNaiveAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
@@ -52,34 +33,12 @@
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
- //job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.ITERATIONS, 10);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
- private static void genMergeGraph() throws IOException {
- generateMergeGraphJob("MergeGraph", outputBase + "MergeGraph.xml");
- }
-
- private static void generateThreeStepLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(ThreeStepLogAlgorithmForPathMergeVertex.class);
- job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
- job.getConfiguration().setInt(ThreeStepLogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
- job.getConfiguration().setInt(ThreeStepLogAlgorithmForPathMergeVertex.ITERATIONS, 5);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genThreeStepLogAlgorithmForMergeGraph() throws IOException {
- generateThreeStepLogAlgorithmForMergeGraphJob("ThreeStepLogAlgorithmForMergeGraph", outputBase + "ThreeStepLogAlgorithmForMergeGraph.xml");
+ private static void genNaiveAlgorithmForMergeGraph() throws IOException {
+ generateNaiveAlgorithmForMergeGraphJob("NaiveAlgorithmForMergeGraph", outputBase + "NaiveAlgorithmForMergeGraph.xml");
}
private static void generateTwoStepLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
@@ -90,9 +49,7 @@
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
- job.getConfiguration().setInt(TwoStepLogAlgorithmForPathMergeVertex.KMER_SIZE, 55);
+ job.getConfiguration().setInt(TwoStepLogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -105,11 +62,8 @@
* @throws IOException
*/
public static void main(String[] args) throws IOException {
- // TODO Auto-generated method stub
- //genLoadGraph();
- genMergeGraph();
- //genThreeStepLogAlgorithmForMergeGraph();
- //genTwoStepLogAlgorithmForMergeGraph();
+ genNaiveAlgorithmForMergeGraph();
+ genTwoStepLogAlgorithmForMergeGraph();
}
}
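
With the HDFS paths stripped out, a generated XML now carries only the vertex classes, formats, and k-mer size; input and output locations are attached per dataset at test time (see PathMergeSmallTestCase below). A sketch of that split, reusing the Pregelix/Hadoop calls already in this diff with a hypothetical dataset path:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import edu.uci.ics.pregelix.api.job.PregelixJob;

    public class PerDatasetPathsSketch {
        public static void main(String[] args) throws Exception {
            PregelixJob job = new PregelixJob("test");
            // algorithm settings come from the generated job XML
            job.getConfiguration().addResource(
                    new Path("src/test/resources/jobs/NaiveAlgorithmForMergeGraph.xml"));
            // dataset-specific paths are applied here, not baked into the XML
            FileInputFormat.setInputPaths(job, "/PathTestSet/TwoKmer");
            FileOutputFormat.setOutputPath(job, new Path("/PathTestSet/TwoKmer_result"));
        }
    }
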
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
index 45d8185..1af0d6e 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
@@ -40,7 +40,7 @@
private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
private static final String FILE_EXTENSION_OF_RESULTS = "result";
- private static final String DATA_PATH = "data/sequencefile/TreePath";
+ private static final String DATA_PATH = "data/sequencefile/Path";
private static final String HDFS_PATH = "/webmap/";
private static final String HYRACKS_APP_NAME = "pregelix";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
index 7ecf31f..16fe317 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
@@ -33,156 +33,168 @@
public class GraphBuildTest {
private static final String ACTUAL_RESULT_DIR = "graphbuildresult";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_PATH = "data/sequencefile/BridgePath";
- private static final String HDFS_INPUT_PATH = "/BridgePath";
- private static final String HDFS_OUTPUT_PATH = "/BridgePath_result";
+ private static final String DATA_PATH = "data/TwoKmer";
+ private static final String HDFS_INPUT_PATH = "/CyclePath2";
+ private static final String HDFS_OUTPUT_PATH = "/CyclePath2_result";
- private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/result.txt";
- private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
- private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
- private static final String EXPECTED_REVERSE_PATH = "src/test/resources/expected/result_reverse";
+ private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR
+ + HDFS_OUTPUT_PATH + "/result.txt";
+ private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
+ private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
+ private static final String EXPECTED_REVERSE_PATH = "src/test/resources/expected/result_reverse";
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
+ + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+ private int numPartitionPerMachine = 1;
- private Driver driver;
+ private Driver driver;
- @Before
- public void setUp() throws Exception {
- cleanupStores();
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
+ @Before
+ public void setUp() throws Exception {
+ cleanupStores();
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
- FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
- FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
+ FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
+ FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
- conf.setInt(GenomixJob.KMER_LENGTH, 5);
- driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
- }
+ conf.setInt(GenomixJob.KMER_LENGTH, 5);
+ driver = new Driver(
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
+ numPartitionPerMachine);
+ }
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_INPUT_PATH);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(DATA_PATH);
+ Path dest = new Path(HDFS_INPUT_PATH);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
+ DataOutputStream confOutput = new DataOutputStream(
+ new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
- private void cleanUpReEntry() throws IOException {
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- if (lfs.exists(new Path(DUMPED_RESULT))) {
- lfs.delete(new Path(DUMPED_RESULT), true);
- }
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
- dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
- }
- }
+ private void cleanUpReEntry() throws IOException {
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ if (lfs.exists(new Path(DUMPED_RESULT))) {
+ lfs.delete(new Path(DUMPED_RESULT), true);
+ }
+ FileSystem dfs = FileSystem.get(conf);
+ if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
+ dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
+ }
+ }
- @Test
- public void TestAll() throws Exception {
- cleanUpReEntry();
- TestHybridGroupby();
- cleanUpReEntry();
- TestPreClusterGroupby();
- }
+ @Test
+ public void TestAll() throws Exception {
+ cleanUpReEntry();
+ TestHybridGroupby();
+ cleanUpReEntry();
+ TestPreClusterGroupby();
+ }
- public void TestPreClusterGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
- System.err.println("Testing PreClusterGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_PATH));
- }
-
- public void TestHybridGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
- System.err.println("Testing HybridGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_PATH));
- }
+ public void TestPreClusterGroupby() throws Exception {
+ conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
+ System.err.println("Testing PreClusterGroupBy");
+ driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_PATH));
+ }
- private boolean checkResults(String expectedPath) throws Exception {
- File dumped = null;
- String format = conf.get(GenomixJob.OUTPUT_FORMAT);
- if ("text".equalsIgnoreCase(format)) {
- FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
- FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
- dumped = new File(DUMPED_RESULT);
- } else {
+ public void TestHybridGroupby() throws Exception {
+ conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
+ System.err.println("Testing HybridGroupBy");
+ driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_PATH));
+ }
- FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
- File filePathTo = new File(CONVERT_RESULT);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
- String partname = "/part-" + i;
- FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
- + partname), FileSystem.getLocal(new Configuration()),
- new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
+ private boolean checkResults(String expectedPath) throws Exception {
+ File dumped = null;
+ String format = conf.get(GenomixJob.OUTPUT_FORMAT);
+ if ("text".equalsIgnoreCase(format)) {
+ FileUtil.copyMerge(FileSystem.get(conf),
+ new Path(HDFS_OUTPUT_PATH), FileSystem
+ .getLocal(new Configuration()), new Path(
+ DUMPED_RESULT), false, conf, null);
+ dumped = new File(DUMPED_RESULT);
+ } else {
- Path path = new Path(HDFS_OUTPUT_PATH + partname);
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.getFileStatus(path).getLen() == 0) {
- continue;
- }
- SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
- GenomixJob.DEFAULT_KMER));
- KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ FileSystem.getLocal(new Configuration()).mkdirs(
+ new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
+ File filePathTo = new File(CONVERT_RESULT);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+ for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
+ String partname = "/part-" + i;
+ FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
+ + partname), FileSystem.getLocal(new Configuration()),
+ new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH
+ + partname), false, conf);
- while (reader.next(key, value)) {
- if (key == null || value == null) {
- break;
- }
- bw.write(key.toString() + "\t" + value.toString());
- System.out.println(key.toString() + "\t" + value.toString());
- bw.newLine();
- }
- reader.close();
- }
- bw.close();
- dumped = new File(CONVERT_RESULT);
- }
+ Path path = new Path(HDFS_OUTPUT_PATH + partname);
+ FileSystem dfs = FileSystem.get(conf);
+ if (dfs.getFileStatus(path).getLen() == 0) {
+ continue;
+ }
+ SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path,
+ conf);
+ KmerBytesWritable key = new KmerBytesWritable(conf.getInt(
+ GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER));
+ KmerCountValue value = (KmerCountValue) ReflectionUtils
+ .newInstance(reader.getValueClass(), conf);
- //TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
- return true;
- }
+ while (reader.next(key, value)) {
+ if (key == null || value == null) {
+ break;
+ }
+ bw.write(key.toString() + "\t" + value.toString());
+ System.out
+ .println(key.toString() + "\t" + value.toString());
+ bw.newLine();
+ }
+ reader.close();
+ }
+ bw.close();
+ dumped = new File(CONVERT_RESULT);
+ }
- @After
- public void tearDown() throws Exception {
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
- cleanupHDFS();
- }
+ // TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+ return true;
+ }
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
+ @After
+ public void tearDown() throws Exception {
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
+ cleanupHDFS();
+ }
+
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
}
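
In the text branch of checkResults above, the whole HDFS output directory is concatenated into one local file before comparison. Isolated, the call looks like this (the Hadoop 1.x copyMerge signature this test already uses):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class CopyMergeSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // merge every part file under the output dir into a single local result.txt
            FileUtil.copyMerge(FileSystem.get(conf), new Path("/CyclePath2_result"),
                    FileSystem.getLocal(new Configuration()),
                    new Path("graphbuildresult/CyclePath2_result/result.txt"),
                    false, conf, null);
        }
    }
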
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java
index 8191496..67b6f21 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java
@@ -15,8 +15,6 @@
package edu.uci.ics.genomix.pregelix.pathmerge;
-import java.io.File;
-
import junit.framework.TestCase;
import org.apache.hadoop.fs.FileSystem;
@@ -25,101 +23,67 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
-import edu.uci.ics.genomix.pregelix.example.util.TestUtils;
+import edu.uci.ics.genomix.pregelix.sequencefile.GenerateTextFile;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
public class PathMergeSmallTestCase extends TestCase {
- private static String HDFS_INPUTPATH = "/BridgePath";
- private static String HDFS_OUTPUTPAH = "/resultBridgePath";
+ private final PregelixJob job;
+ private final String resultFileDir;
+ private final String textFileDir;
+ private final String jobFile;
+ private final Driver driver = new Driver(this.getClass());
+ private final FileSystem dfs;
- /*private static String HDFS_INPUTPATH2 = "/CyclePath";
- private static String HDFS_OUTPUTPAH2 = "/resultCyclePath";
+ public PathMergeSmallTestCase(String hadoopConfPath, String jobName,
+ String jobFile, FileSystem dfs, String hdfsInput, String resultFile, String textFile)
+ throws Exception {
+ super("test");
+ this.jobFile = jobFile;
+ this.job = new PregelixJob("test");
+ this.job.getConfiguration().addResource(new Path(jobFile));
+ this.job.getConfiguration().addResource(new Path(hadoopConfPath));
+ FileInputFormat.setInputPaths(job, hdfsInput);
+ FileOutputFormat.setOutputPath(job, new Path(hdfsInput + "_result"));
+ this.textFileDir = textFile;
+ job.setJobName(jobName);
+ this.resultFileDir = resultFile;
+
+ this.dfs = dfs;
+ }
- private static String HDFS_INPUTPATH3 = "/LongPath";
- private static String HDFS_OUTPUTPAH3 = "/resultLongPath";
+ private void waitawhile() throws InterruptedException {
+ synchronized (this) {
+ this.wait(20);
+ }
+ }
- private static String HDFS_INPUTPATH4 = "/Path";
- private static String HDFS_OUTPUTPAH4 = "/resultPath";
+ @Test
+ public void test() throws Exception {
+ setUp();
+ Plan[] plans = new Plan[] { Plan.OUTER_JOIN };
+ for (Plan plan : plans) {
+ driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
+ PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT,
+ false);
+ }
+ compareResults();
+ tearDown();
+ waitawhile();
+ }
- private static String HDFS_INPUTPATH5 = "/SimplePath";
- private static String HDFS_OUTPUTPAH5 = "/resultSimplePath";
-
- private static String HDFS_INPUTPATH6 = "/SinglePath";
- private static String HDFS_OUTPUTPAH6 = "/resultSinglePath";
-
- private static String HDFS_INPUTPATH7 = "/TreePath";
- private static String HDFS_OUTPUTPAH7 = "/resultTreePath";*/
+ private void compareResults() throws Exception {
+ dfs.copyToLocalFile(FileOutputFormat.getOutputPath(job), new Path(
+ resultFileDir));
+ GenerateTextFile.generateFromPathmergeResult(5, resultFileDir, textFileDir);
+ // TestUtils.compareWithResultDir(new File(expectedFileDir), new
+ // File(resultFileDir));
+ }
- private final PregelixJob job;
- private final String resultFileDir;
- private final String jobFile;
- private final Driver driver = new Driver(this.getClass());
- private final FileSystem dfs;
+ public String toString() {
+ return jobFile;
+ }
- public PathMergeSmallTestCase(String hadoopConfPath, String jobName, String jobFile, String resultFile,
- FileSystem dfs) throws Exception {
- super("test");
- this.jobFile = jobFile;
- this.job = new PregelixJob("test");
- this.job.getConfiguration().addResource(new Path(jobFile));
- this.job.getConfiguration().addResource(new Path(hadoopConfPath));
- Path[] inputPaths = FileInputFormat.getInputPaths(job);
- if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH)) {
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
- }
- /*else if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH2)) {
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
- } else if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH3)) {
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
- } else if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH4)) {
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH4);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH4));
- } else if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH5)) {
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH5);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH5));
- } else if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH6)) {
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH6);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH6));
- } else if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH7)) {
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH7);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH7));
- }*/
- job.setJobName(jobName);
- this.resultFileDir = resultFile;
- this.dfs = dfs;
- }
-
- private void waitawhile() throws InterruptedException {
- synchronized (this) {
- this.wait(20);
- }
- }
-
- @Test
- public void test() throws Exception {
- setUp();
- Plan[] plans = new Plan[] { Plan.OUTER_JOIN };
- for (Plan plan : plans) {
- driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
- PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
- }
- compareResults();
- tearDown();
- waitawhile();
- }
-
- private void compareResults() throws Exception {
- dfs.copyToLocalFile(FileOutputFormat.getOutputPath(job), new Path(resultFileDir));
- //TestUtils.compareWithResultDir(new File(expectedFileDir), new File(resultFileDir));
- }
-
- public String toString() {
- return jobFile;
- }
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java
index 5329515..43555c5 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java
@@ -20,6 +20,7 @@
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -30,6 +31,7 @@
import junit.framework.TestSuite;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -41,195 +43,179 @@
@SuppressWarnings("deprecation")
public class PathMergeSmallTestSuite extends TestSuite {
- private static final Logger LOGGER = Logger.getLogger(PathMergeSmallTestSuite.class.getName());
+ private static final Logger LOGGER = Logger
+ .getLogger(PathMergeSmallTestSuite.class.getName());
- private static final String ACTUAL_RESULT_DIR = "actual";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
- private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
- private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
- private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
- private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
+ public static final String PreFix = "data/PathTestSet";
+ public static final String[] TestDir = { PreFix + File.separator
+ + "TwoKmer", PreFix + File.separator
+ + "ThreeKmer", PreFix + File.separator
+ + "SinglePath", PreFix + File.separator
+ + "SimplePath", PreFix + File.separator
+ + "Path", PreFix + File.separator
+ + "BridgePath", PreFix + File.separator
+ + "CyclePath", PreFix + File.separator
+ + "CyclePath2", PreFix + File.separator
+ + "LongPath", PreFix + File.separator
+ + "TreePath"};
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+ private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+ private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+ private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
- private static final String DATA_PATH = "data/input/BridgePath";
- private static final String HDFS_PATH = "/BridgePath/";
+ public static final String HDFS_INPUTPATH = "/PathTestSet";
- /*private static final String DATA_PATH2 = "data/sequencefile/CyclePath";
- private static final String HDFS_PATH2 = "/CyclePath/";
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
+ + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
- private static final String DATA_PATH3 = "data/sequencefile/LongPath";
- private static final String HDFS_PATH3 = "/LongPath/";
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
- private static final String DATA_PATH4 = "data/sequencefile/Path";
- private static final String HDFS_PATH4 = "/Path/";
+ public void setUp() throws Exception {
+ ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+ ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+ cleanupStores();
+ PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+ LOGGER.info("Hyracks mini-cluster started");
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+ }
- private static final String DATA_PATH5 = "data/sequencefile/SimplePath";
- private static final String HDFS_PATH5 = "/SimplePath/";
-
- private static final String DATA_PATH6 = "data/sequencefile/SinglePath";
- private static final String HDFS_PATH6 = "/SinglePath/";
-
- private static final String DATA_PATH7 = "data/sequencefile/TreePath";
- private static final String HDFS_PATH7 = "/TreePath/";*/
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
- private JobConf conf = new JobConf();
- private int numberOfNC = 1;
+ for (String testDir : TestDir) {
+ File src = new File(testDir);
+ Path dest = new Path(HDFS_INPUTPATH + File.separator + src.getName());
+ dfs.mkdirs(dest);
+ //src.listFiles((FilenameFilter)(new WildcardFileFilter("part*")))
+ for (File f : src.listFiles()){
+ dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+ }
+ }
- public void setUp() throws Exception {
- ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
- ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
- cleanupStores();
- PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
- LOGGER.info("Hyracks mini-cluster started");
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
- }
+ DataOutputStream confOutput = new DataOutputStream(
+ new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
+ /**
+ * cleanup hdfs cluster
+ */
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_PATH);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ public void tearDown() throws Exception {
+ PregelixHyracksIntegrationUtil.deinit();
+ LOGGER.info("Hyracks mini-cluster shut down");
+ cleanupHDFS();
+ }
- /*src = new Path(DATA_PATH2);
- dest = new Path(HDFS_PATH2);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ public static Test suite() throws Exception {
+ List<String> onlys = getFileList(PATH_TO_ONLY);
+ File testData = new File(PATH_TO_JOBS);
+ File[] queries = testData.listFiles();
+ PathMergeSmallTestSuite testSuite = new PathMergeSmallTestSuite();
+ testSuite.setUp();
+ boolean onlyEnabled = false;
+ FileSystem dfs = FileSystem.get(testSuite.conf);
- src = new Path(DATA_PATH3);
- dest = new Path(HDFS_PATH3);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ if (onlys.size() > 0) {
+ onlyEnabled = true;
+ }
- src = new Path(DATA_PATH4);
- dest = new Path(HDFS_PATH4);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ for (File qFile : queries) {
+ if (qFile.isFile()) {
+ if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+ continue;
+ } else {
+ for (String testPathStr : TestDir) {
+ File testDir = new File(testPathStr);
+ String resultFileName = ACTUAL_RESULT_DIR
+ + File.separator
+ + jobExtToResExt(qFile.getName())
+ + File.separator + "BinaryOutput"
+ + File.separator + testDir.getName();
+ String textFileName = ACTUAL_RESULT_DIR
+ + File.separator
+ + jobExtToResExt(qFile.getName())
+ + File.separator + "TextOutput"
+ + File.separator + testDir.getName();
+ testSuite.addTest(new PathMergeSmallTestCase(
+ HADOOP_CONF_PATH, qFile.getName(), qFile
+ .getAbsolutePath().toString(),
+ dfs, HDFS_INPUTPATH + File.separator + testDir.getName(),
+ resultFileName, textFileName));
+ }
+ }
+ }
+ }
+ return testSuite;
+ }
- src = new Path(DATA_PATH5);
- dest = new Path(HDFS_PATH5);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
-
- src = new Path(DATA_PATH6);
- dest = new Path(HDFS_PATH6);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
-
- src = new Path(DATA_PATH7);
- dest = new Path(HDFS_PATH7);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);*/
+ /**
+ * Runs the tests and collects their result in a TestResult.
+ */
+ @Override
+ public void run(TestResult result) {
+ try {
+ int testCount = countTestCases();
+ for (int i = 0; i < testCount; i++) {
+ // cleanupStores();
+ Test each = this.testAt(i);
+ if (result.shouldStop())
+ break;
+ runTest(each, result);
+ }
+ tearDown();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
+ protected static List<String> getFileList(String ignorePath)
+ throws FileNotFoundException, IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+ String s = null;
+ List<String> ignores = new ArrayList<String>();
+ while ((s = reader.readLine()) != null) {
+ ignores.add(s);
+ }
+ reader.close();
+ return ignores;
+ }
- /**
- * cleanup hdfs cluster
- */
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
+ private static String jobExtToResExt(String fname) {
+ int dot = fname.lastIndexOf('.');
+ return fname.substring(0, dot);
+ }
- public void tearDown() throws Exception {
- PregelixHyracksIntegrationUtil.deinit();
- LOGGER.info("Hyracks mini-cluster shut down");
- cleanupHDFS();
- }
-
- public static Test suite() throws Exception {
- List<String> ignores = getFileList(PATH_TO_IGNORE);
- List<String> onlys = getFileList(PATH_TO_ONLY);
- File testData = new File(PATH_TO_JOBS);
- File[] queries = testData.listFiles();
- PathMergeSmallTestSuite testSuite = new PathMergeSmallTestSuite();
- testSuite.setUp();
- boolean onlyEnabled = false;
- FileSystem dfs = FileSystem.get(testSuite.conf);
-
- if (onlys.size() > 0) {
- onlyEnabled = true;
- }
- for (File qFile : queries) {
- if (isInList(ignores, qFile.getName()))
- continue;
-
- if (qFile.isFile()) {
- if (onlyEnabled && !isInList(onlys, qFile.getName())) {
- continue;
- } else {
- String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
- testSuite.addTest(new PathMergeSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile.getAbsolutePath()
- .toString(), resultFileName, dfs));
- }
- }
- }
- return testSuite;
- }
-
- /**
- * Runs the tests and collects their result in a TestResult.
- */
- @Override
- public void run(TestResult result) {
- try {
- int testCount = countTestCases();
- for (int i = 0; i < testCount; i++) {
- // cleanupStores();
- Test each = this.testAt(i);
- if (result.shouldStop())
- break;
- runTest(each, result);
- }
- tearDown();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
- BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
- String s = null;
- List<String> ignores = new ArrayList<String>();
- while ((s = reader.readLine()) != null) {
- ignores.add(s);
- }
- reader.close();
- return ignores;
- }
-
- private static String jobExtToResExt(String fname) {
- int dot = fname.lastIndexOf('.');
- return fname.substring(0, dot);
- }
-
- private static boolean isInList(List<String> onlys, String name) {
- for (String only : onlys)
- if (name.indexOf(only) >= 0)
- return true;
- return false;
- }
+ private static boolean isInList(List<String> onlys, String name) {
+ for (String only : onlys)
+ if (name.indexOf(only) >= 0)
+ return true;
+ return false;
+ }
}
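
Taken together, suite() now builds the cross product of every generated job XML with every PathTestSet dataset, giving each pair its own BinaryOutput and TextOutput folder under actual/. A sketch of the naming scheme (plain Java, hypothetical inputs):

    import java.io.File;

    public class SuiteSweepSketch {
        public static void main(String[] args) {
            String[] jobs = { "NaiveAlgorithmForMergeGraph.xml", "TwoStepLogAlgorithmForMergeGraph.xml" };
            String[] testDirs = { "data/PathTestSet/TwoKmer", "data/PathTestSet/TreePath" };
            for (String job : jobs) {
                String jobName = job.substring(0, job.lastIndexOf('.')); // mirrors jobExtToResExt
                for (String dir : testDirs)
                    System.out.println("actual" + File.separator + jobName
                            + File.separator + "BinaryOutput" + File.separator
                            + new File(dir).getName());
            }
        }
    }
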