Build the graph-building job for Hyracks
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 5b612dc..f027546 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -47,14 +47,6 @@
public static final int OutputKmerField = 0;
public static final int outputNodeIdListField = 1;
- public static final int OutputForwardForwardField = 2;
- public static final int OutputFFListCountField = 3;
- public static final int OutputForwardReverseField = 4;
- public static final int OutputFRListCountField = 5;
- public static final int OutputReverseForwardField = 6;
- public static final int OutputRFListCountField = 7;
- public static final int OutputReverseReverseField = 8;
- public static final int OutputRRListCountField = 9;
private final int readLength;
private final int kmerSize;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/KMerTextWriterFactory.java
new file mode 100644
index 0000000..f296fd4
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/KMerTextWriterFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.newgraph.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class KMerTextWriterFactory implements ITupleWriterFactory {
+    // Factory for tuple writers that emit one text line per tuple: the kmer, a tab, then its position list.
+    /**
+     * Serialization version identifier.
+     */
+    private static final long serialVersionUID = 1L;
+    // Passed to the KmerBytesWritable constructor of every writer this factory creates.
+    private final int kmerSize;
+
+    public KMerTextWriterFactory(int k) {
+        kmerSize = k;
+    }
+    // Non-static inner class: each TupleWriter reads kmerSize from the enclosing factory instance.
+    public class TupleWriter implements ITupleWriter {
+        private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+        private PositionListWritable plist = new PositionListWritable();
+        // write(): length-checks the kmer and position-list fields (IllegalArgumentException on failure, not wrapped), then prints them; IOException is wrapped in HyracksDataException.
+        @Override
+        public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+            try {
+                if (kmer.getLength() > tuple.getFieldLength(KMerSequenceWriterFactory.InputKmerField)) {
+                    throw new IllegalArgumentException("Not enough kmer bytes");
+                }
+                kmer.setNewReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField),
+                        tuple.getFieldStart(KMerSequenceWriterFactory.InputKmerField));
+                int countOfPos = tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField)
+                        / PositionWritable.LENGTH;
+                if (tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField) % PositionWritable.LENGTH != 0) {
+                    throw new IllegalArgumentException("Invalid count of position byte");
+                }
+                plist.setNewReference(countOfPos, tuple.getFieldData(KMerSequenceWriterFactory.InputPositionListField),
+                        tuple.getFieldStart(KMerSequenceWriterFactory.InputPositionListField));
+                // Emit "<kmer> TAB <positions> NEWLINE"; getBytes() with no argument uses the platform default charset.
+                output.write(kmer.toString().getBytes());
+                output.writeByte('\t');
+                output.write(plist.toString().getBytes());
+                output.writeByte('\n');
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void open(DataOutput output) throws HyracksDataException {
+            // Intentionally empty: this writer does nothing on open.
+        }
+
+        @Override
+        public void close(DataOutput output) throws HyracksDataException {
+        }
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new TupleWriter();
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
new file mode 100644
index 0000000..c579261
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.newgraph.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class NodeTextWriterFactory implements ITupleWriterFactory {
+    // Factory for tuple writers that print each tuple as one text line using NodeWritable.toString().
+    /**
+     * Serialization version identifier.
+     */
+    private static final long serialVersionUID = 1L;
+    private final int initialKmerSize;
+
+    public NodeTextWriterFactory(int initialKmerSize) {
+        this.initialKmerSize = initialKmerSize;
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new ITupleWriter() {
+            NodeWritable node = new NodeWritable(initialKmerSize);
+            // 'node' is a single scratch object per writer; write() re-points its parts at each tuple's fields.
+            @Override
+            public void open(DataOutput output) throws HyracksDataException {
+                // Intentionally empty: this writer does nothing on open.
+            }
+
+            @Override
+            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                node.getNodeID().setNewReference(tuple.getFieldData(NodeSequenceWriterFactory.InputNodeIDField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputNodeIDField));
+                node.getFFList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputFFField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputFFField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputFFField));
+                node.getFRList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputFRField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputFRField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputFRField));
+                node.getRFList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputRFField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputRFField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputRFField));
+                node.getRRList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputRRField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputRRField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputRRField));
+                // NOTE(review): the list counts above use integer division with no remainder check. Below, an int read from the count field is the first argument (presumably the kmer length; confirm).
+                node.getKmer().setNewReference(
+                        Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
+                                tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)),
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputKmerBytesField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputKmerBytesField));
+                try {
+                    output.write(node.toString().getBytes());
+                    output.writeByte('\n');
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void close(DataOutput output) throws HyracksDataException {
+                // Intentionally empty: this writer does nothing on close.
+            }
+
+        };
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java
new file mode 100644
index 0000000..9649566
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.job;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+public abstract class JobGen implements Serializable {
+    // Base class for job generators: holds a ConfFactory built from the job conf plus a per-instance job id string.
+    /**
+     * Serialization version identifier (the generated job id lives in the jobId field below).
+     */
+    private static final long serialVersionUID = 1L;
+    protected final ConfFactory confFactory;
+    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+    // jobId above packs the wall-clock millis and nanoTime at construction into a UUID; it is not a random UUID.
+    public JobGen(GenomixJobConf job) throws HyracksDataException {
+        this.confFactory = new ConfFactory(job);
+    }
+    // Subclasses build and return the JobSpecification for their job.
+    public abstract JobSpecification generateJob() throws HyracksException;
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index 99885a7..f35d0ef 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -28,16 +28,17 @@
import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
-import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+//import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.AggregateKmerAggregateFactory;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.MergeKmerAggregateFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
+//import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
+//import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
+//import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
+//import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
+
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -119,18 +120,6 @@
initJobConfiguration(scheduler);
}
- private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggeragater, IAggregatorDescriptorFactory merger,
- ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
- IPointableFactory pointable, RecordDescriptor outRed) {
- return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
- aggeragater, merger, outRed, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
- tableSize), true);
- }
-
private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec, int[] keyFields,
IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
@@ -140,16 +129,8 @@
Object[] obj = new Object[3];
switch (groupbyType) {
- case EXTERNAL:
- obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
- combineRed);
- obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
- obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
- finalRec);
- break;
case PRECLUSTER:
default:
-
obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
combineRed);
@@ -190,15 +171,16 @@
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- ReadsKeyValueParserFactory.readKmerOutputRec);
+ ReadsKeyValueParserFactory.readKmerOutputRec);//???????????
+
connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
jobSpec));
RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
jobSpec.setFrameSize(frameSize);
- Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(),
- new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(readLength,kmerSize),
+ new MergeKmerAggregateFactory(readLength,kmerSize), new KmerHashPartitioncomputerFactory(),
new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
logDebug("LocalKmerGroupby Operator");
@@ -288,18 +270,11 @@
tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
- bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
-
String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
- if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
- groupbyType = GroupbyType.EXTERNAL;
- } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
- groupbyType = GroupbyType.PRECLUSTER;
- } else {
- groupbyType = GroupbyType.HYBRIDHASH;
- }
+ groupbyType = GroupbyType.PRECLUSTER;
- String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+ String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+
if (output.equalsIgnoreCase("text")) {
outputFormat = OutputFormat.TEXT;
} else {