fix SequenceFile.Reader problem
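
Records written through the ValueBytes/appendRaw() path could not be read
back by SequenceFile.Reader on Hadoop 0.20.2. KmerCountValue now implements
only Writable, and KMerSequenceWriterFactory packs the k-mer bytes into a
reusable BytesWritable and calls Writer.append() instead. JobRunTest reads
each non-empty part-i output file with SequenceFile.Reader and converts the
binary k-mers to text before comparing with the expected result. Also add
NC-node logging in JobGenBrujinGraph, set numPartitionPerMachine to 1 in the
test, and re-enable TestPreClusterGroupby.

A minimal sketch of the read-back pattern the test now relies on (hypothetical
helper, not part of this patch; imports from org.apache.hadoop.* and
edu.uci.ics.genomix.type omitted; k is the k-mer length from the job conf):

    static void dumpKmers(Configuration conf, Path part, int k) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf);
        // key/value classes come from the file header and are filled in place by next()
        BytesWritable key = (BytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
        while (reader.next(key, value)) {
            // decode the packed k-mer back into its readable string form
            System.out.println(Kmer.recoverKmerFrom(k, key.getBytes(), 0, key.getLength()) + "\t" + value);
        }
        reader.close();
    }
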
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3040 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
index 08c283d..7ace9b0 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
@@ -2,14 +2,12 @@
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
-import org.apache.hadoop.io.SequenceFile.ValueBytes;
import org.apache.hadoop.io.Writable;
-public class KmerCountValue implements ValueBytes, Writable{
+public class KmerCountValue implements Writable{
private byte adjBitMap;
private byte count;
@@ -23,25 +21,6 @@
}
@Override
- public int getSize() {
- return 2;
- }
-
- @Override
- public void writeCompressedBytes(DataOutputStream arg0)
- throws IllegalArgumentException, IOException {
- arg0.writeByte(adjBitMap);
- arg0.writeByte(count);
- }
-
- @Override
- public void writeUncompressedBytes(DataOutputStream arg0)
- throws IOException {
- arg0.writeByte(adjBitMap);
- arg0.writeByte(count);
- }
-
- @Override
public void readFields(DataInput arg0) throws IOException {
adjBitMap = arg0.readByte();
count = arg0.readByte();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 66ce324..02be7d8 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -10,7 +10,6 @@
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapred.JobConf;
-import edu.uci.ics.genomix.dataflow.util.NonSyncWriter;
import edu.uci.ics.genomix.type.KmerCountValue;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -35,9 +34,9 @@
ConfFactory cf;
Writer writer = null;
-// NonSyncWriter writer = null;
KmerCountValue reEnterCount = new KmerCountValue();
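+ // reusable key buffer: wraps the raw k-mer bytes so Writer.append() can be used instead of appendRaw()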
+ BytesWritable reEnterKey = new BytesWritable();
/**
* assumption is that output never change source!
*/
@@ -46,7 +45,6 @@
throws HyracksDataException {
try {
if (writer == null) {
-// writer = new NonSyncWriter((FSDataOutputStream) output);
writer = SequenceFile.createWriter(cf.getConf(),
(FSDataOutputStream) output, BytesWritable.class,
KmerCountValue.class, CompressionType.NONE, null);
@@ -58,7 +56,10 @@
byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
reEnterCount.reset(bitmap, count);
- writer.appendRaw(kmer, keyStart, keyLength, reEnterCount);
+ reEnterKey.set(kmer, keyStart, keyLength);
+ writer.append(reEnterKey, reEnterCount);
+ // @mark: this method cannot be used for reading back in Hadoop 0.20.2.
+ //writer.appendRaw(kmer, keyStart, keyLength, reEnterCount);
} catch (IOException e) {
throw new HyracksDataException(e);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index 95562a6..fd8af39 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -79,6 +79,14 @@
private int recordSizeInBytes;
private int hashfuncStartLevel;
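+ // Debug helper: log the NC node names assigned so far, prefixed with the given stage name.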
+ private void logDebug(String status){
+ String names = "";
+ for (String str : ncNodeNames){
+ names += str + " ";
+ }
+ LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
+ }
+
public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
final Map<String, NodeControllerInfo> ncMap,
int numPartitionPerMachine) {
@@ -91,7 +99,7 @@
System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
nodes.length);
}
- LOG.info("nc nodes:" + ncNodeNames.length + " " + ncNodeNames.toString());
+ logDebug("initialize");
}
private ExternalGroupOperatorDescriptor newExternalGroupby(
@@ -192,6 +200,11 @@
LOG.info("HDFS read into " + splits.length + " splits");
String[] readSchedule = scheduler.getLocationConstraints(splits);
+ String log = "";
+ for (String schedule: readSchedule){
+ log += schedule + " ";
+ }
+ LOG.info("HDFS read schedule " + log);
return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job,
splits, readSchedule, new ReadsKeyValueParserFactory(kmers));
} catch (Exception e) {
@@ -213,10 +226,12 @@
// File input
HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+ logDebug("Read Operator");
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
readOperator, ncNodeNames);
generateDescriptorbyType(jobSpec);
+ logDebug("SingleGroupby Operator");
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
singleGrouper, ncNodeNames);
@@ -224,6 +239,7 @@
jobSpec);
jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
+ logDebug("CrossGrouper Operator");
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
crossGrouper, ncNodeNames);
jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
@@ -242,6 +258,7 @@
HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
jobSpec, job, writer);
+ logDebug("WriteOperator");
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
writeOperator, ncNodeNames);
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index 3a37087..8464684 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -17,7 +17,6 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
@@ -55,7 +54,7 @@
private JobConf conf = new JobConf();
private int numberOfNC = 2;
- private int numPartitionPerMachine = 2;
+ private int numPartitionPerMachine = 1;
private Driver driver;
@@ -122,13 +121,13 @@
@Test
public void TestExternalGroupby() throws Exception {
conf.set(GenomixJob.GROUPBY_TYPE, "external");
- conf.set(GenomixJob.OUTPUT_FORMAT, "text");
+ conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
System.err.println("Testing ExternalGroupBy");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults());
}
- //@Test
+ @Test
public void TestPreClusterGroupby() throws Exception {
conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
conf.set(GenomixJob.OUTPUT_FORMAT, "text");
@@ -147,29 +146,60 @@
}
private boolean checkResults() throws Exception {
- FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
- FileSystem.getLocal(new Configuration()), new Path(
- DUMPED_RESULT), false, conf, null);
- File dumped = new File( DUMPED_RESULT);
- String format = conf.get(GenomixJob.OUTPUT_FORMAT);
- if( !"text".equalsIgnoreCase(format)){
- SequenceFile.Reader reader = null;
- Path path = new Path(HDFS_OUTPUT_FILE);
- FileSystem dfs = FileSystem.get(conf);
- reader = new SequenceFile.Reader(dfs, path, conf);
- BytesWritable key = (BytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- File filePathTo = new File(CONVERT_RESULT);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- int k = conf.getInt(GenomixJob.KMER_LENGTH, 25);
- while (reader.next(key, value)) {
- bw.write(Kmer.recoverKmerFrom(k, key.getBytes(), 0, key.getLength()) + "\t" + value.toString());
- bw.newLine();
- }
- bw.close();
- dumped = new File(CONVERT_RESULT);
+ File dumped = null;
+ String format = conf.get(GenomixJob.OUTPUT_FORMAT);
+ if ("text".equalsIgnoreCase(format)) {
+ FileUtil.copyMerge(FileSystem.get(conf),
+ new Path(HDFS_OUTPUT_PATH), FileSystem
+ .getLocal(new Configuration()), new Path(
+ DUMPED_RESULT), false, conf, null);
+ dumped = new File(DUMPED_RESULT);
+ } else {
+
+ FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR
+ + HDFS_OUTPUT_PATH));
+ File filePathTo = new File(CONVERT_RESULT);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+ for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
+ String partname = "/part-" + i;
+// FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
+// + partname), FileSystem.getLocal(new Configuration()),
+// new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
+
+
+ Path path = new Path(HDFS_OUTPUT_PATH
+ + partname);
+ FileSystem dfs = FileSystem.get(conf);
+ if (dfs.getFileStatus(path).getLen() == 0){
+ continue;
+ }
+ SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path,
+ conf);
+ BytesWritable key = (BytesWritable) ReflectionUtils
+ .newInstance(reader.getKeyClass(), conf);
+ KmerCountValue value = (KmerCountValue) ReflectionUtils
+ .newInstance(reader.getValueClass(), conf);
+
+ int k = conf.getInt(GenomixJob.KMER_LENGTH, 25);
+ while (reader.next(key, value)) {
+ if (key == null || value == null){
+ break;
+ }
+ bw.write(Kmer.recoverKmerFrom(k, key.getBytes(), 0,
+ key.getLength())
+ + "\t" + value.toString());
+ System.out.println(Kmer.recoverKmerFrom(k, key.getBytes(), 0,
+ key.getLength())
+ + "\t" + value.toString());
+ bw.newLine();
+ }
+ reader.close();
+
+ }
+ bw.close();
+ dumped = new File(CONVERT_RESULT);
}
-
+
TestUtils.compareWithSortedResult(new File(EXPECTED_PATH), dumped);
return true;
}