sequential file format

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2949 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-hadoop/actual/result2/.part-00000.crc b/genomix/genomix-hadoop/actual/result2/.part-00000.crc
index ea0c7ed..1bb3d3d 100755
--- a/genomix/genomix-hadoop/actual/result2/.part-00000.crc
+++ b/genomix/genomix-hadoop/actual/result2/.part-00000.crc
Binary files differ
diff --git a/genomix/genomix-hadoop/actual/result2/part-00000 b/genomix/genomix-hadoop/actual/result2/part-00000
index 882a5db..122fd34 100755
--- a/genomix/genomix-hadoop/actual/result2/part-00000
+++ b/genomix/genomix-hadoop/actual/result2/part-00000
Binary files differ
diff --git a/genomix/genomix-hadoop/data/webmap/text.txt b/genomix/genomix-hadoop/data/webmap/text.txt
index f63a141..c6cd7fe 100755
--- a/genomix/genomix-hadoop/data/webmap/text.txt
+++ b/genomix/genomix-hadoop/data/webmap/text.txt
@@ -1,4 +1,4 @@
 @625E1AAXX100810:1:100:10000:10271/1

-AATAGAAG

+AATAGAAGATCGAT

 +

 EDBDB?BEEEDGGEGGGDGGGA>DG@GGD;GD@DG@F?<B<BFFD?

diff --git a/genomix/genomix-hadoop/expected/result2 b/genomix/genomix-hadoop/expected/result2
index 2c44be3..3665e18 100755
--- a/genomix/genomix-hadoop/expected/result2
+++ b/genomix/genomix-hadoop/expected/result2
@@ -1,4 +1,3 @@
-10 03	18	1
-31 00	1	1
-41 00	-128	1
-c4 00	17	1
\ No newline at end of file
+39 41 0c	1	1
+e4 04 31	24	1
+93 13 c4	16	1
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
index 74ef455..b66ee15 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
@@ -30,12 +30,12 @@
  * This class implement the combiner operator of Mapreduce model
  */
 public class GenomixCombiner extends MapReduceBase implements
-        Reducer<BytesWritable, ValueWritable, BytesWritable, ValueWritable> {
+        Reducer<ValueBytesWritable, ValueWritable, ValueBytesWritable, ValueWritable> {
     public ValueWritable vaWriter = new ValueWritable();
 
     @Override
-    public void reduce(BytesWritable key, Iterator<ValueWritable> values,
-            OutputCollector<BytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
+    public void reduce(ValueBytesWritable key, Iterator<ValueWritable> values,
+            OutputCollector<ValueBytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
         byte groupByAdjList = 0;
         int count = 0;
         byte bytCount = 0;
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
index ad62ee9..71a675e 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
@@ -21,12 +21,15 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.kohsuke.args4j.CmdLineParser;
@@ -66,15 +69,20 @@
         conf.setReducerClass(GenomixReducer.class);
         conf.setCombinerClass(GenomixCombiner.class);
 
-        conf.setMapOutputKeyClass(BytesWritable.class);
+        conf.setMapOutputKeyClass(ValueBytesWritable.class);
         conf.setMapOutputValueClass(ValueWritable.class);
 
         conf.setInputFormat(TextInputFormat.class);
-        conf.setOutputFormat(TextOutputFormat.class);
-        conf.setOutputKeyClass(BytesWritable.class);
+//        conf.setOutputFormat(TextOutputFormat.class);
+        conf.setOutputFormat(SequenceFileOutputFormat.class);
+        conf.setOutputKeyClass(ValueBytesWritable.class);
         conf.setOutputValueClass(ValueWritable.class);
         FileInputFormat.setInputPaths(conf, new Path(inputPath));
         FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+//        SequenceFileOutputFormat.setOutputPath(conf,new Path(outputPath));
+//        SequenceFileOutputFormat.setCompressOutput(conf, true);
+//        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
+//        SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
         conf.setNumReduceTasks(numReducers);
 
         FileSystem dfs = FileSystem.get(conf);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
index d8d0ff8..7162c91 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
@@ -33,7 +33,8 @@
 /**
  * This class implement mapper operator of mapreduce model
  */
-public class GenomixMapper extends MapReduceBase implements Mapper<LongWritable, Text, BytesWritable, ValueWritable> {
+public class GenomixMapper extends MapReduceBase implements
+        Mapper<LongWritable, Text, ValueBytesWritable, ValueWritable> {
 
     public class CurrenByte {
         public byte curByte;
@@ -42,7 +43,7 @@
 
     public static int KMER_SIZE;
     public ValueWritable outputAdjList = new ValueWritable();
-    public BytesWritable outputKmer = new BytesWritable();
+    public ValueBytesWritable outputKmer = new ValueBytesWritable();
 
     @Override
     public void configure(JobConf job) {
@@ -91,7 +92,7 @@
       C 01000000 64
       T 10000000 128*/
     @Override
-    public void map(LongWritable key, Text value, OutputCollector<BytesWritable, ValueWritable> output,
+    public void map(LongWritable key, Text value, OutputCollector<ValueBytesWritable, ValueWritable> output,
             Reporter reporter) throws IOException {
         /* A 00
            G 01
@@ -119,7 +120,7 @@
                 byte kmerAdjList = 0;
                 byte initial;
                 if (i >= KMER_SIZE) {
-                    outputKmer.set(kmerValue, 0, size);
+                    outputKmer.set(kmerValue, (byte) 0, (byte) size);
                     switch ((int) preMarker) {
                         case -1:
                             kmerAdjList = (byte) (kmerAdjList + 0);
@@ -253,7 +254,7 @@
                         break;
                 }
                 outputAdjList.set(kmerAdjList, count);
-                outputKmer.set(kmerValue, 0, size);
+                outputKmer.set(kmerValue, (byte) 0, (byte) size);
                 output.collect(outputKmer, outputAdjList);
             }
         }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
index ebf0df0..a9584f6 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
@@ -30,12 +30,12 @@
  * This class implement reducer operator of mapreduce model
  */
 public class GenomixReducer extends MapReduceBase implements
-        Reducer<BytesWritable, ValueWritable, BytesWritable, ValueWritable> {
+        Reducer<ValueBytesWritable, ValueWritable, ValueBytesWritable, ValueWritable> {
     ValueWritable valWriter = new ValueWritable();
 
     @Override
-    public void reduce(BytesWritable key, Iterator<ValueWritable> values,
-            OutputCollector<BytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
+    public void reduce(ValueBytesWritable key, Iterator<ValueWritable> values,
+            OutputCollector<ValueBytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
         byte groupByAdjList = 0;
         int count = 0;
         byte bytCount = 0;
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
index 49126d6..754f99d 100755
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
@@ -14,18 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.io.BufferedWriter;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.Test;
 
 import edu.uci.ics.utils.TestUtils;
@@ -43,6 +53,7 @@
     private static final String RESULT_PATH = "/result2";
     private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + RESULT_PATH + "/part-00000";
     private static final String EXPECTED_PATH = "expected/result2";
+    private static final String TEST_SOURCE_DIR = "testactual/source.txt";
 
     private MiniDFSCluster dfsCluster;
     private MiniMRCluster mrCluster;
@@ -56,9 +67,23 @@
 
         // run graph transformation tests
         GenomixDriver tldriver = new GenomixDriver();
-        tldriver.run(HDFS_PATH, RESULT_PATH, 2, 5, HADOOP_CONF_PATH);
+        tldriver.run(HDFS_PATH, RESULT_PATH, 2, 12, HADOOP_CONF_PATH);
+
+        SequenceFile.Reader reader = null;
+        Path path = new Path(RESULT_PATH + "/part-00000");
+        reader = new SequenceFile.Reader(dfs, path, conf);
+        ValueBytesWritable key = (ValueBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        ValueWritable value = (ValueWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+        File filePathTo = new File(TEST_SOURCE_DIR);
+        BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+        while (reader.next(key, value)) {
+            bw.write(key + "\t" + value.toString());
+            bw.newLine();
+        }
+        bw.close();
+        
         dumpResult();
-        TestUtils.compareWithResult(new File(DUMPED_RESULT), new File(EXPECTED_PATH));
+        TestUtils.compareWithResult(new File(TEST_SOURCE_DIR), new File(EXPECTED_PATH));
 
         cleanupHadoop();