add the read checker for hyracks
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
index c903645..5409f34 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
@@ -27,6 +27,7 @@
import edu.uci.ics.genomix.hyracks.newgraph.job.GenomixJobConf;
import edu.uci.ics.genomix.hyracks.newgraph.job.JobGen;
import edu.uci.ics.genomix.hyracks.newgraph.job.JobGenBrujinGraph;
+import edu.uci.ics.genomix.hyracks.newgraph.job.JobGenCheckReader;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
@@ -40,6 +41,7 @@
public class Driver {
public static enum Plan {
BUILD_DEBRUJIN_GRAPH,
+ CHECK_KMERREADER,
}
private static final String IS_PROFILING = "genomix.driver.profiling";
@@ -90,6 +92,9 @@
default:
jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
break;
+ case CHECK_KMERREADER:
+ jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
}
start = System.currentTimeMillis();
@@ -115,8 +120,9 @@
private void execute(JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc
- .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ if(profiling)
+ EnumSet.of(JobFlag.PROFILE_RUNTIME);
+ JobId jobId = hcc.startJob(job, EnumSet.of(JobFlag.PROFILE_RUNTIME));
hcc.waitForCompletion(jobId);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index 1a95ac2..afc1cf7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -209,13 +209,10 @@
logDebug("Group by Kmer");
AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
- // logDebug("Write kmer to result");
- // generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
-
logDebug("Write node to result");
lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
- jobSpec.addRoot(lastOperator);
+ jobSpec.addRoot(readOperator);//what's this? why we need this? why I can't seet it in the JobGenCheckReader
return jobSpec;
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
new file mode 100644
index 0000000..f512f43
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.newgraph.job;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenCheckReader extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenCheckReader(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Write kmer to result");
+ generateRootByWriteKmerReader(jobSpec, readOperator);
+
+ return jobSpec;
+ }
+
+ public AbstractSingleActivityOperatorDescriptor generateRootByWriteKmerReader(JobSpecification jobSpec,
+ HDFSReadOperatorDescriptor readOperator) throws HyracksException {
+
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
+ return new ITupleWriter() {
+
+ private NodeWritable outputNode = new NodeWritable();
+ private KmerBytesWritable outputKmer = new KmerBytesWritable();
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ try {
+ if (outputKmer.getLength() > tuple
+ .getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
+ throw new IllegalArgumentException("Not enough kmer bytes");
+ }
+ outputKmer.setAsReference(
+ tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField),
+ tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
+ outputNode.setAsReference(
+ tuple.getFieldData(ReadsKeyValueParserFactory.OutputNodeField),
+ tuple.getFieldStart(ReadsKeyValueParserFactory.OutputNodeField));
+
+ output.write(outputKmer.toString().getBytes());
+ output.writeByte('\t');
+ output.write(outputNode.toString().getBytes());
+ output.writeByte('\n');
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+
+ }
+
+ };
+ }
+
+ });
+ connectOperators(jobSpec, readOperator, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ jobSpec.addRoot(writeKmerOperator);
+ return writeKmerOperator;
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
index bb0d131..3cd5bb9 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
@@ -52,7 +52,15 @@
@Test
public void TestAll() throws Exception {
- TestGroupby();
+ TestReader();
+// TestGroupby();
+ }
+
+ public void TestReader() throws Exception {
+ cleanUpReEntry();
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
+ dumpResult();
}
public void TestGroupby() throws Exception {
@@ -60,7 +68,7 @@
cleanUpReEntry();
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- dumpResult();
+// dumpResult();
}
@Before
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index d446f39..51a0d15 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -76,10 +76,10 @@
@Test
public void TestAll() throws Exception {
// TestReader();
- TestGroupbyKmer();
+// TestGroupbyKmer();
// TestMapKmerToRead();
// TestGroupByReadID();
-// TestEndToEnd();
+ TestEndToEnd();
// TestUnMergedNode();
}