Finish test framework: add per-stage job generators (kmer reader check, kmer group-by, kmer-to-read mapping, readID group-by) and JobRunStepByStepTest
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 332b23b..7f78b68 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -3,10 +3,15 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.Serializable;
import org.apache.hadoop.io.WritableComparable;
-public class NodeWritable implements WritableComparable<NodeWritable> {
+public class NodeWritable implements WritableComparable<NodeWritable>, Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
private PositionWritable nodeID;
private int countOfKmer;
private PositionListWritable incomingList;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index a3b5c84..a578543 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -3,13 +3,18 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Iterator;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.genomix.data.Marshal;
-public class PositionListWritable implements Writable, Iterable<PositionWritable> {
+public class PositionListWritable implements Writable, Iterable<PositionWritable>, Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
protected byte[] storage;
protected int offset;
protected int valueCount;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index c7f24ec..c19af12 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -3,13 +3,18 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.Serializable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import edu.uci.ics.genomix.data.Marshal;
-public class PositionWritable implements WritableComparable<PositionWritable> {
+public class PositionWritable implements WritableComparable<PositionWritable>, Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
protected byte[] storage;
protected int offset;
public static final int LENGTH = 5;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
index fcc61d5..5cf44ba 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -2,7 +2,7 @@
import edu.uci.ics.genomix.type.NodeWritable;
-public class NodeReference extends NodeWritable{
+public class NodeReference extends NodeWritable {
public NodeReference(int kmerSize) {
super(kmerSize);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
index 7b46fa5..76b10d0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -4,4 +4,5 @@
import edu.uci.ics.hyracks.data.std.api.IValueReference;
public class PositionListReference extends PositionListWritable implements IValueReference {
+
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
index 4c19595..026758a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -4,4 +4,5 @@
import edu.uci.ics.hyracks.data.std.api.IValueReference;
public class PositionReference extends PositionWritable implements IValueReference {
+
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index 1bc2137..e91dd66 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -24,27 +24,37 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
+
+ public static final int OutputKmerField = 0;
+ public static final int OutputPosition = 1;
+
private KmerBytesWritable kmer;
+ private PositionReference pos;
private boolean bReversed;
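+ // two-field output record: kmer bytes (OutputKmerField) and the packed (readID, posInRead) position (OutputPosition); both are emitted as untyped binary fields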
+ public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
+ null });
+
public ReadsKeyValueParserFactory(int k, boolean bGenerateReversed) {
bReversed = bGenerateReversed;
kmer = new KmerBytesWritable(k);
+ pos = new PositionReference();
}
@Override
@@ -107,13 +117,13 @@
private void InsertToFrame(KmerBytesWritable kmer, int readID, int posInRead, IFrameWriter writer) {
try {
- if (posInRead > 127){
- throw new IllegalArgumentException ("Position id is beyond 127 at " + readID);
+ if (posInRead > 127) {
+ throw new IllegalArgumentException("Position id is beyond 127 at " + readID);
}
tupleBuilder.reset();
tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
- tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, readID);
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, (byte)posInRead);
+ pos.set(readID, (byte) posInRead);
+ tupleBuilder.addField(pos.getByteArray(), pos.getStartOffset(), pos.getLength());
if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 0772eb3..0508ba7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -26,6 +26,8 @@
import edu.uci.ics.genomix.hyracks.job.GenomixJob;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -40,6 +42,9 @@
private ConfFactory confFactory;
private final int kmerlength;
+ public static final int InputKmerField = 0;
+ public static final int InputPositionListField = 1;
+
public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
this.confFactory = new ConfFactory(conf);
this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
@@ -54,6 +59,7 @@
Writer writer = null;
KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
+ PositionListWritable plist = new PositionListWritable();
/**
* assumption is that output never change source!
@@ -61,18 +67,18 @@
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
try {
- byte[] kmer = tuple.getFieldData(0);
- int keyStart = tuple.getFieldStart(0);
- int keyLength = tuple.getFieldLength(0);
- if (reEnterKey.getLength() > keyLength){
+ if (reEnterKey.getLength() > tuple.getFieldLength(InputKmerField)) {
throw new IllegalArgumentException("Not enough kmer bytes");
}
+ reEnterKey.setNewReference(tuple.getFieldData(InputKmerField), tuple.getFieldStart(InputKmerField));
+ int countOfPos = tuple.getFieldLength(InputPositionListField);
+ if (countOfPos % PositionWritable.LENGTH != 0) {
+ throw new IllegalArgumentException("Invalid count of position byte");
+ }
+ plist.setNewReference(countOfPos, tuple.getFieldData(InputPositionListField),
+ tuple.getFieldStart(InputPositionListField));
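+ // reEnterKey and plist now reference the tuple's backing bytes directly; nothing is copied before appending to the writer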
- byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
- byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
-// reEnterCount.set(bitmap, count);
- reEnterKey.set(kmer, keyStart);
- writer.append(reEnterKey, null);
+ writer.append(reEnterKey, plist);
} catch (IOException e) {
throw new HyracksDataException(e);
}
@@ -82,7 +88,7 @@
public void open(DataOutput output) throws HyracksDataException {
try {
writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
- null, CompressionType.NONE, null);
+ PositionListWritable.class, CompressionType.NONE, null);
} catch (IOException e) {
throw new HyracksDataException(e);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index eac0045..664a2d2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -19,6 +19,8 @@
import java.io.IOException;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -32,21 +34,31 @@
*/
private static final long serialVersionUID = 1L;
private KmerBytesWritable kmer;
+ private PositionListWritable plist;
public KMerTextWriterFactory(int k) {
kmer = new KmerBytesWritable(k);
+ plist = new PositionListWritable();
}
public class TupleWriter implements ITupleWriter {
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
try {
- kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0));
+ if (kmer.getLength() > tuple.getFieldLength(KMerSequenceWriterFactory.InputKmerField)) {
+ throw new IllegalArgumentException("Not enough kmer bytes");
+ }
+ kmer.setNewReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField), tuple.getFieldStart(KMerSequenceWriterFactory.InputKmerField));
+ int countOfPos = tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField);
+ if (countOfPos % PositionWritable.LENGTH != 0) {
+ throw new IllegalArgumentException("Invalid count of position byte");
+ }
+ plist.setNewReference(countOfPos, tuple.getFieldData(KMerSequenceWriterFactory.InputPositionListField),
+ tuple.getFieldStart(KMerSequenceWriterFactory.InputPositionListField));
+
output.write(kmer.toString().getBytes());
output.writeByte('\t');
-// output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
- output.writeByte('\t');
- output.write(String.valueOf((int) tuple.getFieldData(2)[tuple.getFieldStart(2)]).getBytes());
+ output.write(plist.toString().getBytes());
output.writeByte('\n');
} catch (IOException e) {
throw new HyracksDataException(e);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java
new file mode 100644
index 0000000..32e7b4d
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.genomix.hyracks.dataflow.io;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
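+/** Placeholder text writer for the kmer-to-readID mapper output; getTupleWriter() is not implemented yet and returns null. */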
+public class MapperKmerToReadIDTextWriterFactory implements ITupleWriterFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public MapperKmerToReadIDTextWriterFactory(int kmerSize) {
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 272a21c..7de3b54 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+@SuppressWarnings("deprecation")
public class NodeSequenceWriterFactory implements ITupleWriterFactory {
/**
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java
new file mode 100644
index 0000000..28b8484
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.genomix.hyracks.dataflow.io;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
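+/** Placeholder text writer for the readID aggregation output; getTupleWriter() is not implemented yet and returns null. */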
+public class ReadIDAggregationTextWriterFactory implements ITupleWriterFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public ReadIDAggregationTextWriterFactory(int kmerSize) {
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
index d958205..68a83ac 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
@@ -27,6 +27,10 @@
import edu.uci.ics.genomix.hyracks.job.GenomixJob;
import edu.uci.ics.genomix.hyracks.job.JobGen;
import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
+import edu.uci.ics.genomix.hyracks.job.JobGenCheckReader;
+import edu.uci.ics.genomix.hyracks.job.JobGenCreateKmerInfo;
+import edu.uci.ics.genomix.hyracks.job.JobGenGroupbyReadID;
+import edu.uci.ics.genomix.hyracks.job.JobGenMapKmerToRead;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
@@ -38,9 +42,11 @@
public class Driver {
public static enum Plan {
+ CHECK_KMERREADER,
+ OUTPUT_KMERHASHTABLE,
+ OUTPUT_MAP_KMER_TO_READ,
+ OUTPUT_GROUPBY_READID,
BUILD_DEBRUJIN_GRAPH,
- GRAPH_CLEANNING,
- CONTIGS_GENERATION,
}
private static final String IS_PROFILING = "genomix.driver.profiling";
@@ -91,10 +97,18 @@
default:
jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
break;
+ case OUTPUT_KMERHASHTABLE:
+ jobGen = new JobGenCreateKmerInfo(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ case OUTPUT_MAP_KMER_TO_READ:
+ jobGen = new JobGenMapKmerToRead(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ case OUTPUT_GROUPBY_READID:
+ jobGen = new JobGenGroupbyReadID(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ case CHECK_KMERREADER:
+ jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
}
start = System.currentTimeMillis();
- runCreate(jobGen);
+ run(jobGen);
end = System.currentTimeMillis();
time = end - start;
LOG.info("result writing finished " + time + "ms");
@@ -104,7 +118,7 @@
}
}
- private void runCreate(JobGen jobGen) throws Exception {
+ private void run(JobGen jobGen) throws Exception {
try {
JobSpecification createJob = jobGen.generateJob();
execute(createJob);
@@ -134,6 +148,7 @@
boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
// FileInputFormat.setInputPaths(job, otherArgs[2]);
{
+ @SuppressWarnings("deprecation")
Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
jobConf.set("mapred.input.dir", path.toString());
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
index 3563f93..d17d9ef 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
@@ -15,26 +15,31 @@
package edu.uci.ics.genomix.hyracks.job;
+import java.io.Serializable;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
-public abstract class JobGen {
+public abstract class JobGen implements Serializable {
- protected final Configuration conf;
- protected final GenomixJob genomixJob;
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ protected final ConfFactory confFactory;
protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
- public JobGen(GenomixJob job) {
- this.conf = job;
- this.genomixJob = job;
+ public JobGen(GenomixJob job) throws HyracksDataException {
+ this.confFactory = new ConfFactory(job);
this.initJobConfiguration();
}
- protected abstract void initJobConfiguration();
+ protected abstract void initJobConfiguration() throws HyracksDataException;
public abstract JobSpecification generateJob() throws HyracksException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 117b7e0..4adddb6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -19,6 +19,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -66,11 +67,17 @@
import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
public class JobGenBrujinGraph extends JobGen {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
public enum GroupbyType {
EXTERNAL,
PRECLUSTER,
@@ -82,26 +89,26 @@
BINARY,
}
- JobConf job;
- private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
- private Scheduler scheduler;
- private String[] ncNodeNames;
+ protected ConfFactory hadoopJobConfFactory;
+ protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+ protected Scheduler scheduler;
+ protected String[] ncNodeNames;
- private int readLength;
- private int kmerSize;
- private int frameLimits;
- private int frameSize;
- private int tableSize;
- private GroupbyType groupbyType;
- private OutputFormat outputFormat;
- private boolean bGenerateReversedKmer;
+ protected int readLength;
+ protected int kmerSize;
+ protected int frameLimits;
+ protected int frameSize;
+ protected int tableSize;
+ protected GroupbyType groupbyType;
+ protected OutputFormat outputFormat;
+ protected boolean bGenerateReversedKmer;
- private void logDebug(String status) {
+ protected void logDebug(String status) {
LOG.debug(status + " nc nodes:" + ncNodeNames.length);
}
public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
- int numPartitionPerMachine) {
+ int numPartitionPerMachine) throws HyracksDataException {
super(job);
this.scheduler = scheduler;
String[] nodes = new String[ncMap.size()];
@@ -150,45 +157,41 @@
obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
finalRec);
+ jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
break;
}
return obj;
}
- public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec, RecordDescriptor outRec)
- throws HyracksDataException {
+ public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
try {
- InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
+ InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+ .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
LOG.info("HDFS read into " + splits.length + " splits");
String[] readSchedule = scheduler.getLocationConstraints(splits);
- return new HDFSReadOperatorDescriptor(jobSpec, outRec, job, splits, readSchedule,
- new ReadsKeyValueParserFactory(kmerSize, bGenerateReversedKmer));
+ return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
+ hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(kmerSize,
+ bGenerateReversedKmer));
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
- private void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+ public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
jobSpec.connect(conn, preOp, 0, nextOp, 0);
}
- @Override
- public JobSpecification generateJob() throws HyracksException {
+ public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readOperator) throws HyracksDataException {
- JobSpecification jobSpec = new JobSpecification();
- RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null });
RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
jobSpec.setFrameSize(frameSize);
- // File input
- logDebug("ReadKmer Operator");
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec, readKmerOutputRec);
-
Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateKmerAggregateFactory(),
new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
@@ -201,21 +204,27 @@
IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
+ return kmerCrossAggregator;
+ }
- logDebug("Map Kmer to Read Operator");
+ public AbstractOperatorDescriptor generateMapperFromKmerToRead(JobSpecification jobSpec,
+ AbstractOperatorDescriptor kmerCrossAggregator) {
//Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherPosition,...},Kmer)
RecordDescriptor readIDOutputRec = new RecordDescriptor(
new ISerializerDeserializer[] { null, null, null, null });
AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, readIDOutputRec);
connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
+ return mapKmerToRead;
+ }
- logDebug("Group by Read Operator");
+ public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
+ AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
// (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
RecordDescriptor readIDFinalRec = new RecordDescriptor(
new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
- objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, readIDCombineRec, readIDFinalRec);
AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
@@ -226,48 +235,96 @@
IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
+ return readCrossAggregator;
+ }
- logDebug("Map ReadInfo to Node");
+ public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readCrossAggregator) {
//Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList, OutgoingList, Kmer)
RecordDescriptor nodeRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null });
AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec, nodeRec, kmerSize);
connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
+ return mapEachReadToNode;
+ }
+ public AbstractOperatorDescriptor generateRootByWriteKmerGroupbyResult(JobSpecification jobSpec,
+ AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
// Output Kmer
ITupleWriterFactory kmerWriter = null;
- ITupleWriterFactory nodeWriter = null;
switch (outputFormat) {
case TEXT:
kmerWriter = new KMerTextWriterFactory(kmerSize);
+ break;
+ case BINARY:
+ default:
+ kmerWriter = new KMerSequenceWriterFactory(hadoopJobConfFactory.getConf());
+ break;
+ }
+ logDebug("WriteOperator");
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), kmerWriter);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ jobSpec.addRoot(writeKmerOperator);
+ return writeKmerOperator;
+ }
+
+ public AbstractOperatorDescriptor generateRootByWriteNodeResult(JobSpecification jobSpec,
+ AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
+ ITupleWriterFactory nodeWriter = null;
+ switch (outputFormat) {
+ case TEXT:
nodeWriter = new NodeTextWriterFactory(kmerSize);
break;
case BINARY:
default:
- kmerWriter = new KMerSequenceWriterFactory(job);
- nodeWriter = new NodeSequenceWriterFactory(job);
+ nodeWriter = new NodeSequenceWriterFactory(hadoopJobConfFactory.getConf());
break;
}
logDebug("WriteOperator");
- HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, kmerWriter);
- connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
- new OneToOneConnectorDescriptor(jobSpec));
- jobSpec.addRoot(writeKmerOperator);
-
// Output Node
- HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, nodeWriter);
+ HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), nodeWriter);
connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
jobSpec.addRoot(writeNodeOperator);
-
- if (groupbyType == GroupbyType.PRECLUSTER) {
- jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- }
- return jobSpec;
+ return writeNodeOperator;
}
@Override
- protected void initJobConfiguration() {
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ logDebug("Write kmer to result");
+ generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+ logDebug("Map Kmer to Read Operator");
+ lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+ logDebug("Group by Read Operator");
+ lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
+
+ logDebug("Map ReadInfo to Node");
+ lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
+
+ logDebug("Write node to result");
+ generateRootByWriteNodeResult(jobSpec, lastOperator);
+
+ return jobSpec;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ protected void initJobConfiguration() throws HyracksDataException {
+ Configuration conf = confFactory.getConf();
readLength = conf.getInt(GenomixJob.READ_LENGTH, GenomixJob.DEFAULT_READLEN);
kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
if (kmerSize % 2 == 0) {
@@ -295,7 +352,7 @@
} else {
outputFormat = OutputFormat.BINARY;
}
- job = new JobConf(conf);
+ hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
LOG.info("Genomix Graph Build Configuration");
LOG.info("Kmer:" + kmerSize);
LOG.info("Groupby type:" + type);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
new file mode 100644
index 0000000..a18c280
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -0,0 +1,107 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
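+/** Test job: runs only the HDFS reader and dumps each (kmer, position) pair as tab-separated text, so the output of ReadsKeyValueParserFactory can be checked. */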
+public class JobGenCheckReader extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenCheckReader(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Write kmer to result");
+ generateRootByWriteKmerReader(jobSpec, readOperator);
+
+ return jobSpec;
+ }
+
+ public AbstractSingleActivityOperatorDescriptor generateRootByWriteKmerReader(JobSpecification jobSpec,
+ HDFSReadOperatorDescriptor readOperator) throws HyracksException {
+ // Output Kmer
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, hadoopJobConfFactory.getConf(), new ITupleWriterFactory(){
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ private PositionWritable pos = new PositionWritable();
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new ITupleWriter(){
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ try {
+ if (kmer.getLength() > tuple.getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
+ throw new IllegalArgumentException("Not enough kmer bytes");
+ }
+ kmer.setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
+ pos.setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputPosition),
+ tuple.getFieldStart(ReadsKeyValueParserFactory.OutputPosition));
+
+ output.write(kmer.toString().getBytes());
+ output.writeByte('\t');
+ output.write(pos.toString().getBytes());
+ output.writeByte('\n');
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ };
+ }
+
+ });
+ connectOperators(jobSpec, readOperator, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ jobSpec.addRoot(writeKmerOperator);
+ return writeKmerOperator;
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
new file mode 100644
index 0000000..2c1a70a
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
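+/** Test job: stops after the group-by-kmer stage and writes the resulting (kmer, position list) table. */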
+public class JobGenCreateKmerInfo extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenCreateKmerInfo(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ logDebug("Write kmer to result");
+ generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+ return jobSpec;
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
new file mode 100644
index 0000000..7649100
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -0,0 +1,68 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.io.ReadIDAggregationTextWriterFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
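+/** Test job: runs the pipeline up to the readID group-by and node-mapping stages and dumps the result with ReadIDAggregationTextWriterFactory. */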
+public class JobGenGroupbyReadID extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenGroupbyReadID(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ logDebug("Write kmer to result");
+ generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+ logDebug("Map Kmer to Read Operator");
+ lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+ logDebug("Group by Read Operator");
+ lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
+
+ logDebug("Map ReadInfo to Node");
+ lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
+
+ logDebug("Write node to result");
+ generateRootByWriteReadIDAggregationResult(jobSpec, lastOperator);
+
+ return jobSpec;
+ }
+
+ public AbstractOperatorDescriptor generateRootByWriteReadIDAggregationResult(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readCrossAggregator) throws HyracksException {
+ ITupleWriterFactory readWriter = new ReadIDAggregationTextWriterFactory(kmerSize);
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, hadoopJobConfFactory.getConf(), readWriter);
+ connectOperators(jobSpec, readCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ jobSpec.addRoot(writeKmerOperator);
+ return writeKmerOperator;
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
new file mode 100644
index 0000000..76499e6
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -0,0 +1,57 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.io.MapperKmerToReadIDTextWriterFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
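+/** Test job: stops after mapping kmers back to reads and dumps that mapping as text. */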
+public class JobGenMapKmerToRead extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenMapKmerToRead(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ // TODO Auto-generated constructor stub
+ }
+
+ public AbstractOperatorDescriptor generateRootByWriteMapperFromKmerToReadID(JobSpecification jobSpec, AbstractOperatorDescriptor mapper) throws HyracksException{
+ // Output Kmer
+ ITupleWriterFactory kmerWriter = new MapperKmerToReadIDTextWriterFactory(kmerSize);
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, hadoopJobConfFactory.getConf(), kmerWriter);
+ connectOperators(jobSpec, mapper, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ jobSpec.addRoot(writeKmerOperator);
+ return writeKmerOperator;
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ logDebug("Map Kmer to Read Operator");
+ lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+ generateRootByWriteMapperFromKmerToReadID(jobSpec, lastOperator);
+
+ return jobSpec;
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
deleted file mode 100644
index adce3f6..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.job;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-
-import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.hyracks.util.ByteComparatorFactory;
-import edu.uci.ics.genomix.hyracks.util.StatCountAggregateFactory;
-import edu.uci.ics.genomix.hyracks.util.StatSumAggregateFactory;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class JobGenStatistic extends JobGen {
- private int kmers;
- private JobConf hadoopjob;
- private RecordDescriptor readOutputRec;
- private String[] ncNodeNames;
- private Scheduler scheduler;
- private RecordDescriptor combineOutputRec;
-
- public JobGenStatistic(GenomixJob job) {
- super(job);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- protected void initJobConfiguration() {
- // TODO Auto-generated method stub
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
- hadoopjob = new JobConf(conf);
- hadoopjob.setInputFormat(SequenceFileInputFormat.class);
- }
-
- @Override
- public JobSpecification generateJob() throws HyracksException {
- int[] degreeFields = { 0, 1 }; // indegree, outdegree
- int[] countFields = { 2 };
- JobSpecification jobSpec = new JobSpecification();
- /** specify the record fields after read */
- readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
- ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
- combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
- ByteSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- /** the reader */
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
-
- /** the combiner aggregator */
- AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(jobSpec, degreeFields, readOperator);
- AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(jobSpec, countFields, readOperator);
-
- /** the final aggregator */
- AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(jobSpec, degreeFields, degreeLocal);
- AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(jobSpec, countFields, countLocal);
-
- /** writer */
- AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(jobSpec, degreeFields, degreeMerger);
- AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
- jobSpec.addRoot(writeDegree);
- jobSpec.addRoot(writeCount);
- return jobSpec;
- }
-
- private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
- try {
-
- InputSplit[] splits = hadoopjob.getInputFormat().getSplits(hadoopjob, ncNodeNames.length);
-
- String[] readSchedule = scheduler.getLocationConstraints(splits);
- return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
- null); //new StatReadsKeyValueParserFactory());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggeragater) {
- return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, GenomixJob.DEFAULT_FRAME_LIMIT,
- new IBinaryComparatorFactory[] { new ByteComparatorFactory(), new ByteComparatorFactory() }, null,
- aggeragater, new StatSumAggregateFactory(), combineOutputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- new ByteComparatorFactory(), new ByteComparatorFactory() }),
- GenomixJob.DEFAULT_TABLE_SIZE), true);
- }
-
- private AbstractOperatorDescriptor connectLocalAggregateByField(JobSpecification jobSpec, int[] fields,
- HDFSReadOperatorDescriptor readOperator) {
- AbstractOperatorDescriptor localAggregator = newExternalGroupby(jobSpec, fields,
- new StatCountAggregateFactory());
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, localAggregator, ncNodeNames);
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
- jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
- return localAggregator;
- }
-
- private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec, int[] fields,
- AbstractOperatorDescriptor localAggregator) {
- AbstractOperatorDescriptor finalAggregator = newExternalGroupby(jobSpec, fields, new StatSumAggregateFactory());
- // only need one reducer
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, finalAggregator, ncNodeNames[fields[0]
- % ncNodeNames.length]);
- IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
- new ITuplePartitionComputerFactory() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public ITuplePartitionComputer createPartitioner() {
- return new ITuplePartitionComputer() {
- @Override
- public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
- throws HyracksDataException {
- return 0;
- }
- };
- }
- }, fields, new IBinaryComparatorFactory[] { new ByteComparatorFactory() });
- jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
- return finalAggregator;
- }
-
- private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int[] fields,
- AbstractOperatorDescriptor finalAggregator) {
- LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(jobSpec, null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames[fields[0]
- % ncNodeNames.length]);
-
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
- jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
- return writeOperator;
- }
-}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
similarity index 99%
rename from genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java
rename to genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
index 4a7c8aa..ae2838d 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
@@ -43,7 +43,7 @@
import edu.uci.ics.genomix.hyracks.job.GenomixJob;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-public class JobRunTest {
+public class JobKmerGroupbyTest {
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index d41bcbe..fca0607 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -7,6 +7,8 @@
import java.io.FileWriter;
import java.io.IOException;
+import junit.framework.Assert;
+
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -19,23 +21,28 @@
import org.apache.hadoop.mapred.JobConf;
import org.junit.After;
import org.junit.Before;
+import org.junit.Test;
import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
import edu.uci.ics.genomix.hyracks.job.GenomixJob;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
public class JobRunStepByStepTest {
+ private static final int KmerSize = 5;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_PATH = "src/test/resources/data/webmap/text.txt";
+ private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/text.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";
+
+ private static final String EXPECTED_DIR = "src/test/resources/expected/";
+ private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
- private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
- private static final String EXPECTED_REVERSE_PATH = "src/test/resources/expected/result_reverse";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
private MiniDFSCluster dfsCluster;
@@ -44,6 +51,18 @@
private int numPartitionPerMachine = 1;
private Driver driver;
+
+ @Test
+ public void TestAll() throws Exception {
+ TestReader();
+ }
+
+ public void TestReader() throws Exception {
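+ // run only the CHECK_KMERREADER plan and compare its text dump against the expected reader result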
+ conf.set(GenomixJob.GROUPBY_TYPE, "external");
+ System.err.println("Testing ExternalGroupBy");
+ driver.runJob(new GenomixJob(conf), Plan.CHECK_KMERREADER, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT));
+ }
@Before
public void setUp() throws Exception {
@@ -56,7 +75,7 @@
FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
- conf.setInt(GenomixJob.KMER_LENGTH, 5);
+ conf.setInt(GenomixJob.KMER_LENGTH, KmerSize);
driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
}
@@ -77,7 +96,7 @@
System.setProperty("hadoop.log.dir", "logs");
dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
+ Path src = new Path(DATA_INPUT_PATH);
Path dest = new Path(HDFS_INPUT_PATH);
dfs.mkdirs(dest);
//dfs.mkdirs(result);
diff --git a/genomix/genomix-hyracks/src/test/resources/data/0/text.txt b/genomix/genomix-hyracks/src/test/resources/data/0/text.txt
deleted file mode 100755
index f63a141..0000000
--- a/genomix/genomix-hyracks/src/test/resources/data/0/text.txt
+++ /dev/null
@@ -1,4 +0,0 @@
-@625E1AAXX100810:1:100:10000:10271/1
-AATAGAAG
-+
-EDBDB?BEEEDGGEGGGDGGGA>DG@GGD;GD@DG@F?<B<BFFD?
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
index 9874fc2..08f0f95 100755
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
@@ -1,8 +1,5 @@
-@625E1AAXX100810:1:100:10000:10271/1
-AATAGAAG
-AATAGAAG
-+
-EDBDB?BEEEDGGEGGGDGGGA>DG@GGD;GD@DG@F?<B<BFFD?
-AATAGAAG
-AATAGAAG
-AATAGAAG
\ No newline at end of file
+1 AATAGAAG
+2 AATAGAAG
+3 AATAGAAG
+4 AATAGAAG
+5 AATAGAAG
\ No newline at end of file