fix SequenceFile.Reader problem
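
Records written with SequenceFile.Writer.appendRaw() cannot be read back by
SequenceFile.Reader in Hadoop 0.20.2, so the writer now goes through append()
with a reusable BytesWritable key, and KmerCountValue no longer implements
ValueBytes. The test reads each binary part file back through
SequenceFile.Reader instead of copyMerge, and the precluster groupby test is
re-enabled.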

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3040 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
index 08c283d..7ace9b0 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
@@ -2,14 +2,12 @@
 
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 
-import org.apache.hadoop.io.SequenceFile.ValueBytes;
 import org.apache.hadoop.io.Writable;
 
 
-public class KmerCountValue implements ValueBytes, Writable{
+public class KmerCountValue implements Writable {
 	private byte adjBitMap;
 	private byte count;
 
@@ -23,25 +21,6 @@
 	}
 
 	@Override
-	public int getSize() {
-		return 2;
-	}
-
-	@Override
-	public void writeCompressedBytes(DataOutputStream arg0)
-			throws IllegalArgumentException, IOException {
-		arg0.writeByte(adjBitMap);
-		arg0.writeByte(count);
-	}
-
-	@Override
-	public void writeUncompressedBytes(DataOutputStream arg0)
-			throws IOException {
-		arg0.writeByte(adjBitMap);
-		arg0.writeByte(count);
-	}
-
-	@Override
 	public void readFields(DataInput arg0) throws IOException {
 		adjBitMap = arg0.readByte();
 		count = arg0.readByte();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 66ce324..02be7d8 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -10,7 +10,6 @@
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
 
-import edu.uci.ics.genomix.dataflow.util.NonSyncWriter;
 import edu.uci.ics.genomix.type.KmerCountValue;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -35,9 +34,9 @@
 
 		ConfFactory cf;
 		Writer writer = null;
-//		NonSyncWriter writer = null;
 
 		KmerCountValue reEnterCount = new KmerCountValue();
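+		// reuse one BytesWritable key across append() calls instead of
+		// allocating a new instance per tuple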
+		BytesWritable reEnterKey = new BytesWritable();
 		/**
 		 * assumption is that output never change source!
 		 */
@@ -46,7 +45,6 @@
 				throws HyracksDataException {
 			try {
 				if (writer == null) {
-//					writer = new NonSyncWriter((FSDataOutputStream) output);
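+					// create the writer lazily, on the first record written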
 					writer = SequenceFile.createWriter(cf.getConf(),
 							(FSDataOutputStream) output, BytesWritable.class,
 							KmerCountValue.class, CompressionType.NONE, null);
@@ -58,7 +56,10 @@
 				byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
 				byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
 				reEnterCount.reset(bitmap, count);
-				writer.appendRaw(kmer, keyStart, keyLength, reEnterCount);
+				reEnterKey.set(kmer, keyStart, keyLength);
+				writer.append(reEnterKey, reEnterCount);
+				// Note: output written with appendRaw() cannot be read back by
+				// SequenceFile.Reader in Hadoop 0.20.2, hence append() above.
+				// writer.appendRaw(kmer, keyStart, keyLength, reEnterCount);
 			} catch (IOException e) {
 				throw new HyracksDataException(e);
 			}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index 95562a6..fd8af39 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -79,6 +79,14 @@
 	private int recordSizeInBytes;
 	private int hashfuncStartLevel;
 
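+	/** Logs each job-construction step together with the NC node names. */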
+	private void logDebug(String status) {
+		StringBuilder names = new StringBuilder();
+		for (String str : ncNodeNames) {
+			names.append(str).append(' ');
+		}
+		LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
+	}
+	
 	public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
 			final Map<String, NodeControllerInfo> ncMap,
 			int numPartitionPerMachine) {
@@ -91,7 +99,7 @@
 			System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
 					nodes.length);
 		}
-		LOG.info("nc nodes:" + ncNodeNames.length + " " + ncNodeNames.toString());
+		logDebug("initialize");
 	}
 
 	private ExternalGroupOperatorDescriptor newExternalGroupby(
@@ -192,6 +200,11 @@
 
 			LOG.info("HDFS read into " + splits.length + " splits");
 			String[] readSchedule = scheduler.getLocationConstraints(splits);
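+			// log the read schedule (which NC reads each split) for debugging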
+			StringBuilder log = new StringBuilder();
+			for (String schedule : readSchedule) {
+				log.append(schedule).append(' ');
+			}
+			LOG.info("HDFS read schedule " + log);
 			return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job,
 					splits, readSchedule, new ReadsKeyValueParserFactory(kmers));
 		} catch (Exception e) {
@@ -213,10 +226,12 @@
 		// File input
 		HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
 
+		logDebug("Read Operator");
 		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
 				readOperator, ncNodeNames);
 
 		generateDescriptorbyType(jobSpec);
+		logDebug("SingleGroupby Operator");
 		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
 				singleGrouper, ncNodeNames);
 
@@ -224,6 +239,7 @@
 				jobSpec);
 		jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
 
+		logDebug("CrossGrouper Operator");
 		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
 				crossGrouper, ncNodeNames);
 		jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
@@ -242,6 +258,7 @@
 		HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
 				jobSpec, job, writer);
 
+		logDebug("WriteOperator");
 		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
 				writeOperator, ncNodeNames);
 
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index 3a37087..8464684 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -17,7 +17,6 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -55,7 +54,7 @@
 
 	private JobConf conf = new JobConf();
 	private int numberOfNC = 2;
-	private int numPartitionPerMachine = 2;
+	private int numPartitionPerMachine = 1;
 
 	private Driver driver;
 
@@ -122,13 +121,13 @@
 	@Test
 	public void TestExternalGroupby() throws Exception {
 		conf.set(GenomixJob.GROUPBY_TYPE, "external");
-		conf.set(GenomixJob.OUTPUT_FORMAT, "text");
+		conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
 		System.err.println("Testing ExternalGroupBy");
 		driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
 		Assert.assertEquals(true, checkResults());
 	}
 
-	//@Test
+	@Test
 	public void TestPreClusterGroupby() throws Exception {
 		conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
 		conf.set(GenomixJob.OUTPUT_FORMAT, "text");
@@ -147,29 +146,60 @@
 	}
 
 	private boolean checkResults() throws Exception {
-		FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
-				FileSystem.getLocal(new Configuration()), new Path(
-						DUMPED_RESULT), false, conf, null);
-		File dumped = new File( DUMPED_RESULT);
-		String format = conf.get(GenomixJob.OUTPUT_FORMAT); 
-		if( !"text".equalsIgnoreCase(format)){
-	        SequenceFile.Reader reader = null;
-	        Path path = new Path(HDFS_OUTPUT_FILE);
-	        FileSystem dfs = FileSystem.get(conf);
-	        reader = new SequenceFile.Reader(dfs, path, conf);
-	        BytesWritable key = (BytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-	        KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-	        File filePathTo = new File(CONVERT_RESULT);
-	        BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
-	        int k = conf.getInt(GenomixJob.KMER_LENGTH, 25);
-	        while (reader.next(key, value)) {
-	            bw.write(Kmer.recoverKmerFrom(k, key.getBytes(), 0, key.getLength()) + "\t" + value.toString());
-	            bw.newLine();
-	        }
-	        bw.close();
-	        dumped = new File(CONVERT_RESULT);
+		File dumped = null;
+		String format = conf.get(GenomixJob.OUTPUT_FORMAT);
+		if ("text".equalsIgnoreCase(format)) {
+			FileUtil.copyMerge(FileSystem.get(conf),
+					new Path(HDFS_OUTPUT_PATH), FileSystem
+							.getLocal(new Configuration()), new Path(
+							DUMPED_RESULT), false, conf, null);
+			dumped = new File(DUMPED_RESULT);
+		} else {
+			FileSystem.getLocal(new Configuration()).mkdirs(
+					new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
+			File filePathTo = new File(CONVERT_RESULT);
+			BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
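+			// the binary parts are SequenceFiles, so they cannot be merged as
+			// plain text; read each part back and decode it to text instead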
+			for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
+				String partname = "/part-" + i;
+				Path path = new Path(HDFS_OUTPUT_PATH + partname);
+				FileSystem dfs = FileSystem.get(conf);
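+				// skip empty parts: a zero-length file has no SequenceFile header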
+				if (dfs.getFileStatus(path).getLen() == 0) {
+					continue;
+				}
+				SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path,
+						conf);
+				BytesWritable key = (BytesWritable) ReflectionUtils
+						.newInstance(reader.getKeyClass(), conf);
+				KmerCountValue value = (KmerCountValue) ReflectionUtils
+						.newInstance(reader.getValueClass(), conf);
+
+				int k = conf.getInt(GenomixJob.KMER_LENGTH, 25);
+				while (reader.next(key, value)) {
+					if (key == null || value == null) {
+						break;
+					}
+					String line = Kmer.recoverKmerFrom(k, key.getBytes(), 0,
+							key.getLength()) + "\t" + value.toString();
+					bw.write(line);
+					System.out.println(line);
+					bw.newLine();
+				}
+				reader.close();
+			}
+			bw.close();
+			dumped = new File(CONVERT_RESULT);
 		}
-        
+
 		TestUtils.compareWithSortedResult(new File(EXPECTED_PATH), dumped);
 		return true;
 	}