new graph job built
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
index 6d6e1e6..c903645 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.hyracks.newgraph.driver;
import java.net.URL;
@@ -6,11 +21,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
-import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.hyracks.job.JobGen;
-import edu.uci.ics.genomix.hyracks.newgraph.job.JobGenCheckReader;
+import edu.uci.ics.genomix.hyracks.newgraph.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.newgraph.job.JobGen;
+import edu.uci.ics.genomix.hyracks.newgraph.job.JobGenBrujinGraph;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
@@ -23,19 +39,20 @@
public class Driver {
public static enum Plan {
- CHECK_KMERREADER,
+ BUILD_DEBRUJIN_GRAPH,
}
-
+
private static final String IS_PROFILING = "genomix.driver.profiling";
private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
private static final Log LOG = LogFactory.getLog(Driver.class);
private JobGen jobGen;
private boolean profiling;
+
private int numPartitionPerMachine;
private IHyracksClientConnection hcc;
private Scheduler scheduler;
-
+
public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
try {
hcc = new HyracksConnection(ipAddress, port);
@@ -45,9 +62,9 @@
}
this.numPartitionPerMachine = numPartitionPerMachine;
}
-
+
public void runJob(GenomixJobConf job) throws HyracksException {
- runJob(job, Plan.CHECK_KMERREADER, false);
+ runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
}
public void runJob(GenomixJobConf job, Plan planChoice, boolean profiling) throws HyracksException {
@@ -69,9 +86,9 @@
Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
switch (planChoice) {
- case CHECK_KMERREADER:
+ case BUILD_DEBRUJIN_GRAPH:
default:
- jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
+ jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
break;
}
@@ -85,7 +102,7 @@
throw new HyracksException(e);
}
}
-
+
private void run(JobGen jobGen) throws Exception {
try {
JobSpecification createJob = jobGen.generateJob();
@@ -95,11 +112,38 @@
throw e;
}
}
-
+
private void execute(JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
JobId jobId = hcc
.startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
}
+
+ public static void main(String[] args) throws Exception {
+ GenomixJobConf jobConf = new GenomixJobConf();
+ String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
+ if (otherArgs.length < 4) {
+ System.err.println("Need <serverIP> <port> <input> <output>");
+ System.exit(-1);
+ }
+ String ipAddress = otherArgs[0];
+ int port = Integer.parseInt(otherArgs[1]);
+ int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+ boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+ // FileInputFormat.setInputPaths(job, otherArgs[2]);
+ {
+ @SuppressWarnings("deprecation")
+ Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+ jobConf.set("mapred.input.dir", path.toString());
+
+ @SuppressWarnings("deprecation")
+ Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
+ jobConf.set("mapred.output.dir", outputDir.toString());
+ }
+ // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+ // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+ Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+ driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index f35d0ef..369f874 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -194,28 +194,6 @@
return kmerCrossAggregator;
}
-
- public AbstractOperatorDescriptor generateKmerWritorOperator(JobSpecification jobSpec,
- AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
- // Output Kmer
- ITupleWriterFactory kmerWriter = null;
- switch (outputFormat) {
- case TEXT:
- kmerWriter = new KMerTextWriterFactory(kmerSize);
- break;
- case BINARY:
- default:
- kmerWriter = new KMerSequenceWriterFactory(hadoopJobConfFactory.getConf());
- break;
- }
- logDebug("WriteOperator");
- HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
- hadoopJobConfFactory.getConf(), kmerWriter);
- connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
- new OneToOneConnectorDescriptor(jobSpec));
- return writeKmerOperator;
- }
-
public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
ITupleWriterFactory nodeWriter = null;
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
index dfae011..6cc4475 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
@@ -24,11 +24,11 @@
import org.junit.Before;
import org.junit.Test;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.newgraph.job.GenomixJobConf;
import edu.uci.ics.genomix.hyracks.newgraph.driver.Driver;
import edu.uci.ics.genomix.hyracks.newgraph.driver.Driver.Plan;
import edu.uci.ics.genomix.hyracks.test.TestUtils;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+//import edu.uci.ics.genomix.oldtype.NodeWritable;
@SuppressWarnings("deprecation")
public class JobRun {
@@ -43,11 +43,6 @@
private static final String EXPECTED_DIR = "src/test/resources/expected/";
private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
-// private static final String EXPECTED_OUPUT_KMER = EXPECTED_DIR + "result_after_kmerAggregate";
-// private static final String EXPECTED_KMER_TO_READID = EXPECTED_DIR + "result_after_kmer2readId";
-// private static final String EXPECTED_GROUPBYREADID = EXPECTED_DIR + "result_after_readIDAggreage";
-// private static final String EXPECTED_OUPUT_NODE = EXPECTED_DIR + "result_after_generateNode";
-// private static final String EXPECTED_UNMERGED = EXPECTED_DIR + "result_unmerged";
private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
@@ -62,14 +57,14 @@
@Test
public void TestAll() throws Exception {
- TestReader();
+ TestGroupby();
}
- public void TestReader() throws Exception {
- cleanUpReEntry();
+ public void TestGroupby() throws Exception {
conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
- driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
- Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT, null));
+ cleanUpReEntry();
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
}
@Before
@@ -129,7 +124,7 @@
}
}
- private boolean checkResults(String expectedPath, int[] poslistField) throws Exception {
+/* private boolean checkResults(String expectedPath, int[] poslistField) throws Exception {
File dumped = null;
String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
@@ -177,7 +172,7 @@
TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
}
return true;
- }
+ }*/
@After
public void tearDown() throws Exception {