refactor 'NaiveAlgorithm' to 'P1' and add test case for SplitRepeat
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index 4330c77..9993ab0 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -379,8 +379,12 @@
s2.add((long) 2);
s2.add((long) 3);
Set<Long> intersection = new HashSet<Long>();
- intersection = s1;
+ intersection.addAll(s1);
intersection.retainAll(s2);
System.out.println(intersection.toString());
+ Set<Long> difference = new HashSet<Long>();
+ difference.addAll(s1);
+ difference.removeAll(s2);
+ System.out.println(difference.toString());
}
}
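Note on the test fix above: `intersection = s1` only re-points the reference, so the subsequent `retainAll` would have destroyed `s1` itself; `addAll` (or the copy constructor) takes a real snapshot first. A self-contained sketch of the copy-then-mutate idiom, with illustrative values:

    import java.util.HashSet;
    import java.util.Set;

    // Why set operations need a defensive copy before retainAll/removeAll.
    public class SetOpsSketch {
        public static void main(String[] args) {
            Set<Long> s1 = new HashSet<Long>();
            s1.add(1L); s1.add(2L);
            Set<Long> s2 = new HashSet<Long>();
            s2.add(2L); s2.add(3L);

            Set<Long> intersection = new HashSet<Long>(s1); // copy, don't alias
            intersection.retainAll(s2);                      // [2]
            Set<Long> difference = new HashSet<Long>(s1);    // copy again
            difference.removeAll(s2);                        // [1]

            System.out.println(intersection + " " + difference);
            System.out.println(s1); // still [1, 2]; the copies protected it
        }
    }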
diff --git a/genomix/genomix-hadoop/data/webmap/SplitRepeat.txt b/genomix/genomix-hadoop/data/webmap/SplitRepeat.txt
new file mode 100644
index 0000000..bb03d70
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/SplitRepeat.txt
@@ -0,0 +1,2 @@
+1 AATAG
+2 CATAC
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
index 217e882..127ab3e 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -22,13 +22,13 @@
private JobConf conf = new JobConf();
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private static final String DATA_PATH = "data/webmap/pathmerge_TestSet/9";
+ private static final String DATA_PATH = "data/webmap/SplitRepeat.txt";
private static final String HDFS_PATH = "/webmap";
private static final String RESULT_PATH = "/result";
// private static final int COUNT_REDUCER = 2;
private static final int SIZE_KMER = 3;
- private static final int READ_LENGTH = 11;
+ private static final int READ_LENGTH = 5;
private MiniDFSCluster dfsCluster;
private MiniMRCluster mrCluster;
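For context, the new input swaps an 11-bp path-merge read set for two 5-bp reads, so READ_LENGTH drops to 5 while SIZE_KMER stays 3. A minimal sketch of how such a read decomposes into overlapping 3-mers, using the first read from SplitRepeat.txt:

    // A length-5 read yields READ_LENGTH - SIZE_KMER + 1 = 3 overlapping k-mers.
    public class KmerDecomposition {
        public static void main(String[] args) {
            String read = "AATAG"; // read 1 in SplitRepeat.txt
            int k = 3;             // SIZE_KMER
            for (int i = 0; i + k <= read.length(); i++) {
                System.out.println(read.substring(i, i + k)); // AAT, ATA, TAG
            }
        }
    }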
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/SimpleTest/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/SimpleTest/part-00000
new file mode 100755
index 0000000..4977247
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/SimpleTest/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 43bc73f..0da4a77 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -10,7 +10,7 @@
import org.kohsuke.args4j.Option;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P1ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
@@ -66,11 +66,11 @@
for (int i = 1; i < inputs.length; i++)
FileInputFormat.addInputPaths(job, inputs[0]);
FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
- job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
+ job.getConfiguration().setInt(P1ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
job.getConfiguration().setInt(P2ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
if (options.numIteration > 0) {
- job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
+ job.getConfiguration().setInt(P1ForPathMergeVertex.ITERATIONS, options.numIteration);
job.getConfiguration().setInt(P2ForPathMergeVertex.ITERATIONS, options.numIteration);
job.getConfiguration().setInt(P3ForPathMergeVertex.ITERATIONS, options.numIteration);
}
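One side effect of this rename: KMER_SIZE and ITERATIONS are Hadoop configuration key strings, and their values change from "NaiveAlgorithmForPathMergeVertex.*" to "P1ForPathMergeVertex.*" (see the constant definitions in the renamed class below), so any job XML written before the rename will silently fall back to defaults. A small sketch of the mismatch, with an illustrative value:

    import org.apache.hadoop.conf.Configuration;

    // The constant's *value* is the config key, so renaming the class (and the
    // key string with it) orphans settings stored under the old key.
    public class KeySketch {
        static final String OLD_KEY = "NaiveAlgorithmForPathMergeVertex.kmerSize";
        static final String NEW_KEY = "P1ForPathMergeVertex.kmerSize";

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setInt(OLD_KEY, 55);
            // Reading the new key finds nothing and returns the default:
            System.out.println(conf.getInt(NEW_KEY, 5)); // prints 5, not 55
        }
    }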
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 7fe27e6..66dd474 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -120,6 +120,17 @@
}
}
+ public KmerBytesWritable getCreatedVertexId() {
+ return kmer; // reuses the message's kmer field to carry the created vertex id
+ }
+
+ public void setCreatedVertexId(KmerBytesWritable actualKmer) {
+ if (actualKmer != null) {
+ checkMessage |= CheckMessage.CHAIN;
+ this.kmer.set(actualKmer);
+ }
+ }
+
public AdjacencyListWritable getNeighberNode() {
return neighberNode;
}
@@ -165,7 +176,10 @@
}
public void setNodeIdList(PositionListWritable nodeIdList) {
- this.nodeIdList.set(nodeIdList);
+ if(nodeIdList != null){
+ checkMessage |= CheckMessage.NODEIDLIST;
+ this.nodeIdList.set(nodeIdList);
+ }
}
@Override
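Both new setters follow the existing MessageWritable convention: a presence bit is OR-ed into checkMessage so the serialization side can skip fields that were never set. A stripped-down sketch of the idiom; the flag constants are illustrative stand-ins for CheckMessage.CHAIN and CheckMessage.NODEIDLIST:

    // Presence-bitmask idiom: setters record a bit, readers test it.
    public class BitmaskSketch {
        static final byte CHAIN = 1 << 0;      // stand-in for CheckMessage.CHAIN
        static final byte NODEIDLIST = 1 << 1; // stand-in for CheckMessage.NODEIDLIST

        private byte checkMessage = 0;

        void setChain(Object value) {
            if (value != null) {
                checkMessage |= CHAIN; // mark the field as present
            }
        }

        boolean hasChain() {
            return (checkMessage & CHAIN) != 0; // e.g. to decide what to serialize
        }

        public static void main(String[] args) {
            BitmaskSketch m = new BitmaskSketch();
            System.out.println(m.hasChain()); // false
            m.setChain(new Object());
            System.out.println(m.hasChain()); // true
        }
    }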
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
similarity index 95%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
index 9d27769..31fb897 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
@@ -49,10 +49,10 @@
/**
* Naive Algorithm for path merge graph
*/
-public class NaiveAlgorithmForPathMergeVertex extends
+public class P1ForPathMergeVertex extends
Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
+ public static final String KMER_SIZE = "P1ForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "P1ForPathMergeVertex.iteration";
public static int kmerSize = -1;
private int maxIteration = -1;
@@ -251,8 +251,8 @@
}
public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(NaiveAlgorithmForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+ PregelixJob job = new PregelixJob(P1ForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(P1ForPathMergeVertex.class);
/**
* BinaryInput and BinaryOutput
*/
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index 640d6bc..f019fb0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -4,14 +4,18 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.BspUtils;
public class SplitRepeatVertex extends
BasicGraphCleanVertex{
@@ -86,6 +90,7 @@
{"FR", "RF"},
{"FR", "RR"}
};
+ public static Set<String> existKmerString = new HashSet<String>();
private Set<Long> readIdSet = new HashSet<Long>();
private Set<Long> incomingReadIdSet = new HashSet<Long>();
private Set<Long> outgoingReadIdSet = new HashSet<Long>();
@@ -94,16 +99,60 @@
private Set<Long> outgoingEdgeIntersection = new HashSet<Long>();
private Set<Long> neighborEdgeIntersection = new HashSet<Long>();
private Map<KmerBytesWritable, Set<Long>> kmerMap = new HashMap<KmerBytesWritable, Set<Long>>();
- private KmerListWritable incomingEdgeList = new KmerListWritable(kmerSize);
- private KmerListWritable outgoingEdgeList = new KmerListWritable(kmerSize);
+ private KmerListWritable incomingEdgeList = null;
+ private KmerListWritable outgoingEdgeList = null;
private byte incomingEdgeDir = 0;
private byte outgoingEdgeDir = 0;
+ protected KmerBytesWritable createdVertexId = null;
private CreatedVertex createdVertex = new CreatedVertex();
public static Set<CreatedVertex> createdVertexSet = new HashSet<CreatedVertex>();
+ /**
+ * initiate kmerSize, maxIteration
+ */
+ public void initVertex() {
+ if (kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
+ else
+ outgoingMsg.reset(kmerSize);
+ if(incomingEdgeList == null)
+ incomingEdgeList = new KmerListWritable(kmerSize);
+ if(outgoingEdgeList == null)
+ outgoingEdgeList = new KmerListWritable(kmerSize);
+ if(createdVertexId == null)
+ createdVertexId = new KmerBytesWritable(kmerSize + 1);
+ }
+
+ /**
+ * Generate a random [ACGT] string of length n that has not been produced before
+ */
+ public String generateRandomString(int n){
+ char[] chars = "ACGT".toCharArray();
+ StringBuilder sb = new StringBuilder();
+ Random random = new Random();
+ while(true){
+ sb.setLength(0); // reset between attempts; otherwise a collision keeps appending
+ for (int i = 0; i < n; i++) {
+ char c = chars[random.nextInt(chars.length)];
+ sb.append(c);
+ }
+ if(!existKmerString.contains(sb.toString()))
+ break;
+ }
+ existKmerString.add(sb.toString());
+ return sb.toString();
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
+ initVertex();
if(getSuperstep() == 1){
if(getVertexValue().getDegree() > 2){
outgoingMsg.setSourceVertexId(getVertexId());
@@ -167,7 +216,7 @@
//set outgoingEdge readId intersection
outgoingEdgeIntersection = selfReadIdSet;
outgoingEdgeIntersection.retainAll(outgoingReadIdSet);
- outgoingEdgeIntersection.removeAll(neighborEdgeIntersection);
+ outgoingEdgeIntersection.removeAll(neighborEdgeIntersection);
//set incomingEdge readId intersection
incomingEdgeIntersection = selfReadIdSet;
incomingEdgeIntersection.retainAll(incomingReadIdSet);
@@ -175,13 +224,15 @@
if(!neighborEdgeIntersection.isEmpty()){
createdVertex.clear();
- createdVertex.setCreatedVertexId(getVertexId());
+ createdVertexId.setByRead(generateRandomString(kmerSize + 1).getBytes(), 0);
+ createdVertex.setCreatedVertexId(createdVertexId);
createdVertex.setIncomingDir(connectedTable[i][1]);
createdVertex.setOutgoingDir(connectedTable[i][0]);
createdVertex.setIncomingEdge(incomingEdge);
createdVertex.setOutgoingEdge(outgoingEdge);
createdVertexSet.add(createdVertex);
+ outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setFlag(incomingEdgeDir);
sendMsg(incomingEdge, outgoingMsg);
@@ -191,11 +242,13 @@
if(!incomingEdgeIntersection.isEmpty()){
createdVertex.clear();
- createdVertex.setCreatedVertexId(getVertexId());
+ createdVertexId.setByRead(generateRandomString(kmerSize + 1).getBytes(), 0);
+ createdVertex.setCreatedVertexId(createdVertexId);
createdVertex.setIncomingDir(connectedTable[i][1]);
createdVertex.setIncomingEdge(incomingEdge);
createdVertexSet.add(createdVertex);
+ outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setFlag(incomingEdgeDir);
sendMsg(incomingEdge, outgoingMsg);
@@ -203,11 +256,13 @@
if(!outgoingEdgeIntersection.isEmpty()){
createdVertex.clear();
- createdVertex.setCreatedVertexId(getVertexId());
+ createdVertexId.setByRead(generateRandomString(kmerSize + 1).getBytes(), 0);
+ createdVertex.setCreatedVertexId(createdVertexId);
createdVertex.setOutgoingDir(connectedTable[i][0]);
createdVertex.setOutgoingEdge(outgoingEdge);
createdVertexSet.add(createdVertex);
+ outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setFlag(outgoingEdgeDir);
sendMsg(outgoingEdge, outgoingMsg);
@@ -216,7 +271,53 @@
}
}
} else if(getSuperstep() == 4){
-
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ /** update edgelist to new/created vertex **/
+ switch(incomingMsg.getFlag()){
+ case MessageFlag.DIR_FF:
+ getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_FR:
+ getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_RF:
+ getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_RR:
+ getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
+ break;
+ }
+ /** add new/created vertex **/
+ for(CreatedVertex v : createdVertexSet){
+ Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+ VertexValueWritable vertexValue = new VertexValueWritable();
+ switch(v.incomingDir){
+ case "RF":
+ vertexValue.getRFList().append(v.incomingEdge);
+ break;
+ case "RR":
+ vertexValue.getRRList().append(v.incomingEdge);
+ break;
+ }
+ switch(v.outgoingDir){
+ case "FF":
+ vertexValue.getFFList().append(v.outgoingEdge);
+ break;
+ case "FR":
+ vertexValue.getFRList().append(v.outgoingEdge);
+ break;
+ }
+ vertex.setVertexId(v.getCreatedVertexId());
+ vertex.setVertexValue(vertexValue);
+ }
+ }
}
}
}
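The superstep-2/3 logic above decides how to split a repeat vertex by set algebra over read ids: for each (incoming, outgoing) edge pair, reads seen on both sides produce a new vertex wired to both edges, while reads seen on only one side produce a new vertex wired to that edge alone. A standalone sketch of that algebra with invented read ids (using defensive copies; note that assignments like `incomingEdgeIntersection = selfReadIdSet` in the vertex share one underlying set, so the order of operations there matters):

    import java.util.HashSet;
    import java.util.Set;

    // Read-id classification for one (incoming, outgoing) edge pair of a
    // repeat vertex; the ids are invented for illustration.
    public class ReadIdIntersectionSketch {
        public static void main(String[] args) {
            Set<Long> self = new HashSet<Long>();
            self.add(1L); self.add(2L); self.add(3L);
            Set<Long> incoming = new HashSet<Long>();
            incoming.add(1L); incoming.add(2L);
            Set<Long> outgoing = new HashSet<Long>();
            outgoing.add(2L); outgoing.add(3L);

            Set<Long> both = new HashSet<Long>(self);   // reads covering in -> self -> out
            both.retainAll(incoming);
            both.retainAll(outgoing);                   // [2]

            Set<Long> inOnly = new HashSet<Long>(self); // reads entering but leaving elsewhere
            inOnly.retainAll(incoming);
            inOnly.removeAll(both);                     // [1]

            Set<Long> outOnly = new HashSet<Long>(self);
            outOnly.retainAll(outgoing);
            outOnly.removeAll(both);                    // [3]

            System.out.println(both + " " + inOnly + " " + outOnly);
        }
    }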
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 1dae62e..630b5aa 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -15,9 +15,10 @@
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P1ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.splitrepeat.SplitRepeatVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipAddVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipRemoveVertex;
import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -29,13 +30,13 @@
// private static void generateNaiveAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+// job.setVertexClass(P1ForPathMergeVertex.class);
// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class); //GraphCleanInputFormat
// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
// job.setDynamicVertexValueSize(true);
// job.setOutputKeyClass(PositionWritable.class);
// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+// job.getConfiguration().setInt(P1ForPathMergeVertex.KMER_SIZE, 3);
// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
// }
//
@@ -111,6 +112,22 @@
private static void genMapReduceGraph() throws IOException {
generateMapReduceGraphJob("MapReduceGraph", outputBase + "MapReduceGraph.xml");
}
+
+ private static void generateSplitRepeatGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(SplitRepeatVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(SplitRepeatVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genSplitRepeatGraph() throws IOException {
+ generateSplitRepeatGraphJob("SplitRepeatGraph", outputBase + "SplitRepeatGraph.xml");
+ }
// private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
// job.setVertexClass(TipAddVertex.class);
@@ -215,7 +232,7 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
- genLogAlgorithmForMergeGraph();
+// genLogAlgorithmForMergeGraph();
//genP3ForMergeGraph();
//genTipAddGraph();
// genTipRemoveGraph();
@@ -225,6 +242,7 @@
// genBubbleMergeGraph();
// genP4ForMergeGraph();
// genMapReduceGraph();
+ genSplitRepeatGraph();
}
}
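generateSplitRepeatGraphJob only serializes the job's Configuration to XML; a driver or test suite reloads that file at run time. A sketch of the read-back side, assuming the XML lands in src/test/resources/jobs/ (where the suites look for jobs) and that the inherited KMER_SIZE constant resolves to the key string "SplitRepeatVertex.kmerSize"; both details are assumptions here:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    // Reload a generated job XML and inspect a setting written by JobGenerator.
    public class JobXmlReadBack {
        public static void main(String[] args) {
            Configuration conf = new Configuration(false); // skip Hadoop defaults
            // Assumed output location of genSplitRepeatGraph():
            conf.addResource(new Path("src/test/resources/jobs/SplitRepeatGraph.xml"));
            // Assumed key string; use SplitRepeatVertex.KMER_SIZE in real code.
            System.out.println(conf.getInt("SplitRepeatVertex.kmerSize", -1)); // expect 3
        }
    }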
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
new file mode 100644
index 0000000..d87281d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.pregelix.JobRun;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+@SuppressWarnings("deprecation")
+public class SplitRepeatSmallTestSuite extends TestSuite {
+ private static final Logger LOGGER = Logger.getLogger(SplitRepeatSmallTestSuite.class.getName());
+ public static final String PreFix = "data/SplitRepeat";
+ public static final String[] TestDir = { PreFix + File.separator
+ + "SimpleTest"};
+ private static final String ACTUAL_RESULT_DIR = "data/actual/splitrepeat";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+ private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+ private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+ private static final String PATH_TO_ONLY = "src/test/resources/only_splitrepeat.txt";
+
+ public static final String HDFS_INPUTPATH = "/PathTestSet";
+
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
+
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+
+ public void setUp() throws Exception {
+ ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+ ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+ cleanupStores();
+ PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+ LOGGER.info("Hyracks mini-cluster started");
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+ }
+
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+
+ for (String testDir : TestDir) {
+ File src = new File(testDir);
+ Path dest = new Path(HDFS_INPUTPATH + File.separator + src.getName());
+ dfs.mkdirs(dest);
+ //src.listFiles()
+ //src.listFiles((FilenameFilter)(new WildcardFileFilter("part*")))
+ for (File f : src.listFiles()) {
+ dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+ }
+ }
+
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
+
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ /**
+ * cleanup hdfs cluster
+ */
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ public void tearDown() throws Exception {
+ PregelixHyracksIntegrationUtil.deinit();
+ LOGGER.info("Hyracks mini-cluster shut down");
+ cleanupHDFS();
+ }
+
+ public static Test suite() throws Exception {
+ List<String> onlys = getFileList(PATH_TO_ONLY);
+ File testData = new File(PATH_TO_JOBS);
+ File[] queries = testData.listFiles();
+ SplitRepeatSmallTestSuite testSuite = new SplitRepeatSmallTestSuite();
+ testSuite.setUp();
+ boolean onlyEnabled = false;
+ FileSystem dfs = FileSystem.get(testSuite.conf);
+
+ if (onlys.size() > 0) {
+ onlyEnabled = true;
+ }
+
+ for (File qFile : queries) {
+ if (qFile.isFile()) {
+ if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+ continue;
+ } else {
+ for (String testPathStr : TestDir) {
+ File testDir = new File(testPathStr);
+ String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "bin" + File.separator + testDir.getName();
+ String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "txt" + File.separator + testDir.getName();
+ testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
+ .getAbsolutePath().toString(), dfs,
+ HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+ }
+ }
+ }
+ }
+ return testSuite;
+ }
+
+ /**
+ * Runs the tests and collects their result in a TestResult.
+ */
+ @Override
+ public void run(TestResult result) {
+ try {
+ int testCount = countTestCases();
+ for (int i = 0; i < testCount; i++) {
+ // cleanupStores();
+ Test each = this.testAt(i);
+ if (result.shouldStop())
+ break;
+ runTest(each, result);
+ }
+ tearDown();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+ String s = null;
+ List<String> ignores = new ArrayList<String>();
+ while ((s = reader.readLine()) != null) {
+ ignores.add(s);
+ }
+ reader.close();
+ return ignores;
+ }
+
+ private static String jobExtToResExt(String fname) {
+ int dot = fname.lastIndexOf('.');
+ return fname.substring(0, dot);
+ }
+
+ private static boolean isInList(List<String> onlys, String name) {
+ for (String only : onlys)
+ if (name.indexOf(only) >= 0)
+ return true;
+ return false;
+ }
+
+}
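Since this is a JUnit 3 TestSuite (a static suite() plus an overridden run(TestResult)), it can also be driven outside the IDE with the classic text runner. A minimal launcher sketch:

    import junit.framework.Test;
    import junit.textui.TestRunner;

    import edu.uci.ics.genomix.pregelix.JobRun.SplitRepeatSmallTestSuite;

    // Launch the suite with JUnit 3's text runner; suite() spins up the
    // Hyracks and HDFS mini-clusters in setUp().
    public class RunSplitRepeatSuite {
        public static void main(String[] args) throws Exception {
            Test suite = SplitRepeatSmallTestSuite.suite();
            TestRunner.run(suite);
        }
    }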