fix the issue of using the same output RecordDescriptor for the read and group-by stages
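
JobGenBrujinGraph previously built one three-field outputRec and used it
both as the HDFS reader's output descriptor and as the group-by output
descriptor, although the reader only emits <kmer bytes, adjacency byte>
tuples. The job now uses a two-field readOutputRec for the reader and a
three-field combineOutputRec (kmer bytes, adjacency byte, count byte)
for the aggregators, and the writer is connected back to crossGrouper
instead of directly to the reader.

Minimal sketch of the descriptor split (it simply mirrors the new code
in JobGenBrujinGraph.generateJob and assumes the RecordDescriptor and
ByteSerializerDeserializer classes that file already imports):

    // reader output: <compressed kmer bytes (raw field), adjacency bitmap>
    readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
            null, ByteSerializerDeserializer.INSTANCE });

    // group-by output: <compressed kmer bytes, adjacency bitmap, count>
    combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
            null, ByteSerializerDeserializer.INSTANCE,
            ByteSerializerDeserializer.INSTANCE });

The kmer field keeps a null serializer because the writers in this
patch read it as raw bytes via getFieldData/getFieldStart/getFieldLength
rather than through a per-field serde.
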
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2958 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
index f3efd45..dae1b6f 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -5,9 +5,8 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
/**
- * Aggregation sort: speed up
- * from hyracks
- *
+ * Aggregation sort: speed up from hyracks
+ *
*/
public class Integer64NormalizedKeyComputerFactory implements
INormalizedKeyComputerFactory {
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
index 54fb62b..d099fc0 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
@@ -29,7 +29,8 @@
int slotLength = accessor.getFieldSlotsLength();
ByteBuffer buf = accessor.getBuffer();
- long l = BufferSerDeUtils.getLong(buf.array(), startOffset + fieldOffset + slotLength);
+ long l = BufferSerDeUtils.getLong(buf.array(), startOffset
+ + fieldOffset + slotLength);
return (int) (l % nParts);
}
};
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
index ffa6fc6..c49d6ff 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
@@ -78,11 +78,11 @@
public int compareTo(byte[] bytes, int start, int length) {
int be = this.start;
-
- if(this.bytes[be] != bytes[start]){
+
+ if (this.bytes[be] != bytes[start]) {
return (this.bytes[be] < bytes[start]) ? -1 : 1;
}
-
+
int n = this.bytes[be];
int l = (int) Math.ceil(n / 4);
for (int i = l; i > 0; i--) {
@@ -99,19 +99,18 @@
public int hash() {// BKDRHash
int hash = 1;
for (int i = start + 1; i <= start + length; i++)
- hash = (31 * hash) + (int)bytes[i];
- if(hash < 0){
+ hash = (31 * hash) + (int) bytes[i];
+ if (hash < 0) {
hash = -hash;
}
- //System.err.println(hash);
+ // System.err.println(hash);
return hash;
-/* int seed = 131; // 31 131 1313 13131 131313 etc..
- int hash = 0;
- int l = (int) Math.ceil((double) bytes[start] / 4.0);
- for (int i = start + 1; i <= start + l; i++) {
- hash = hash * seed + bytes[i];
- }
- return (hash & 0x7FFFFFFF);*/
+ /*
+ * int seed = 131; // 31 131 1313 13131 131313 etc.. int hash = 0; int l
+ * = (int) Math.ceil((double) bytes[start] / 4.0); for (int i = start +
+ * 1; i <= start + l; i++) { hash = hash * seed + bytes[i]; } return
+ * (hash & 0x7FFFFFFF);
+ */
}
@Override
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
index ed114ff..d3de2ba 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -9,7 +9,7 @@
/**
* used by precluster groupby
- *
+ *
*/
public class ConnectorPolicyAssignmentPolicy implements
IConnectorPolicyAssignmentPolicy {
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index 5576fd4..6951eef 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -72,8 +72,8 @@
private ArrayTupleBuilder tupleBuilder;
private ByteBuffer outputBuffer;
private FrameTupleAppender outputAppender;
-
- private byte []filter = new byte[4];
+
+ private byte[] filter = new byte[4];
@SuppressWarnings("resource")
@Override
@@ -83,9 +83,9 @@
outputBuffer = ctx.allocateFrame();
outputAppender = new FrameTupleAppender(ctx.getFrameSize());
outputAppender.reset(outputBuffer, true);
-
+
Kmer.initializeFilter(k, filter);
-
+
try {// one try with multiple catch?
writer.open();
String s = pathSurfix + String.valueOf(temp);
@@ -99,19 +99,19 @@
new InputStreamReader(
new FileInputStream(fa[i])));
String read = readsfile.readLine();
- //int count = 0;
+ // int count = 0;
while (read != null) {
read = readsfile.readLine();
- //if(count % 4 == 1)
+ // if(count % 4 == 1)
SplitReads(read.getBytes());
// read.getBytes();
read = readsfile.readLine();
-
+
read = readsfile.readLine();
read = readsfile.readLine();
- //count += 1;
- //System.err.println(count);
+ // count += 1;
+ // System.err.println(count);
}
}
if (outputAppender.getTupleCount() > 0) {
@@ -127,7 +127,6 @@
}
}
-
private void SplitReads(byte[] array) {
try {
byte[] bytes = null;
@@ -139,7 +138,7 @@
if (0 == i) {
bytes = Kmer.CompressKmer(k, array, i);
} else {
- Kmer.MoveKmer(k, bytes, array[i+k-1], filter);
+ Kmer.MoveKmer(k, bytes, array[i + k - 1], filter);
/*
* l <<= 2; l &= window; l |= ConvertSymbol(array[i
* + k - 1]);
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 6928f18..1370494 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -1,18 +1,16 @@
package edu.uci.ics.genomix.dataflow;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.ValueBytes;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapred.JobConf;
+import edu.uci.ics.genomix.type.KmerCountValue;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
@@ -23,64 +21,42 @@
private static final long serialVersionUID = 1L;
private ConfFactory confFactory;
- public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException{
- this.confFactory = new ConfFactory(conf);
+
+ public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+ this.confFactory = new ConfFactory(conf);
}
-
- public class KMerCountValue implements ValueBytes{
- private ITupleReference tuple;
- public KMerCountValue(ITupleReference tuple) {
- this.tuple = tuple;
- }
- @Override
- public int getSize() {
- return tuple.getFieldLength(1) + tuple.getFieldLength(2);
- }
-
- @Override
- public void writeCompressedBytes(DataOutputStream arg0)
- throws IllegalArgumentException, IOException {
- for(int i=1; i<3; i++){
- arg0.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
- }
- }
-
- @Override
- public void writeUncompressedBytes(DataOutputStream arg0)
- throws IOException {
- for(int i=1; i<3; i++){
- arg0.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
- }
- }
-
- }
-
- public class TupleWriter implements ITupleWriter{
- public TupleWriter(ConfFactory cf){
+ public class TupleWriter implements ITupleWriter {
+ public TupleWriter(ConfFactory cf) {
this.cf = cf;
}
+
ConfFactory cf;
Writer writer = null;
+
/**
- * assumption is that output never change source!
+ * assumption is that output never change source!
*/
@Override
- public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ public void write(DataOutput output, ITupleReference tuple)
+ throws HyracksDataException {
try {
- if (writer == null){
- writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, BytesWritable.class, BytesWritable.class, CompressionType.NONE, null);
+ if (writer == null) {
+ writer = SequenceFile.createWriter(cf.getConf(),
+ (FSDataOutputStream) output, BytesWritable.class,
+ BytesWritable.class, CompressionType.NONE, null);
}
byte[] kmer = tuple.getFieldData(0);
int keyStart = tuple.getFieldStart(0);
int keyLength = tuple.getFieldLength(0);
- writer.appendRaw(kmer, keyStart, keyLength, new KMerCountValue(tuple));
+ writer.appendRaw(kmer, keyStart, keyLength, new KmerCountValue(
+ tuple));
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
}
-
+
@Override
public ITupleWriter getTupleWriter() {
return new TupleWriter(confFactory);
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
index ebec17e..9f19858 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -3,7 +3,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import edu.uci.ics.genomix.type.Kmer;
@@ -19,23 +18,29 @@
*/
private static final long serialVersionUID = 1L;
private final int KMER;
- public KMerTextWriterFactory(int kmer){
+
+ public KMerTextWriterFactory(int kmer) {
KMER = kmer;
}
- public class TupleWriter implements ITupleWriter{
+ public class TupleWriter implements ITupleWriter {
@Override
- public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ public void write(DataOutput output, ITupleReference tuple)
+ throws HyracksDataException {
try {
- Text.writeString(output, Kmer.recoverKmerFrom(KMER,tuple.getFieldData(0),tuple.getFieldStart(0),tuple.getFieldLength(0)));
- Text.writeString(output,"\t");
- Text.writeString(output, Kmer.recoverAdjacent(tuple.getFieldData(1)[tuple.getFieldStart(1)]));
+ Text.writeString(output, Kmer.recoverKmerFrom(KMER,
+ tuple.getFieldData(0), tuple.getFieldStart(0),
+ tuple.getFieldLength(0)));
+ Text.writeString(output, "\t");
+ Text.writeString(output, Kmer.recoverAdjacent(tuple
+ .getFieldData(1)[tuple.getFieldStart(1)]));
Text.writeString(output, "\n");
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
}
+
@Override
public ITupleWriter getTupleWriter() {
return new TupleWriter();
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
index b77720a..13ee200 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
@@ -10,7 +10,6 @@
public class KMerWriterFactory implements ITupleWriterFactory {
private static final long serialVersionUID = 1L;
-
@Override
public ITupleWriter getTupleWriter() {
return new ITupleWriter() {
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
index 6d0ef74..0dfc69f 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -29,19 +29,7 @@
public ReadsKeyValueParserFactory(int k) {
this.k = k;
byteNum = (byte) Math.ceil((double) k / 4.0);
- filter[0] = (byte) 0xC0;
- filter[1] = (byte) 0xFC;
- filter[2] = 0;
-
- int r = byteNum * 8 - 2 * k;
- r = 8 - r;
- for (int i = 0; i < r; i++) {
- filter[2] <<= 1;
- filter[2] |= 1;
- }
- for(int i = 0; i < r-1 ; i++){
- filter[3] <<= 1;
- }
+ Kmer.initializeFilter(k, filter);
}
@Override
@@ -83,13 +71,9 @@
for (int i = 0; i < array.length - k + 1; i++) {
if (0 == i) {
- bytes = Kmer.CompressKmer(k,array, i);
+ bytes = Kmer.CompressKmer(k, array, i);
} else {
- Kmer.MoveKmer(k,bytes, array[i + k - 1], filter);
- /*
- * l <<= 2; l &= window; l |= ConvertSymbol(array[i
- * + k - 1]);
- */
+ Kmer.MoveKmer(k, bytes, array[i + k - 1], filter);
pre = Kmer.GENE_CODE.getAdjBit(array[i - 1]);
}
if (i + k != array.length) {
@@ -102,17 +86,10 @@
r |= next;
tupleBuilder.reset();
-
- // tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE,
- // l);
- tupleBuilder.addField(bytes, 0, byteNum + 1);
+ tupleBuilder.addField(bytes, 0, byteNum + 1);
tupleBuilder.addField(
ByteSerializerDeserializer.INSTANCE, r);
- // int[] a = tupleBuilder.getFieldEndOffsets();
- // int b = tupleBuilder.getSize();
- // byte[] c = tupleBuilder.getByteArray();
-
if (!outputAppender.append(
tupleBuilder.getFieldEndOffsets(),
tupleBuilder.getByteArray(), 0,
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index d6498e6..f979070 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -46,313 +46,365 @@
public class Tester {
- private static final Logger LOGGER = Logger.getLogger(Tester.class.getName());
- public static final String NC1_ID = "nc1";
- public static final String NC2_ID = "nc2";
- public static final String NC3_ID = "nc3";
- public static final String NC4_ID = "nc4";
+ private static final Logger LOGGER = Logger.getLogger(Tester.class
+ .getName());
+ public static final String NC1_ID = "nc1";
+ public static final String NC2_ID = "nc2";
+ public static final String NC3_ID = "nc3";
+ public static final String NC4_ID = "nc4";
- private static ClusterControllerService cc;
- private static NodeControllerService nc1;
- private static NodeControllerService nc2;
- private static NodeControllerService nc3;
- private static NodeControllerService nc4;
- private static IHyracksClientConnection hcc;
+ private static ClusterControllerService cc;
+ private static NodeControllerService nc1;
+ private static NodeControllerService nc2;
+ private static NodeControllerService nc3;
+ private static NodeControllerService nc4;
+ private static IHyracksClientConnection hcc;
- //private static final boolean DEBUG = true;
+ // private static final boolean DEBUG = true;
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) throws Exception {
- LOGGER.setLevel(Level.OFF);
+ LOGGER.setLevel(Level.OFF);
- init();
+ init();
- // Options options = new Options();
+ // Options options = new Options();
- IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);
+ IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);
- /*
- * JobSpecification job =
- * createJob(parseFileSplits(options.inFileCustomerSplits),
- * parseFileSplits(options.inFileOrderSplits),
- * parseFileSplits(options.outFileSplits), options.numJoinPartitions,
- * options.algo, options.graceInputSize, options.graceRecordsPerFrame,
- * options.graceFactor, options.memSize, options.tableSize,
- * options.hasGroupBy);
- */
+ /*
+ * JobSpecification job =
+ * createJob(parseFileSplits(options.inFileCustomerSplits),
+ * parseFileSplits(options.inFileOrderSplits),
+ * parseFileSplits(options.outFileSplits), options.numJoinPartitions,
+ * options.algo, options.graceInputSize, options.graceRecordsPerFrame,
+ * options.graceFactor, options.memSize, options.tableSize,
+ * options.hasGroupBy);
+ */
- int k, page_num;
- String file_name = args[0];
- k = Integer.parseInt(args[1]);
- page_num = Integer.parseInt(args[2]);
- int type = Integer.parseInt(args[3]);
+ int k, page_num;
+ String file_name = args[0];
+ k = Integer.parseInt(args[1]);
+ page_num = Integer.parseInt(args[2]);
+ int type = Integer.parseInt(args[3]);
- JobSpecification job = createJob(file_name, k, page_num, type);
+ JobSpecification job = createJob(file_name, k, page_num, type);
- long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob("test", job);
- hcc.waitForCompletion(jobId);
- long end = System.currentTimeMillis();
- System.err.println(start + " " + end + " " + (end - start));
-
- /*
+ long start = System.currentTimeMillis();
+ JobId jobId = hcc.startJob("test", job);
+ hcc.waitForCompletion(jobId);
+ long end = System.currentTimeMillis();
+ System.err.println(start + " " + end + " " + (end - start));
- String s = "g:\\data\\results.txt" ;
+ /*
+ *
+ * String s = "g:\\data\\results.txt" ;
+ *
+ * filenames = new FileOutputStream(s); // filenames = new
+ * FileInputStream("filename.txt");
+ *
+ * BufferedWriter writer = new BufferedWriter(new
+ * OutputStreamWriter(filenames)); writer.write((int) (end-start));
+ * writer.close();
+ */
- filenames = new FileOutputStream(s);
- // filenames = new FileInputStream("filename.txt");
+ }
- BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(filenames));
- writer.write((int) (end-start));
- writer.close();*/
-
- }
+ public static void init() throws Exception {
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clientNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetPort = 39000;
+ ccConfig.clusterNetIpAddress = "127.0.0.1";
+ ccConfig.clusterNetPort = 39001;
+ ccConfig.profileDumpPeriod = -1;
+ File outDir = new File("target/ClusterController");
+ outDir.mkdirs();
+ File ccRoot = File.createTempFile(Tester.class.getName(), ".data",
+ outDir);
+ ccRoot.delete();
+ ccRoot.mkdir();
+ ccConfig.ccRoot = ccRoot.getAbsolutePath();
+ cc = new ClusterControllerService(ccConfig);
+ cc.start();
+ ccConfig.defaultMaxJobAttempts = 0;
- public static void init() throws Exception {
- CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = "127.0.0.1";
- ccConfig.clientNetPort = 39000;
- ccConfig.clusterNetIpAddress = "127.0.0.1";
- ccConfig.clusterNetPort = 39001;
- ccConfig.profileDumpPeriod = -1;
- File outDir = new File("target/ClusterController");
- outDir.mkdirs();
- File ccRoot = File.createTempFile(Tester.class.getName(), ".data", outDir);
- ccRoot.delete();
- ccRoot.mkdir();
- ccConfig.ccRoot = ccRoot.getAbsolutePath();
- cc = new ClusterControllerService(ccConfig);
- cc.start();
- ccConfig.defaultMaxJobAttempts = 0;
+ NCConfig ncConfig1 = new NCConfig();
+ ncConfig1.ccHost = "localhost";
+ ncConfig1.ccPort = 39001;
+ ncConfig1.clusterNetIPAddress = "127.0.0.1";
+ ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.nodeId = NC1_ID;
+ nc1 = new NodeControllerService(ncConfig1);
+ nc1.start();
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.ccPort = 39001;
- ncConfig1.clusterNetIPAddress = "127.0.0.1";
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.nodeId = NC1_ID;
- nc1 = new NodeControllerService(ncConfig1);
- nc1.start();
+ /*
+ * NCConfig ncConfig2 = new NCConfig(); ncConfig2.ccHost = "localhost";
+ * ncConfig2.ccPort = 39001; ncConfig2.clusterNetIPAddress =
+ * "127.0.0.1"; ncConfig2.dataIPAddress = "127.0.0.1"; ncConfig2.nodeId
+ * = NC2_ID; nc2 = new NodeControllerService(ncConfig2); nc2.start();
+ *
+ * NCConfig ncConfig3 = new NCConfig(); ncConfig3.ccHost = "localhost";
+ * ncConfig3.ccPort = 39001; ncConfig3.clusterNetIPAddress =
+ * "127.0.0.1"; ncConfig3.dataIPAddress = "127.0.0.1"; ncConfig3.nodeId
+ * = NC3_ID; nc3 = new NodeControllerService(ncConfig3); nc3.start();
+ *
+ * NCConfig ncConfig4 = new NCConfig(); ncConfig4.ccHost = "localhost";
+ * ncConfig4.ccPort = 39001; ncConfig4.clusterNetIPAddress =
+ * "127.0.0.1"; ncConfig4.dataIPAddress = "127.0.0.1"; ncConfig4.nodeId
+ * = NC4_ID; nc4 = new NodeControllerService(ncConfig4); nc4.start();
+ */
- /* NCConfig ncConfig2 = new NCConfig();
- ncConfig2.ccHost = "localhost";
- ncConfig2.ccPort = 39001;
- ncConfig2.clusterNetIPAddress = "127.0.0.1";
- ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.nodeId = NC2_ID;
- nc2 = new NodeControllerService(ncConfig2);
- nc2.start();
-
- NCConfig ncConfig3 = new NCConfig();
- ncConfig3.ccHost = "localhost";
- ncConfig3.ccPort = 39001;
- ncConfig3.clusterNetIPAddress = "127.0.0.1";
- ncConfig3.dataIPAddress = "127.0.0.1";
- ncConfig3.nodeId = NC3_ID;
- nc3 = new NodeControllerService(ncConfig3);
- nc3.start();
-
- NCConfig ncConfig4 = new NCConfig();
- ncConfig4.ccHost = "localhost";
- ncConfig4.ccPort = 39001;
- ncConfig4.clusterNetIPAddress = "127.0.0.1";
- ncConfig4.dataIPAddress = "127.0.0.1";
- ncConfig4.nodeId = NC4_ID;
- nc4 = new NodeControllerService(ncConfig4);
- nc4.start();*/
+ hcc = new HyracksConnection(ccConfig.clientNetIpAddress,
+ ccConfig.clientNetPort);
+ hcc.createApplication("test", null);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+ }
+ }
- hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
- hcc.createApplication("test", null);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
- }
- }
+ private static JobSpecification createJob(String filename, int k,
+ int page_num, int type) throws HyracksDataException {
+ JobSpecification spec = new JobSpecification();
- private static JobSpecification createJob(String filename, int k, int page_num, int type) throws HyracksDataException {
- JobSpecification spec = new JobSpecification();
+ // spec.setFrameSize(32768);
+ spec.setFrameSize(32768);
- //spec.setFrameSize(32768);
- spec.setFrameSize(32768);
+ FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
+ // NC1_ID, NC2_ID,NC3_ID,NC4_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
+ NC1_ID);
- FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { null,
+ ByteSerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE });
+ // Integer64SerializerDeserializer.INSTANCE,
+ // ByteSerializerDeserializer.INSTANCE,
+ // ByteSerializerDeserializer.INSTANCE });
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {null, ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE});
- //Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
- //ByteSerializerDeserializer.INSTANCE });
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 4096; // hyracks oriented
+ int tableSize = 10485767; // hyracks oriented
- int[] keyFields = new int[] { 0 };
- int frameLimits = 4096; // hyracks oriented
- int tableSize = 10485767; // hyracks oriented
+ AbstractOperatorDescriptor single_grouper;
+ IConnectorDescriptor conn_partition;
+ AbstractOperatorDescriptor cross_grouper;
- AbstractOperatorDescriptor single_grouper;
- IConnectorDescriptor conn_partition;
- AbstractOperatorDescriptor cross_grouper;
+ if (0 == type) {// external group by
+ single_grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new VLongNormalizedKeyComputerFactory(),
+ new MergeKmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
-
- if(0 == type){//external group by
- single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(VLongPointable.FACTORY) }),
+ tableSize), true);
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(VLongPointable.FACTORY) }), tableSize), true);
-
- conn_partition = new MToNPartitioningConnectorDescriptor(spec,
- new KmerHashPartitioncomputerFactory());
- cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new VLongNormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
+ conn_partition = new MToNPartitioningConnectorDescriptor(spec,
+ new KmerHashPartitioncomputerFactory());
+ cross_grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new VLongNormalizedKeyComputerFactory(),
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(VLongPointable.FACTORY) }), tableSize), true);
- }
- else if( 1 == type){
- single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(VLongPointable.FACTORY) }), tableSize), true);
- conn_partition = new MToNPartitioningMergingConnectorDescriptor(spec, new KmerHashPartitioncomputerFactory(),
- keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY)} );
- cross_grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new DistributedMergeLmerAggregateFactory(),
- outputRec);
- }
- else{
- long inputSizeInRawRecords = 154000000;
- long inputSizeInUniqueKeys = 38500000;
- int recordSizeInBytes = 4;
- int hashfuncStartLevel = 1;
- single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
- frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},
- //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
- hashfuncStartLevel,
- new VLongNormalizedKeyComputerFactory(),
- new MergeKmerAggregateFactory(),
- new DistributedMergeLmerAggregateFactory(),
- outputRec, true);
- conn_partition = new MToNPartitioningConnectorDescriptor(spec,
- new KmerHashPartitioncomputerFactory());
- recordSizeInBytes = 13;
- cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
- frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},
- //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
- hashfuncStartLevel,
- new VLongNormalizedKeyComputerFactory(),
- new DistributedMergeLmerAggregateFactory(),
- new DistributedMergeLmerAggregateFactory(),
- outputRec, true);
- }
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID);
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
-
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(readfileConn, scan, 0, single_grouper, 0);
-
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(VLongPointable.FACTORY) }),
+ tableSize), true);
+ } else if (1 == type) {
+ single_grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new VLongNormalizedKeyComputerFactory(),
+ new MergeKmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(VLongPointable.FACTORY) }),
+ tableSize), true);
+ conn_partition = new MToNPartitioningMergingConnectorDescriptor(
+ spec,
+ new KmerHashPartitioncomputerFactory(),
+ keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) });
+ cross_grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new DistributedMergeLmerAggregateFactory(), outputRec);
+ } else {
+ long inputSizeInRawRecords = 154000000;
+ long inputSizeInUniqueKeys = 38500000;
+ int recordSizeInBytes = 4;
+ int hashfuncStartLevel = 1;
+ single_grouper = new HybridHashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ inputSizeInRawRecords,
+ inputSizeInUniqueKeys,
+ recordSizeInBytes,
+ tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
+ // new IBinaryHashFunctionFamily[]
+ // {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
+ hashfuncStartLevel,
+ new VLongNormalizedKeyComputerFactory(),
+ new MergeKmerAggregateFactory(),
+ new DistributedMergeLmerAggregateFactory(), outputRec, true);
+ conn_partition = new MToNPartitioningConnectorDescriptor(spec,
+ new KmerHashPartitioncomputerFactory());
+ recordSizeInBytes = 13;
+ cross_grouper = new HybridHashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ inputSizeInRawRecords,
+ inputSizeInUniqueKeys,
+ recordSizeInBytes,
+ tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
+ // new IBinaryHashFunctionFamily[]
+ // {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
+ hashfuncStartLevel,
+ new VLongNormalizedKeyComputerFactory(),
+ new DistributedMergeLmerAggregateFactory(),
+ new DistributedMergeLmerAggregateFactory(), outputRec, true);
+ }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
- spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ single_grouper, NC1_ID);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ // single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
- //PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
+ spec);
+ spec.connect(readfileConn, scan, 0, single_grouper, 0);
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(printConn, cross_grouper, 0, printer, 0);
- //spec.connect(readfileConn, scan, 0, printer, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ cross_grouper, NC1_ID);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ // cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
+ spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
- spec.addRoot(printer);
+ // PrinterOperatorDescriptor printer = new
+ // PrinterOperatorDescriptor(spec);
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec,
+ "G:\\data\\result");
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ // printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC1_ID);
- if( 1 == type ){
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- }
- // System.out.println(spec.toString());
- return spec;
- }
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(printConn, cross_grouper, 0, printer, 0);
+ // spec.connect(readfileConn, scan, 0, printer, 0);
+ spec.addRoot(printer);
- static class JoinComparatorFactory implements ITuplePairComparatorFactory {
- private static final long serialVersionUID = 1L;
+ if (1 == type) {
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ }
+ // System.out.println(spec.toString());
+ return spec;
+ }
- private final IBinaryComparatorFactory bFactory;
- private final int pos0;
- private final int pos1;
+ static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+ private static final long serialVersionUID = 1L;
- public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
- this.bFactory = bFactory;
- this.pos0 = pos0;
- this.pos1 = pos1;
- }
+ private final IBinaryComparatorFactory bFactory;
+ private final int pos0;
+ private final int pos1;
- @Override
- public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
- return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
- }
- }
+ public JoinComparatorFactory(IBinaryComparatorFactory bFactory,
+ int pos0, int pos1) {
+ this.bFactory = bFactory;
+ this.pos0 = pos0;
+ this.pos1 = pos1;
+ }
- static class JoinComparator implements ITuplePairComparator {
+ @Override
+ public ITuplePairComparator createTuplePairComparator(
+ IHyracksTaskContext ctx) {
+ return new JoinComparator(bFactory.createBinaryComparator(), pos0,
+ pos1);
+ }
+ }
- private final IBinaryComparator bComparator;
- private final int field0;
- private final int field1;
+ static class JoinComparator implements ITuplePairComparator {
- public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
- this.bComparator = bComparator;
- this.field0 = field0;
- this.field1 = field1;
- }
+ private final IBinaryComparator bComparator;
+ private final int field0;
+ private final int field1;
- @Override
- public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
- int tStart0 = accessor0.getTupleStartOffset(tIndex0);
- int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+ public JoinComparator(IBinaryComparator bComparator, int field0,
+ int field1) {
+ this.bComparator = bComparator;
+ this.field0 = field0;
+ this.field1 = field1;
+ }
- int tStart1 = accessor1.getTupleStartOffset(tIndex1);
- int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+ @Override
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0,
+ IFrameTupleAccessor accessor1, int tIndex1) {
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
- int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
- int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
- int fLen0 = fEnd0 - fStart0;
+ int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
- int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
- int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
- int fLen1 = fEnd1 - fStart1;
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+ int fLen0 = fEnd0 - fStart0;
- int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
- .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
- if (c != 0) {
- return c;
- }
- return 0;
- }
- }
+ int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+ int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = bComparator.compare(accessor0.getBuffer().array(), fStart0
+ + fStartOffset0, fLen0, accessor1.getBuffer().array(),
+ fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ return 0;
+ }
+ }
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index d7ab7f7..ba61d18 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -16,7 +16,7 @@
/**
* sum
- *
+ *
*/
public class DistributedMergeLmerAggregateFactory implements
IAggregatorDescriptorFactory {
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index 76b04d1..427ad4f 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -16,7 +16,7 @@
/**
* count
- *
+ *
*/
public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
@@ -100,7 +100,7 @@
+ stateAccessor.getFieldSlotsLength() + statefieldStart;
count += 1;
- if(count > max){
+ if (count > max) {
count = max;
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index b2ffb41..12c72d8 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -7,7 +7,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -129,18 +128,16 @@
public static void main(String[] args) throws Exception {
GenomixJob job = new GenomixJob();
- String[] otherArgs = new GenericOptionsParser(job,
- args).getRemainingArgs();
+ String[] otherArgs = new GenericOptionsParser(job, args)
+ .getRemainingArgs();
if (otherArgs.length < 4) {
System.err.println("Need <serverIP> <port> <input> <output>");
System.exit(-1);
}
String ipAddress = otherArgs[0];
int port = Integer.parseInt(otherArgs[1]);
- int numOfDuplicate = job.getInt(
- CPARTITION_PER_MACHINE, 2);
- boolean bProfiling = job.getBoolean(IS_PROFILING,
- true);
+ int numOfDuplicate = job.getInt(CPARTITION_PER_MACHINE, 2);
+ boolean bProfiling = job.getBoolean(IS_PROFILING, true);
FileInputFormat.setInputPaths(new Job(job), otherArgs[2]);
FileOutputFormat.setOutputPath(new Job(job), new Path(otherArgs[3]));
Driver driver = new Driver(ipAddress, port, numOfDuplicate);
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
index e065b8e..55912c6 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -3,7 +3,6 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
public class GenomixJob extends Configuration {
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index b7e703e..a2e8860 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -15,12 +15,9 @@
import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
import edu.uci.ics.genomix.dataflow.KMerTextWriterFactory;
-import edu.uci.ics.genomix.dataflow.KMerWriterFactory;
import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.job.JobGen;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -34,7 +31,6 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -54,8 +50,9 @@
public enum GroupbyType {
EXTERNAL, PRECLUSTER, HYBRIDHASH,
}
- public enum OutputFormat{
- TEXT,BINARY,
+
+ public enum OutputFormat {
+ TEXT, BINARY,
}
JobConf job;
@@ -73,7 +70,8 @@
private AbstractOperatorDescriptor singleGrouper;
private IConnectorDescriptor connPartition;
private AbstractOperatorDescriptor crossGrouper;
- private RecordDescriptor outputRec;
+ private RecordDescriptor readOutputRec;
+ private RecordDescriptor combineOutputRec;
public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
final Map<String, NodeControllerInfo> ncMap,
@@ -103,7 +101,7 @@
new VLongNormalizedKeyComputerFactory(),
aggeragater,
new DistributedMergeLmerAggregateFactory(),
- outputRec,
+ combineOutputRec,
new HashSpillableTableFactory(
new FieldHashPartitionComputerFactory(
keyFields,
@@ -128,10 +126,10 @@
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
.of(VLongPointable.FACTORY) },
new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
- hashfuncStartLevel,
- new VLongNormalizedKeyComputerFactory(),
+ hashfuncStartLevel, new VLongNormalizedKeyComputerFactory(),
new MergeKmerAggregateFactory(),
- new DistributedMergeLmerAggregateFactory(), outputRec, true);
+ new DistributedMergeLmerAggregateFactory(), combineOutputRec,
+ true);
}
private void generateDescriptorbyType(JobSpecification jobSpec)
@@ -161,7 +159,8 @@
keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
.of(VLongPointable.FACTORY) },
- new DistributedMergeLmerAggregateFactory(), outputRec);
+ new DistributedMergeLmerAggregateFactory(),
+ combineOutputRec);
break;
case HYBRIDHASH:
default:
@@ -192,14 +191,13 @@
public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
throws HyracksDataException {
try {
-
- InputSplit[] splits = job.getInputFormat().getSplits(
- job, ncNodeNames.length);
+
+ InputSplit[] splits = job.getInputFormat().getSplits(job,
+ ncNodeNames.length);
String[] readSchedule = scheduler.getLocationConstraints(splits);
- return new HDFSReadOperatorDescriptor(jobSpec, outputRec,
- job, splits, readSchedule,
- new ReadsKeyValueParserFactory(kmers));
+ return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job,
+ splits, readSchedule, new ReadsKeyValueParserFactory(kmers));
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -209,9 +207,12 @@
public JobSpecification generateJob() throws HyracksException {
JobSpecification jobSpec = new JobSpecification();
- outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ null, ByteSerializerDeserializer.INSTANCE });
+ combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
null, ByteSerializerDeserializer.INSTANCE,
ByteSerializerDeserializer.INSTANCE });
+
// File input
HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
@@ -232,11 +233,12 @@
// Output
ITupleWriterFactory writer = null;
- switch (outputFormat){
+ switch (outputFormat) {
case TEXT:
writer = new KMerTextWriterFactory(kmers);
break;
- case BINARY: default:
+ case BINARY:
+ default:
writer = new KMerSequenceWriterFactory(job);
break;
}
@@ -248,9 +250,7 @@
IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
jobSpec);
-// jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
- jobSpec.connect(printConn, readOperator, 0, writeOperator, 0);
-// jobSpec.addRoot(readOperator);
+ jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
jobSpec.addRoot(writeOperator);
if (groupbyType == GroupbyType.PRECLUSTER) {
@@ -261,7 +261,7 @@
@Override
protected void initJobConfiguration() {
-
+
kmers = conf.getInt(GenomixJob.KMER_LENGTH, 25);
frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, 4096);
tableSize = conf.getInt(GenomixJob.TABLE_SIZE, 10485767);
@@ -274,11 +274,11 @@
} else {
groupbyType = GroupbyType.HYBRIDHASH;
}
-
+
String output = conf.get(GenomixJob.OUTPUT_FORMAT, "binary");
- if (output.equalsIgnoreCase("binary")){
+ if (output.equalsIgnoreCase("binary")) {
outputFormat = OutputFormat.BINARY;
- } else if ( output.equalsIgnoreCase("text")){
+ } else if (output.equalsIgnoreCase("text")) {
outputFormat = OutputFormat.TEXT;
} else {
outputFormat = OutputFormat.TEXT;
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
index 6570509..0281ebc 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
@@ -6,122 +6,129 @@
import org.apache.hadoop.io.Writable;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+public class Kmer {
-public class Kmer implements Writable {
-
- public final static byte[] GENE_SYMBOL = {'A','C','G','T'};
- public final static class GENE_CODE{
-
- public static final byte A=0;
- public static final byte C=1;
- public static final byte G=2;
- public static final byte T=3;
-
- public static byte getCodeFromSymbol(byte ch){
+ public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
+
+ public final static class GENE_CODE {
+
+ /**
+	 * make sure these 4 ids match the positions of the corresponding symbols in {@link #GENE_SYMBOL}
+ */
+ public static final byte A = 0;
+ public static final byte C = 1;
+ public static final byte G = 2;
+ public static final byte T = 3;
+
+ public static byte getCodeFromSymbol(byte ch) {
byte r = 0;
switch (ch) {
- case 'A':case 'a':
+ case 'A':
+ case 'a':
r = A;
break;
- case 'C':case 'c':
+ case 'C':
+ case 'c':
r = C;
break;
- case 'G':case 'g':
+ case 'G':
+ case 'g':
r = G;
break;
- case 'T':case 't':
+ case 'T':
+ case 't':
r = T;
break;
}
return r;
}
-
- public static byte getSymbolFromCode(byte code){
- if (code > 3){
+
+ public static byte getSymbolFromCode(byte code) {
+ if (code > 3) {
return '!';
}
return GENE_SYMBOL[code];
}
-
+
public static byte getAdjBit(byte t) {
byte r = 0;
switch (t) {
- case 'A':case 'a':
+ case 'A':
+ case 'a':
r = 1 << A;
break;
- case 'C':case 'c':
+ case 'C':
+ case 'c':
r = 1 << C;
break;
- case 'G':case 'g':
+ case 'G':
+ case 'g':
r = 1 << G;
break;
- case 'T':case 't':
+ case 'T':
+ case 't':
r = 1 << T;
break;
}
return r;
}
- }
-
- public static final byte LOWBITMASK = 0x03;
-
- public static String recoverKmerFrom(int k, byte [] keyData, int keyStart, int keyLength ) {
- StringBuilder sblder = new StringBuilder();
-
- int outKmer = 0;
- for ( int i = keyLength-1; i>=0; i--){
- byte last = keyData[keyStart + i];
- for( int j = 0; j < 8; j+=2){
- byte kmer = (byte) ((last >>j) & LOWBITMASK);
- sblder.append((char)GENE_CODE.getSymbolFromCode(kmer));
- if ( ++outKmer > k){
- break;
+
+ public static String getSymbolFromBitMap(byte code) {
+ int left = (code >> 4) & 0x0F;
+ int right = code & 0x0F;
+ String str = new String();
+ for(int i = A; i <= T ; i++){
+ if ( (left & (1<<i)) != 0){
+ str += GENE_SYMBOL[i];
}
}
- if(outKmer >k){
- break;
+ str += '|';
+ for(int i = A; i <= T ; i++){
+ if ( (right & (1<<i)) != 0){
+ str += GENE_SYMBOL[i];
+ }
}
+ return str;
}
- return sblder.toString();
}
-
- public static String recoverAdjacent(byte number){
+
+ public static String recoverKmerFrom(int k, byte[] keyData, int keyStart,
+ int keyLength) {
+ byte kmer = keyData[keyStart];
+
+ String sblder = String.valueOf((int) kmer) + " ";
+ for (int i = keyStart + 1; i < keyStart + keyLength; i++) {
+ byte genecode = keyData[i];
+ sblder += String.valueOf((int) genecode) + " ";
+ }
+ return sblder;
+ }
+
+ public static String recoverAdjacent(byte number) {
int incoming = (number & 0xF0) >> 4;
int outgoing = number & 0x0F;
return String.valueOf(incoming) + '|' + String.valueOf(outgoing);
}
-
- @Override
- public void readFields(DataInput arg0) throws IOException {
- // TODO Auto-generated method stub
- }
- @Override
- public void write(DataOutput arg0) throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- public static void initializeFilter(int k, byte []filter){
+ public static void initializeFilter(int k, byte[] filter) {
filter[0] = (byte) 0xC0;
filter[1] = (byte) 0xFC;
filter[2] = 0;
filter[3] = 3;
final int byteNum = (byte) Math.ceil((double) k / 4.0);
-
+
int r = byteNum * 8 - 2 * k;
r = 8 - r;
for (int i = 0; i < r; i++) {
filter[2] <<= 1;
filter[2] |= 1;
}
- for(int i = 0; i < r-1 ; i++){
+ for (int i = 0; i < r - 1; i++) {
filter[3] <<= 1;
}
}
-
+
public static byte[] CompressKmer(int k, byte[] array, int start) {
final int byteNum = (byte) Math.ceil((double) k / 4.0);
byte[] bytes = new byte[byteNum + 1];
@@ -131,26 +138,27 @@
int count = 0;
int bcount = 0;
- for (int i = start; i < start+k ; i++) {
- l = (byte) ((l<<2) & 0xFC);
+ for (int i = start; i < start + k; i++) {
+ l = (byte) ((l << 2) & 0xFC);
l |= GENE_CODE.getCodeFromSymbol(array[i]);
count += 2;
if (count % 8 == 0 && byteNum - bcount > 1) {
- bytes[byteNum-bcount] = l;
+ bytes[byteNum - bcount] = l;
bcount += 1;
count = 0;
l = 0;
}
- if (byteNum - bcount <= 1){
+ if (byteNum - bcount <= 1) {
break;
}
}
bytes[1] = l;
return bytes;
}
-
+
public static void MoveKmer(int k, byte[] bytes, byte c, byte filter[]) {
- int i = (byte) Math.ceil((double) k / 4.0);;
+ int i = (byte) Math.ceil((double) k / 4.0);
+
bytes[i] <<= 2;
bytes[i] &= filter[1];
i -= 1;
@@ -163,11 +171,10 @@
bytes[i] &= filter[1];
i -= 1;
}
- bytes[2] |= (byte) (bytes[1]&filter[3]);
- bytes[1] <<=2;
+ bytes[2] |= (byte) (bytes[1] & filter[3]);
+ bytes[1] <<= 2;
bytes[1] &= filter[2];
bytes[1] |= GENE_CODE.getCodeFromSymbol(c);
}
-
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
new file mode 100644
index 0000000..e943798
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
@@ -0,0 +1,58 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class KmerCountValue implements ValueBytes, Writable{
+ private byte adjBitMap;
+ private byte count;
+
+ public KmerCountValue(ITupleReference tuple) {
+ adjBitMap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
+ count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
+ }
+
+ @Override
+ public int getSize() {
+ return 2;
+ }
+
+ @Override
+ public void writeCompressedBytes(DataOutputStream arg0)
+ throws IllegalArgumentException, IOException {
+ arg0.writeByte(adjBitMap);
+ arg0.writeByte(count);
+ }
+
+ @Override
+ public void writeUncompressedBytes(DataOutputStream arg0)
+ throws IOException {
+ arg0.writeByte(adjBitMap);
+ arg0.writeByte(count);
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ adjBitMap = arg0.readByte();
+ count = arg0.readByte();
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ arg0.writeByte(adjBitMap);
+ arg0.writeByte(count);
+ }
+
+ @Override
+ public String toString() {
+ return Kmer.GENE_CODE.getSymbolFromBitMap(adjBitMap) + '\t' + String.valueOf(count);
+ }
+
+}
\ No newline at end of file
diff --git a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
index a6ff826..788ebca 100644
--- a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
+++ b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
@@ -1,25 +1,26 @@
package edu.uci.ics.genomix.example.jobrun;
+import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.FileWriter;
import java.io.IOException;
import junit.framework.Assert;
-import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -27,30 +28,9 @@
import edu.uci.ics.genomix.driver.Driver;
import edu.uci.ics.genomix.driver.Driver.Plan;
import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
-import edu.uci.ics.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.hdfs.lib.TextKeyValueParserFactory;
-import edu.uci.ics.hyracks.hdfs.lib.TextTupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+import edu.uci.ics.genomix.type.KmerCountValue;
import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
import edu.uci.ics.hyracks.hdfs.utils.TestUtils;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
public class JobRunTestCase {
private static final String ACTUAL_RESULT_DIR = "actual";
@@ -60,8 +40,10 @@
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result/";
- private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
- private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
+ private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR
+ + HDFS_OUTPUT_PATH + "/merged.txt";
+ private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
+ private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
private static final String HYRACKS_APP_NAME = "genomix";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
@@ -122,14 +104,14 @@
confOutput.flush();
confOutput.close();
}
-
- private void cleanUpReEntry() throws IOException{
+
+ private void cleanUpReEntry() throws IOException {
FileSystem lfs = FileSystem.getLocal(new Configuration());
- if (lfs.exists(new Path(DUMPED_RESULT))){
+ if (lfs.exists(new Path(DUMPED_RESULT))) {
lfs.delete(new Path(DUMPED_RESULT), true);
}
FileSystem dfs = FileSystem.get(conf);
- if (dfs.exists(new Path(HDFS_OUTPUT_PATH))){
+ if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
}
}
@@ -138,12 +120,12 @@
public void TestExternalGroupby() throws Exception {
cleanUpReEntry();
conf.set(GenomixJob.GROUPBY_TYPE, "external");
- conf.set(GenomixJob.OUTPUT_FORMAT, "text");
+ conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults());
}
- //@Test
+ // @Test
public void TestPreClusterGroupby() throws Exception {
cleanUpReEntry();
conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
@@ -151,19 +133,36 @@
Assert.assertEquals(true, checkResults());
}
- //@Test
+ // @Test
public void TestHybridGroupby() throws Exception {
cleanUpReEntry();
conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
- conf.set(GenomixJob.OUTPUT_FORMAT, "text");
+ conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults());
}
private boolean checkResults() throws Exception {
- FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH), FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
- TestUtils.compareWithResult(new File(EXPECTED_PATH
- ), new File(DUMPED_RESULT));
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
+ FileSystem.getLocal(new Configuration()), new Path(
+ DUMPED_RESULT), false, conf, null);
+
+ SequenceFile.Reader reader = null;
+ Path path = new Path(DUMPED_RESULT);
+ FileSystem dfs = FileSystem.get(conf);
+ reader = new SequenceFile.Reader(dfs, path, conf);
+ BytesWritable key = (BytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ File filePathTo = new File(CONVERT_RESULT);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+ while (reader.next(key, value)) {
+ bw.write(key + "\t" + value.toString());
+ bw.newLine();
+ }
+ bw.close();
+
+ TestUtils.compareWithResult(new File(EXPECTED_PATH), new File(
+ DUMPED_RESULT));
return true;
}