Add setFrameSize; centralize defaults; refine test cases

Move the hard-coded defaults (k-mer length, frame size and limit, table
size, and the hybrid group-by parameters) into named constants on
GenomixJob, set the configured frame size on the generated job
specification, re-enable the precluster and hybrid group-by tests, and
compare test output order-insensitively via compareWithSortedResult.

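A minimal sketch of the frame-size wiring, for reference (only
JobSpecification.setFrameSize(int) and the GenomixJob constants are
taken from this patch; the surrounding conf lookup mirrors
initJobConfiguration()):

    int frameSize = conf.getInt(GenomixJob.FRAME_SIZE,
            GenomixJob.DEFAULT_FRAME_SIZE);
    JobSpecification jobSpec = new JobSpecification();
    jobSpec.setFrameSize(frameSize); // operator frames in this job all use this size
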
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2974 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
index 21808f8..39f181a 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -28,7 +28,20 @@
 	public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
 	public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
 	public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
-
+	
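+	/** defaults applied when the corresponding key is absent from the job configuration */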
+	public static final int DEFAULT_KMER = 55;
+	public static final int DEFAULT_FRAME_SIZE = 32768;
+	public static final int DEFAULT_FRAME_LIMIT = 4096;
+	public static final int DEFAULT_TABLE_SIZE = 10485767;
+	public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+	public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+	public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+	public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+	public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+	
+	public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
+	public static final String DEFAULT_OUTPUT_FORMAT = "binary";
+	
 	public GenomixJob() throws IOException {
 		super(new Configuration());
 	}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index a2e8860..baffdcb 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -57,12 +57,12 @@
 
 	JobConf job;
 	private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
-	private final Map<String, NodeControllerInfo> ncMap;
 	private Scheduler scheduler;
 	private String[] ncNodeNames;
 
 	private int kmers;
 	private int frameLimits;
+	private int frameSize;
 	private int tableSize;
 	private GroupbyType groupbyType;
 	private OutputFormat outputFormat;
@@ -73,11 +73,16 @@
 	private RecordDescriptor readOutputRec;
 	private RecordDescriptor combineOutputRec;
 
+	/** parameters of the hybrid hash group-by, read in initJobConfiguration() */
+	private long inputSizeInRawRecords;
+	private long inputSizeInUniqueKeys;
+	private int recordSizeInBytes;
+	private int recordSizeInBytesCross;
+	private int hashfuncStartLevel;
+
 	public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
 			final Map<String, NodeControllerInfo> ncMap,
 			int numPartitionPerMachine) {
 		super(job);
-		this.ncMap = ncMap;
 		this.scheduler = scheduler;
 		String[] nodes = new String[ncMap.size()];
 		ncMap.keySet().toArray(nodes);
@@ -164,23 +169,13 @@
 			break;
 		case HYBRIDHASH:
 		default:
-			long inputSizeInRawRecords = conf.getLong(
-					GenomixJob.GROUPBY_HYBRID_INPUTSIZE, 154000000);
-			long inputSizeInUniqueKeys = conf.getLong(
-					GenomixJob.GROUPBY_HYBRID_INPUTKEYS, 38500000);
-			int recordSizeInBytes = conf.getInt(
-					GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE, 9);
-			int hashfuncStartLevel = conf.getInt(
-					GenomixJob.GROUPBY_HYBRID_HASHLEVEL, 1);
 
 			singleGrouper = newHybridGroupby(jobSpec, keyFields,
 					inputSizeInRawRecords, inputSizeInUniqueKeys,
 					recordSizeInBytes, hashfuncStartLevel);
 			connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
 					new KmerHashPartitioncomputerFactory());
-			/** here read the different recordSize why ? */
-			recordSizeInBytes = conf.getInt(
-					GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS, 13);
+
 			crossGrouper = newHybridGroupby(jobSpec, keyFields,
 					inputSizeInRawRecords, inputSizeInUniqueKeys,
 					recordSizeInBytes, hashfuncStartLevel);
@@ -212,6 +207,7 @@
 		combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
 				null, ByteSerializerDeserializer.INSTANCE,
 				ByteSerializerDeserializer.INSTANCE });
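+		// apply the configured frame size to the whole job specification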
+		jobSpec.setFrameSize(frameSize);
 
 		// File input
 		HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
@@ -262,11 +258,31 @@
 	@Override
 	protected void initJobConfiguration() {
 
-		kmers = conf.getInt(GenomixJob.KMER_LENGTH, 25);
-		frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, 4096);
-		tableSize = conf.getInt(GenomixJob.TABLE_SIZE, 10485767);
+		kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+		frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT,
+				GenomixJob.DEFAULT_FRAME_LIMIT);
+		tableSize = conf.getInt(GenomixJob.TABLE_SIZE,
+				GenomixJob.DEFAULT_TABLE_SIZE);
+		frameSize = conf.getInt(GenomixJob.FRAME_SIZE,
+				GenomixJob.DEFAULT_FRAME_SIZE);
+		inputSizeInRawRecords = conf.getLong(
+				GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
+				GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
+		inputSizeInUniqueKeys = conf.getLong(
+				GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
+				GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
+		recordSizeInBytes = conf.getInt(
+				GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
+				GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
+		hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
+				GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
+		// the cross-aggregation records have their own (larger) size, so
+		// keep it in a separate field instead of overwriting the value above
+		recordSizeInBytesCross = conf.getInt(
+				GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
+				GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
 
-		String type = conf.get(GenomixJob.GROUPBY_TYPE, "hybrid");
+		String type = conf.get(GenomixJob.GROUPBY_TYPE,
+				GenomixJob.DEFAULT_GROUPBY_TYPE);
 		if (type.equalsIgnoreCase("external")) {
 			groupbyType = GroupbyType.EXTERNAL;
 		} else if (type.equalsIgnoreCase("precluster")) {
@@ -275,13 +291,12 @@
 			groupbyType = GroupbyType.HYBRIDHASH;
 		}
 
-		String output = conf.get(GenomixJob.OUTPUT_FORMAT, "binary");
-		if (output.equalsIgnoreCase("binary")) {
-			outputFormat = OutputFormat.BINARY;
-		} else if (output.equalsIgnoreCase("text")) {
+		String output = conf.get(GenomixJob.OUTPUT_FORMAT,
+				GenomixJob.DEFAULT_OUTPUT_FORMAT);
+		if (output.equalsIgnoreCase("text")) {
 			outputFormat = OutputFormat.TEXT;
 		} else {
-			outputFormat = OutputFormat.TEXT;
+			outputFormat = OutputFormat.BINARY;
 		}
 		job = new JobConf(conf);
 	}
diff --git a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
index c185d63..c089128 100644
--- a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
+++ b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
@@ -128,7 +128,7 @@
 		Assert.assertEquals(true, checkResults());
 	}
 
-//	@Test
+	@Test
 	public void TestPreClusterGroupby() throws Exception {
 		cleanUpReEntry();
 		conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
@@ -137,7 +137,7 @@
 		Assert.assertEquals(true, checkResults());
 	}
 
-//	@Test
+	@Test
 	public void TestHybridGroupby() throws Exception {
 		cleanUpReEntry();
 		conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
@@ -170,7 +170,7 @@
 	        dumped = new File(CONVERT_RESULT);
 		}
         
-		TestUtils.compareWithResult(new File(EXPECTED_PATH), dumped);
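+		// row order of the dumped result may vary, so sort the actual
+		// lines before comparing them with the (sorted) expected file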
+		TestUtils.compareWithSortedResult(new File(EXPECTED_PATH), dumped);
 		return true;
 	}
 
diff --git a/genomix/genomix-core/src/test/resources/expected/result2 b/genomix/genomix-core/src/test/resources/expected/result2
index a22dd28..5e76458 100755
--- a/genomix/genomix-core/src/test/resources/expected/result2
+++ b/genomix/genomix-core/src/test/resources/expected/result2
@@ -1,4 +1,4 @@
 AATAG	|A	1
+AGAAG	T|	1
 ATAGA	A|A	1
 TAGAA	A|G	1
-AGAAG	T|	1
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
index 3826688..e155a2c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
@@ -15,9 +15,12 @@
 
 package edu.uci.ics.hyracks.hdfs.utils;
 
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.Collections;
 
 public class TestUtils {
 
@@ -48,6 +51,44 @@
             readerActual.close();
         }
     }
+
+    /**
+     * Compare the actual result against an expected file whose lines are
+     * already sorted. The actual file's lines may arrive in any order:
+     * they are collected and sorted before the line-by-line comparison.
+     * 
+     * @param expectedFile the expected result, with lines in sorted order
+     * @param actualFile   the actual result, in arbitrary order
+     */
+    public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception {
+        BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
+        BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
+        ArrayList<String> actualLines = new ArrayList<String>();
+        String lineExpected, lineActual;
+        try {
+            while ((lineActual = readerActual.readLine()) != null) {
+                actualLines.add(lineActual);
+            }
+            Collections.sort(actualLines);
+            int num = 1;
+            for (String actualLine : actualLines) {
+                lineExpected = readerExpected.readLine();
+                if (lineExpected == null) {
+                    throw new Exception("Actual result has an unexpected extra line " + num + ":\n< " + actualLine
+                            + "\n> ");
+                }
+                if (!equalStrings(lineExpected, actualLine)) {
+                    throw new Exception("Result mismatch at line " + num + ":\n< " + lineExpected + "\n> "
+                            + actualLine);
+                }
+                ++num;
+            }
+            lineExpected = readerExpected.readLine();
+            if (lineExpected != null) {
+                throw new Exception("Actual result is missing expected line " + num + ":\n< \n> " + lineExpected);
+            }
+        } finally {
+            readerActual.close();
+            readerExpected.close();
+        }
+    }
 
     private static boolean equalStrings(String s1, String s2) {
         String[] rowsOne = s1.split("\n");