Switch graph-building output to the Hadoop SequenceFile format.

Replace BytesWritable with ValueBytesWritable as the key type across the
mapper, combiner, and reducer; write job results through
SequenceFileOutputFormat instead of TextOutputFormat (gzip block compression
is staged but left commented out); and teach GraphBuildingTest to read the
binary output back with SequenceFile.Reader. Test input, expected results,
and the checked-in actual results are regenerated accordingly.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2949 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-hadoop/actual/result2/.part-00000.crc b/genomix/genomix-hadoop/actual/result2/.part-00000.crc
index ea0c7ed..1bb3d3d 100755
--- a/genomix/genomix-hadoop/actual/result2/.part-00000.crc
+++ b/genomix/genomix-hadoop/actual/result2/.part-00000.crc
Binary files differ
diff --git a/genomix/genomix-hadoop/actual/result2/part-00000 b/genomix/genomix-hadoop/actual/result2/part-00000
index 882a5db..122fd34 100755
--- a/genomix/genomix-hadoop/actual/result2/part-00000
+++ b/genomix/genomix-hadoop/actual/result2/part-00000
Binary files differ
diff --git a/genomix/genomix-hadoop/data/webmap/text.txt b/genomix/genomix-hadoop/data/webmap/text.txt
index f63a141..c6cd7fe 100755
--- a/genomix/genomix-hadoop/data/webmap/text.txt
+++ b/genomix/genomix-hadoop/data/webmap/text.txt
@@ -1,4 +1,4 @@
@625E1AAXX100810:1:100:10000:10271/1
-AATAGAAG
+AATAGAAGATCGAT
+
EDBDB?BEEEDGGEGGGDGGGA>DG@GGD;GD@DG@F?<B<BFFD?
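
[editor's note] The test read gains six bases and the FASTQ '+' separator line; with the k-mer size raised to 12 in GraphBuildingTest below, the old 8-base read would yield no 12-mers at all. For reference, a well-formed four-line FASTQ record looks like this (the checked-in quality line keeps its original 46 characters, so base and quality counts no longer match one to one):

    @625E1AAXX100810:1:100:10000:10271/1    <- sequence id
    AATAGAAGATCGAT                          <- bases (14, so 12-mers exist)
    +                                       <- separator, optionally repeating the id
    EDBDB?BEEEDGGE                          <- one quality character per base
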
diff --git a/genomix/genomix-hadoop/expected/result2 b/genomix/genomix-hadoop/expected/result2
index 2c44be3..3665e18 100755
--- a/genomix/genomix-hadoop/expected/result2
+++ b/genomix/genomix-hadoop/expected/result2
@@ -1,4 +1,3 @@
-10 03 18 1
-31 00 1 1
-41 00 -128 1
-c4 00 17 1
\ No newline at end of file
+39 41 0c 1 1
+e4 04 31 24 1
+93 13 c4 16 1
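
[editor's note] Each expected line appears to be one record dumped per field: the packed k-mer bytes, then the adjacency value, then the count. The k-mer part grows from two bytes to three, which matches the test's k-mer size changing from 5 to 12 (see GraphBuildingTest below): at 2 bits per base, 12 bases need 24 bits = 3 bytes, while 5 bases fit in 2. As a one-line check, assuming the 2-bit packing the mapper's comments describe:

    int bytesPerKmer = (kmerSize * 2 + 7) / 8;  // kmerSize = 12 -> 3; kmerSize = 5 -> 2
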
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
index 74ef455..b66ee15 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
@@ -30,12 +30,12 @@
* This class implements the combiner operator of the MapReduce model
*/
public class GenomixCombiner extends MapReduceBase implements
- Reducer<BytesWritable, ValueWritable, BytesWritable, ValueWritable> {
+ Reducer<ValueBytesWritable, ValueWritable, ValueBytesWritable, ValueWritable> {
public ValueWritable vaWriter = new ValueWritable();
@Override
- public void reduce(BytesWritable key, Iterator<ValueWritable> values,
- OutputCollector<BytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
+ public void reduce(ValueBytesWritable key, Iterator<ValueWritable> values,
+ OutputCollector<ValueBytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
byte groupByAdjList = 0;
int count = 0;
byte bytCount = 0;
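
[editor's note] ValueBytesWritable is used as the key type throughout this commit, but its source is not part of the diff. Below is a minimal sketch of what the call sites imply — a set(byte[], byte, byte) mutator (see GenomixMapper below) and use as a map output key, hence WritableComparable. This is a hypothetical reconstruction, not the actual class:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    // Hypothetical reconstruction; fields and method bodies are guesses.
    // A production key type would also override hashCode() and equals()
    // so the default HashPartitioner groups records correctly.
    public class ValueBytesWritable implements WritableComparable<ValueBytesWritable> {
        private byte[] bytes = new byte[0];
        private byte size = 0;

        // Matches the mapper's call: outputKmer.set(kmerValue, (byte) 0, (byte) size)
        public void set(byte[] value, byte offset, byte length) {
            if (bytes.length < length)
                bytes = new byte[length];
            System.arraycopy(value, offset, bytes, 0, length);
            size = length;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeByte(size);
            out.write(bytes, 0, size);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            size = in.readByte();
            if (bytes.length < size)
                bytes = new byte[size];
            in.readFully(bytes, 0, size);
        }

        @Override
        public int compareTo(ValueBytesWritable other) {
            int n = Math.min(size, other.size);
            for (int i = 0; i < n; i++) {
                int diff = (bytes[i] & 0xff) - (other.bytes[i] & 0xff);
                if (diff != 0)
                    return diff;
            }
            return size - other.size;
        }
    }
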
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
index ad62ee9..71a675e 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
@@ -21,12 +21,15 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.kohsuke.args4j.CmdLineParser;
@@ -66,15 +69,20 @@
conf.setReducerClass(GenomixReducer.class);
conf.setCombinerClass(GenomixCombiner.class);
- conf.setMapOutputKeyClass(BytesWritable.class);
+ conf.setMapOutputKeyClass(ValueBytesWritable.class);
conf.setMapOutputValueClass(ValueWritable.class);
conf.setInputFormat(TextInputFormat.class);
- conf.setOutputFormat(TextOutputFormat.class);
- conf.setOutputKeyClass(BytesWritable.class);
+// conf.setOutputFormat(TextOutputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setOutputKeyClass(ValueBytesWritable.class);
conf.setOutputValueClass(ValueWritable.class);
FileInputFormat.setInputPaths(conf, new Path(inputPath));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+// SequenceFileOutputFormat.setOutputPath(conf,new Path(outputPath));
+// SequenceFileOutputFormat.setCompressOutput(conf, true);
+// SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
+// SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
conf.setNumReduceTasks(numReducers);
FileSystem dfs = FileSystem.get(conf);
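
[editor's note] Of the commented-out block, setOutputPath duplicates the FileOutputFormat.setOutputPath call above it (SequenceFileOutputFormat inherits it); the other three lines stage gzip block compression without enabling it, which is why the GzipCodec and CompressionType imports are added but unused. If compression were switched on, the standard old-API calls would look like this sketch:

    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;

    public final class CompressedOutputSketch {
        // Sketch: block-compressed gzip SequenceFile output for this job.
        static void enableCompression(JobConf conf) {
            conf.setOutputFormat(SequenceFileOutputFormat.class);
            SequenceFileOutputFormat.setCompressOutput(conf, true);
            SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
            SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
        }
    }
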
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
index d8d0ff8..7162c91 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
@@ -33,7 +33,8 @@
/**
* This class implements the mapper operator of the MapReduce model
*/
-public class GenomixMapper extends MapReduceBase implements Mapper<LongWritable, Text, BytesWritable, ValueWritable> {
+public class GenomixMapper extends MapReduceBase implements
+ Mapper<LongWritable, Text, ValueBytesWritable, ValueWritable> {
public class CurrenByte {
public byte curByte;
@@ -42,7 +43,7 @@
public static int KMER_SIZE;
public ValueWritable outputAdjList = new ValueWritable();
- public BytesWritable outputKmer = new BytesWritable();
+ public ValueBytesWritable outputKmer = new ValueBytesWritable();
@Override
public void configure(JobConf job) {
@@ -91,7 +92,7 @@
C 01000000 64
T 10000000 128*/
@Override
- public void map(LongWritable key, Text value, OutputCollector<BytesWritable, ValueWritable> output,
+ public void map(LongWritable key, Text value, OutputCollector<ValueBytesWritable, ValueWritable> output,
Reporter reporter) throws IOException {
/* A 00
G 01
@@ -119,7 +120,7 @@
byte kmerAdjList = 0;
byte initial;
if (i >= KMER_SIZE) {
- outputKmer.set(kmerValue, 0, size);
+ outputKmer.set(kmerValue, (byte) 0, (byte) size);
switch ((int) preMarker) {
case -1:
kmerAdjList = (byte) (kmerAdjList + 0);
@@ -253,7 +254,7 @@
break;
}
outputAdjList.set(kmerAdjList, count);
- outputKmer.set(kmerValue, 0, size);
+ outputKmer.set(kmerValue, (byte) 0, (byte) size);
output.collect(outputKmer, outputAdjList);
}
}
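
[editor's note] The key is now set with byte offsets and lengths to match ValueBytesWritable's signature; the casts are harmless here, since a packed k-mer of a dozen bases occupies far fewer than 127 bytes. For context, a hypothetical sketch (not from this commit) of the 2-bit packing the mapper's comments describe, with the in-byte bit order assumed:

    // Hypothetical helper: pack a DNA string at 2 bits per base into a
    // byte array suitable for ValueBytesWritable.set().
    static byte[] packKmer(String kmer) {
        int size = (kmer.length() * 2 + 7) / 8;      // 12 bases -> 3 bytes
        byte[] packed = new byte[size];
        for (int i = 0; i < kmer.length(); i++) {
            int code;
            switch (kmer.charAt(i)) {
                case 'A': code = 0; break;           // 00
                case 'G': code = 1; break;           // 01
                case 'C': code = 2; break;           // 10
                default:  code = 3; break;           // 11 ('T')
            }
            packed[i / 4] |= code << ((i % 4) * 2);  // assumed low-to-high bit order
        }
        return packed;
    }
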
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
index ebf0df0..a9584f6 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
@@ -30,12 +30,12 @@
* This class implements the reducer operator of the MapReduce model
*/
public class GenomixReducer extends MapReduceBase implements
- Reducer<BytesWritable, ValueWritable, BytesWritable, ValueWritable> {
+ Reducer<ValueBytesWritable, ValueWritable, ValueBytesWritable, ValueWritable> {
ValueWritable valWriter = new ValueWritable();
@Override
- public void reduce(BytesWritable key, Iterator<ValueWritable> values,
- OutputCollector<BytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
+ public void reduce(ValueBytesWritable key, Iterator<ValueWritable> values,
+ OutputCollector<ValueBytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
byte groupByAdjList = 0;
int count = 0;
byte bytCount = 0;
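
[editor's note] The hunk shows only the retyping and the accumulator declarations; the field names suggest the usual merge of OR-ing adjacency bytes and summing occurrence counts. A sketch of the full reduce body under that assumption, with hypothetical ValueWritable accessor names:

    // Hypothetical reduce body; getAdjBitMap()/getCount() and the
    // two-argument set() are assumed ValueWritable methods.
    public void reduce(ValueBytesWritable key, Iterator<ValueWritable> values,
            OutputCollector<ValueBytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
        byte groupByAdjList = 0;
        int count = 0;
        while (values.hasNext()) {
            ValueWritable v = values.next();
            groupByAdjList |= v.getAdjBitMap();  // union of adjacency bits
            count += v.getCount();               // total occurrences of this k-mer
        }
        valWriter.set(groupByAdjList, (byte) count);
        output.collect(key, valWriter);
    }
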
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
index 49126d6..754f99d 100755
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
@@ -14,18 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.FileWriter;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;
import edu.uci.ics.utils.TestUtils;
@@ -43,6 +53,7 @@
private static final String RESULT_PATH = "/result2";
private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + RESULT_PATH + "/part-00000";
private static final String EXPECTED_PATH = "expected/result2";
+ private static final String TEST_SOURCE_DIR = "testactual/source.txt";
private MiniDFSCluster dfsCluster;
private MiniMRCluster mrCluster;
@@ -56,9 +67,23 @@
// run graph transformation tests
GenomixDriver tldriver = new GenomixDriver();
- tldriver.run(HDFS_PATH, RESULT_PATH, 2, 5, HADOOP_CONF_PATH);
+ tldriver.run(HDFS_PATH, RESULT_PATH, 2, 12, HADOOP_CONF_PATH);
+
+ SequenceFile.Reader reader = null;
+ Path path = new Path(RESULT_PATH + "/part-00000");
+ reader = new SequenceFile.Reader(dfs, path, conf);
+ ValueBytesWritable key = (ValueBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ ValueWritable value = (ValueWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ File filePathTo = new File(TEST_SOURCE_DIR);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+ while (reader.next(key, value)) {
+ bw.write(key + "\t" + value.toString());
+ bw.newLine();
+ }
+ bw.close();
+
dumpResult();
- TestUtils.compareWithResult(new File(DUMPED_RESULT), new File(EXPECTED_PATH));
+ TestUtils.compareWithResult(new File(TEST_SOURCE_DIR), new File(EXPECTED_PATH));
cleanupHadoop();
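
[editor's note] One nit in the added read-back code: the SequenceFile.Reader is never closed, and the BufferedWriter leaks on any exception. The same loop with resources released, reusing the names from the diff:

    SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
    try {
        ValueBytesWritable key = (ValueBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        ValueWritable value = (ValueWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
        BufferedWriter bw = new BufferedWriter(new FileWriter(new File(TEST_SOURCE_DIR)));
        try {
            while (reader.next(key, value)) {
                bw.write(key + "\t" + value.toString());
                bw.newLine();
            }
        } finally {
            bw.close();
        }
    } finally {
        reader.close();
    }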