Add setFrameSize; refine test cases
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2974 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
index 21808f8..39f181a 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -28,7 +28,20 @@
public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
-
+
+ public static final int DEFAULT_KMER= 55;
+ public static final int DEFAULT_FRAME_SIZE = 32768;
+ public static final int DEFAULT_FRAME_LIMIT = 4096;
+ public static final int DEFAULT_TABLE_SIZE = 10485767;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+ public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+
+ public static final String DEFAULT_GROUPBY_TYPE ="hybrid";
+ public static final String DEFAULT_OUTPUT_FORMAT ="binary";
+
public GenomixJob() throws IOException {
super(new Configuration());
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index a2e8860..baffdcb 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -57,12 +57,12 @@
JobConf job;
private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
- private final Map<String, NodeControllerInfo> ncMap;
private Scheduler scheduler;
private String[] ncNodeNames;
private int kmers;
private int frameLimits;
+ private int frameSize;
private int tableSize;
private GroupbyType groupbyType;
private OutputFormat outputFormat;
@@ -73,11 +73,16 @@
private RecordDescriptor readOutputRec;
private RecordDescriptor combineOutputRec;
+ /** works for hybrid hashing */
+ private long inputSizeInRawRecords;
+ private long inputSizeInUniqueKeys;
+ private int recordSizeInBytes;
+ private int hashfuncStartLevel;
+
public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
final Map<String, NodeControllerInfo> ncMap,
int numPartitionPerMachine) {
super(job);
- this.ncMap = ncMap;
this.scheduler = scheduler;
String[] nodes = new String[ncMap.size()];
ncMap.keySet().toArray(nodes);
@@ -164,23 +169,13 @@
break;
case HYBRIDHASH:
default:
- long inputSizeInRawRecords = conf.getLong(
- GenomixJob.GROUPBY_HYBRID_INPUTSIZE, 154000000);
- long inputSizeInUniqueKeys = conf.getLong(
- GenomixJob.GROUPBY_HYBRID_INPUTKEYS, 38500000);
- int recordSizeInBytes = conf.getInt(
- GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE, 9);
- int hashfuncStartLevel = conf.getInt(
- GenomixJob.GROUPBY_HYBRID_HASHLEVEL, 1);
singleGrouper = newHybridGroupby(jobSpec, keyFields,
inputSizeInRawRecords, inputSizeInUniqueKeys,
recordSizeInBytes, hashfuncStartLevel);
connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
new KmerHashPartitioncomputerFactory());
- /** here read the different recordSize why ? */
- recordSizeInBytes = conf.getInt(
- GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS, 13);
+
crossGrouper = newHybridGroupby(jobSpec, keyFields,
inputSizeInRawRecords, inputSizeInUniqueKeys,
recordSizeInBytes, hashfuncStartLevel);
@@ -212,6 +207,7 @@
combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
null, ByteSerializerDeserializer.INSTANCE,
ByteSerializerDeserializer.INSTANCE });
+ jobSpec.setFrameSize(frameSize);
// File input
HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
@@ -262,11 +258,31 @@
@Override
protected void initJobConfiguration() {
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, 25);
- frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, 4096);
- tableSize = conf.getInt(GenomixJob.TABLE_SIZE, 10485767);
+ kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT,
+ GenomixJob.DEFAULT_FRAME_LIMIT);
+ tableSize = conf.getInt(GenomixJob.TABLE_SIZE,
+ GenomixJob.DEFAULT_TABLE_SIZE);
+ frameSize = conf.getInt(GenomixJob.FRAME_SIZE,
+ GenomixJob.DEFAULT_FRAME_SIZE);
+ inputSizeInRawRecords = conf.getLong(
+ GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
+ inputSizeInUniqueKeys = conf.getLong(
+ GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
+ recordSizeInBytes = conf.getInt(
+ GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
+ hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
+ /** here read the different recordSize why ? */
+ recordSizeInBytes = conf.getInt(
+ GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
- String type = conf.get(GenomixJob.GROUPBY_TYPE, "hybrid");
+ String type = conf.get(GenomixJob.GROUPBY_TYPE,
+ GenomixJob.DEFAULT_GROUPBY_TYPE);
if (type.equalsIgnoreCase("external")) {
groupbyType = GroupbyType.EXTERNAL;
} else if (type.equalsIgnoreCase("precluster")) {
@@ -275,13 +291,12 @@
groupbyType = GroupbyType.HYBRIDHASH;
}
- String output = conf.get(GenomixJob.OUTPUT_FORMAT, "binary");
- if (output.equalsIgnoreCase("binary")) {
- outputFormat = OutputFormat.BINARY;
- } else if (output.equalsIgnoreCase("text")) {
+ String output = conf.get(GenomixJob.OUTPUT_FORMAT,
+ GenomixJob.DEFAULT_OUTPUT_FORMAT);
+ if (output.equalsIgnoreCase("text")) {
outputFormat = OutputFormat.TEXT;
} else {
- outputFormat = OutputFormat.TEXT;
+ outputFormat = OutputFormat.BINARY;
}
job = new JobConf(conf);
}
diff --git a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
index c185d63..c089128 100644
--- a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
+++ b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
@@ -128,7 +128,7 @@
Assert.assertEquals(true, checkResults());
}
-// @Test
+ @Test
public void TestPreClusterGroupby() throws Exception {
cleanUpReEntry();
conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
@@ -137,7 +137,7 @@
Assert.assertEquals(true, checkResults());
}
-// @Test
+ @Test
public void TestHybridGroupby() throws Exception {
cleanUpReEntry();
conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
@@ -170,7 +170,7 @@
dumped = new File(CONVERT_RESULT);
}
- TestUtils.compareWithResult(new File(EXPECTED_PATH), dumped);
+ TestUtils.compareWithSortedResult(new File(EXPECTED_PATH), dumped);
return true;
}
diff --git a/genomix/genomix-core/src/test/resources/expected/result2 b/genomix/genomix-core/src/test/resources/expected/result2
index a22dd28..5e76458 100755
--- a/genomix/genomix-core/src/test/resources/expected/result2
+++ b/genomix/genomix-core/src/test/resources/expected/result2
@@ -1,4 +1,4 @@
AATAG |A 1
+AGAAG T| 1
ATAGA A|A 1
TAGAA A|G 1
-AGAAG T| 1
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
index 3826688..e155a2c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/utils/TestUtils.java
@@ -15,9 +15,12 @@
package edu.uci.ics.hyracks.hdfs.utils;
+import java.awt.List;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.Collections;
public class TestUtils {
@@ -48,6 +51,44 @@
readerActual.close();
}
}
+
+ /**
+ * Compare with the sorted expected file.
+ * The actual file may not be sorted;
+ * @param expectedFile
+ * @param actualFile
+ */
+ public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception{
+ BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
+ BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
+ ArrayList<String> actualLines = new ArrayList<String>();
+ String lineExpected, lineActual;
+ try{
+ while ( (lineActual = readerActual.readLine())!=null){
+ actualLines.add(lineActual);
+ }
+ Collections.sort(actualLines);
+ int num = 1;
+ for(String actualLine : actualLines){
+ lineExpected = readerExpected.readLine();
+ if (lineExpected == null){
+ throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
+ }
+ if ( !equalStrings(lineExpected, actualLine)){
+ throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
+ + actualLine);
+ }
+ ++num;
+ }
+ lineExpected = readerExpected.readLine();
+ if (lineExpected != null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+ }
+ } finally{
+ readerActual.close();
+ readerExpected.close();
+ }
+ }
private static boolean equalStrings(String s1, String s2) {
String[] rowsOne = s1.split("\n");