Add output format option (text or binary) for graph output

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2946 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index f15c08b..6f856de 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -20,7 +20,7 @@
 
 	private static final long serialVersionUID = 1L;
 	private Configuration conf;
-	KMerSequenceWriterFactory(Configuration conf){
+	public KMerSequenceWriterFactory(Configuration conf){
 		this.conf = conf;
 	}
 	
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
index 80fe57b..1bdaabb 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -26,7 +26,7 @@
 						byte[] data = tuple.getFieldData(0);
 						int start = tuple.getFieldStart(0);
 						int len = tuple.getFieldLength(0);
-						output.write(data, start, len);
+						output.write(new String(data,start,len).getBytes());
 						output.writeChar(' ');
 					}
 					output.writeByte(newLine);
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
index bdc8202..7d6101b 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -17,8 +17,10 @@
 	public static final String FRAME_LIMIT = "genomix.framelimit";
 	/** Table Size, hyracks need */
 	public static final String TABLE_SIZE = "genomix.tablesize";
-	/** Groupby types ? */
+	/** Groupby types */
 	public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+	/** Graph output format: "text" or "binary" (defaults to "binary" when unset) */
+	public static final String OUTPUT_FORMAT = "genomix.graph.output";
 
 	/** Configurations used by hybrid groupby function in graph build phrase */
 	public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index 6f2bc5e..a67f42f 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -13,6 +13,8 @@
 import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;
 import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
 import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.dataflow.KMerTextWriterFactory;
 import edu.uci.ics.genomix.dataflow.KMerWriterFactory;
 import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
 import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
@@ -41,6 +43,7 @@
 import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
@@ -49,6 +52,9 @@
 	public enum GroupbyType {
 		EXTERNAL, PRECLUSTER, HYBRIDHASH,
 	}
+	public enum OutputFormat{
+		TEXT,BINARY,
+	}
 
 	private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
 	private final Map<String, NodeControllerInfo> ncMap;
@@ -59,6 +65,7 @@
 	private int frameLimits;
 	private int tableSize;
 	private GroupbyType groupbyType;
+	private OutputFormat outputFormat;
 
 	private AbstractOperatorDescriptor singleGrouper;
 	private IConnectorDescriptor connPartition;
@@ -220,8 +227,17 @@
 		jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
 
 		// Output
+		ITupleWriterFactory writer = null;
+		switch (outputFormat){
+		case TEXT:
+			writer = new KMerTextWriterFactory();
+			break;
+		case BINARY: default:
+			writer = new KMerSequenceWriterFactory(conf);
+			break;
+		}
 		HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
-				jobSpec, (JobConf) conf, new KMerWriterFactory());
+				jobSpec, (JobConf) conf, writer);
 
 		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
 				writeOperator, ncNodeNames);
@@ -251,6 +267,15 @@
 		} else {
 			groupbyType = GroupbyType.HYBRIDHASH;
 		}
+		
+		String output = conf.get(GenomixJob.OUTPUT_FORMAT, "binary");
+		if (output.equalsIgnoreCase("binary")){
+			outputFormat = OutputFormat.BINARY;
+		} else if ( output.equalsIgnoreCase("text")){
+			outputFormat = OutputFormat.TEXT;
+		} else {
+			outputFormat = OutputFormat.TEXT;
+		}
 	}
 
 }