build graph building job for hyracks
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 5b612dc..f027546 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -47,14 +47,6 @@
     public static final int OutputKmerField = 0;
     public static final int outputNodeIdListField = 1;
     
-    public static final int OutputForwardForwardField = 2;
-    public static final int OutputFFListCountField = 3;
-    public static final int OutputForwardReverseField = 4;
-    public static final int OutputFRListCountField = 5;
-    public static final int OutputReverseForwardField = 6;
-    public static final int OutputRFListCountField = 7;
-    public static final int OutputReverseReverseField = 8;
-    public static final int OutputRRListCountField = 9;
 
     private final int readLength;
     private final int kmerSize;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/KMerTextWriterFactory.java
new file mode 100644
index 0000000..f296fd4
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/KMerTextWriterFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.newgraph.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class KMerTextWriterFactory implements ITupleWriterFactory {
+
+    /**
+	 * 
+	 */
+    private static final long serialVersionUID = 1L;
+
+    private final int kmerSize;
+
+    public KMerTextWriterFactory(int k) {
+        kmerSize = k;
+    }
+
+    public class TupleWriter implements ITupleWriter {
+        private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+        private PositionListWritable plist = new PositionListWritable();
+
+        @Override
+        public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+            try {
+                if (kmer.getLength() > tuple.getFieldLength(KMerSequenceWriterFactory.InputKmerField)) {
+                    throw new IllegalArgumentException("Not enough kmer bytes");
+                }
+                kmer.setNewReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField),
+                        tuple.getFieldStart(KMerSequenceWriterFactory.InputKmerField));
+                int countOfPos = tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField)
+                        / PositionWritable.LENGTH;
+                if (tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField) % PositionWritable.LENGTH != 0) {
+                    throw new IllegalArgumentException("Invalid count of position byte");
+                }
+                plist.setNewReference(countOfPos, tuple.getFieldData(KMerSequenceWriterFactory.InputPositionListField),
+                        tuple.getFieldStart(KMerSequenceWriterFactory.InputPositionListField));
+
+                output.write(kmer.toString().getBytes());
+                output.writeByte('\t');
+                output.write(plist.toString().getBytes());
+                output.writeByte('\n');
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void open(DataOutput output) throws HyracksDataException {
+
+        }
+
+        @Override
+        public void close(DataOutput output) throws HyracksDataException {
+        }
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new TupleWriter();
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
new file mode 100644
index 0000000..c579261
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.newgraph.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class NodeTextWriterFactory implements ITupleWriterFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    private final int initialKmerSize;
+
+    public NodeTextWriterFactory(int initialKmerSize) {
+        this.initialKmerSize = initialKmerSize;
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new ITupleWriter() {
+            NodeWritable node = new NodeWritable(initialKmerSize);
+
+            @Override
+            public void open(DataOutput output) throws HyracksDataException {
+
+            }
+
+            @Override
+            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                node.getNodeID().setNewReference(tuple.getFieldData(NodeSequenceWriterFactory.InputNodeIDField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputNodeIDField));
+                node.getFFList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputFFField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputFFField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputFFField));
+                node.getFRList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputFRField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputFRField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputFRField));
+                node.getRFList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputRFField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputRFField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputRFField));
+                node.getRRList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputRRField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputRRField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputRRField));
+
+                node.getKmer().setNewReference(
+                        Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
+                                tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)),
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputKmerBytesField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputKmerBytesField));
+                try {
+                    output.write(node.toString().getBytes());
+                    output.writeByte('\n');
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void close(DataOutput output) throws HyracksDataException {
+
+            }
+
+        };
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java
new file mode 100644
index 0000000..9649566
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.job;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+public abstract class JobGen implements Serializable {
+
+    /**
+     * generate the jobId
+     */
+    private static final long serialVersionUID = 1L;
+    protected final ConfFactory confFactory;
+    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+
+    public JobGen(GenomixJobConf job) throws HyracksDataException {
+        this.confFactory = new ConfFactory(job);
+    }
+
+    public abstract JobSpecification generateJob() throws HyracksException;
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index 99885a7..f35d0ef 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -28,16 +28,17 @@
 import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
 import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
 import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
-import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+//import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
 import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.AggregateKmerAggregateFactory;
 import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.MergeKmerAggregateFactory;
 
 
-import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
+//import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
+//import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
+//import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
+//import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
+
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -119,18 +120,6 @@
         initJobConfiguration(scheduler);
     }
 
-    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
-            IAggregatorDescriptorFactory aggeragater, IAggregatorDescriptorFactory merger,
-            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
-            IPointableFactory pointable, RecordDescriptor outRed) {
-        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
-                aggeragater, merger, outRed, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
-                        tableSize), true);
-    }
-
     private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec, int[] keyFields,
             IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
             ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
@@ -140,16 +129,8 @@
         Object[] obj = new Object[3];
 
         switch (groupbyType) {
-            case EXTERNAL:
-                obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
-                        combineRed);
-                obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
-                obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
-                        finalRec);
-                break;
             case PRECLUSTER:
             default:
-
                 obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
                         new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
                         combineRed);
@@ -190,15 +171,16 @@
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                ReadsKeyValueParserFactory.readKmerOutputRec);
+                ReadsKeyValueParserFactory.readKmerOutputRec);//???????????
+        
         connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
                 jobSpec));
 
         RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
         jobSpec.setFrameSize(frameSize);
 
-        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(),
-                new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
+        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(readLength,kmerSize),
+                new MergeKmerAggregateFactory(readLength,kmerSize), new KmerHashPartitioncomputerFactory(),
                 new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
         AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
         logDebug("LocalKmerGroupby Operator");
@@ -288,18 +270,11 @@
         tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
         frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
 
-        bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
-
         String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
-        if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
-            groupbyType = GroupbyType.EXTERNAL;
-        } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
-            groupbyType = GroupbyType.PRECLUSTER;
-        } else {
-            groupbyType = GroupbyType.HYBRIDHASH;
-        }
+        groupbyType = GroupbyType.PRECLUSTER;
 
-        String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+        String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+        
         if (output.equalsIgnoreCase("text")) {
             outputFormat = OutputFormat.TEXT;
         } else {