Merge branch 'fullstack_genomix' of https://code.google.com/p/hyracks into fullstack_genomix
Conflicts:
genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
index 1ed6f80..5be5f83 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.type;
public class GeneCode {
@@ -86,6 +101,10 @@
return -1;
}
+ public static byte getBitMapFromGeneCode(byte t) {
+ return (byte) (1 << t);
+ }
+
public static int countNumberOfBitSet(int i) {
int c = 0;
for (; i != 0; c++) {
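
Note on the GeneCode change: the new getBitMapFromGeneCode converts a 2-bit gene code into the one-hot adjacency-bit form that getAdjBit produces from a raw symbol; it backs the shiftKmerWithNextChar call sites updated later in this patch. A minimal standalone sketch of the mapping, using the A=0, C=1, G=2, T=3 codes GeneCode already defines:

    // Standalone sketch of the new helper; GeneCode defines A=0, C=1, G=2, T=3.
    public class GeneCodeBitmapSketch {
        public static byte getBitMapFromGeneCode(byte t) {
            return (byte) (1 << t); // one-hot bit per 2-bit gene code
        }

        public static void main(String[] args) {
            // A(0) -> 1, C(1) -> 10, G(2) -> 100, T(3) -> 1000
            for (byte code = 0; code < 4; code++) {
                System.out.println(code + " -> " + Integer.toBinaryString(getBitMapFromGeneCode(code)));
            }
        }
    }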
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 0171865..1f790eb 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -12,6 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package edu.uci.ics.genomix.type;
import java.io.DataInput;
@@ -46,13 +47,13 @@
public KmerBytesWritable(int k, byte[] storage) {
this.kmerlength = k;
- if (k > 0){
+ if (k > 0) {
this.size = KmerUtil.getByteNumFromK(kmerlength);
this.bytes = storage;
- if (this.bytes.length < size){
+ if (this.bytes.length < size) {
throw new ArrayIndexOutOfBoundsException("Storage is smaller than required space for kmerlength:k");
}
- }else{
+ } else {
this.bytes = storage;
this.size = 0;
}
@@ -182,7 +183,7 @@
int pos = ((kmerlength - 1) % 4) << 1;
byte code = (byte) (c << pos);
bytes[0] = (byte) (((bytes[0] >>> 2) & 0x3f) | code);
- return (byte) output;
+ return output;
}
/**
@@ -215,7 +216,7 @@
bytes[0] &= (1 << ((kmerlength % 4) << 1)) - 1;
}
bytes[size - 1] = (byte) ((bytes[size - 1] << 2) | c);
- return (byte) output;
+ return output;
}
public void set(KmerBytesWritable newData) {
@@ -248,7 +249,7 @@
@Override
public int hashCode() {
- return super.hashCode() * this.kmerlength;
+ return super.hashCode() * 31 + this.kmerlength;
}
@Override
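
Note on the hashCode() change above: multiplying by kmerlength collapses every kmer whose byte-level hash is 0 onto the same bucket regardless of length, while the standard `* 31 +` combine keeps the length contribution even then. A hedged illustration of the difference, not the class itself:

    // Illustration of the collision the old formula allowed.
    public class HashCombineSketch {
        static int oldHash(int h, int kmerlength) { return h * kmerlength; }
        static int newHash(int h, int kmerlength) { return h * 31 + kmerlength; }

        public static void main(String[] args) {
            System.out.println(oldHash(0, 5) == oldHash(0, 21)); // true: lengths 5 and 21 collide
            System.out.println(newHash(0, 5) == newHash(0, 21)); // false: the lengths still distinguish them
        }
    }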
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
index 59cc3b0..68dec30 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.type;
public class KmerUtil {
@@ -19,7 +34,7 @@
public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
StringBuilder strKmer = new StringBuilder();
int byteId = keyStart + keyLength - 1;
- if (byteId < 0){
+ if (byteId < 0) {
return empty;
}
byte currentbyte = keyData[byteId];
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
index a66d408..fc63fd8 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.type;
public class VKmerBytesWritable extends KmerBytesWritable {
@@ -10,8 +25,8 @@
public VKmerBytesWritable() {
super();
}
-
- public VKmerBytesWritable(int k, byte[] storage){
+
+ public VKmerBytesWritable(int k, byte[] storage) {
super(k, storage);
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
index ab1e633..9bd6acb 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.type;
public class VKmerBytesWritableFactory {
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
index ee806cc..7aa05f5 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
@@ -1,15 +1,29 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.type.old;
@Deprecated
public class Kmer {
-
+
public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
public final static class GENE_CODE {
/**
- * make sure this 4 ids equal to the sequence id of char in
- * {@GENE_SYMBOL}
+ * make sure these 4 ids match the sequence ids of the chars in {@GENE_SYMBOL}
*/
public static final byte A = 0;
public static final byte C = 1;
@@ -19,22 +33,22 @@
public static byte getCodeFromSymbol(byte ch) {
byte r = 0;
switch (ch) {
- case 'A':
- case 'a':
- r = A;
- break;
- case 'C':
- case 'c':
- r = C;
- break;
- case 'G':
- case 'g':
- r = G;
- break;
- case 'T':
- case 't':
- r = T;
- break;
+ case 'A':
+ case 'a':
+ r = A;
+ break;
+ case 'C':
+ case 'c':
+ r = C;
+ break;
+ case 'G':
+ case 'g':
+ r = G;
+ break;
+ case 'T':
+ case 't':
+ r = T;
+ break;
}
return r;
}
@@ -49,42 +63,44 @@
public static byte getAdjBit(byte t) {
byte r = 0;
switch (t) {
- case 'A':
- case 'a':
- r = 1 << A;
- break;
- case 'C':
- case 'c':
- r = 1 << C;
- break;
- case 'G':
- case 'g':
- r = 1 << G;
- break;
- case 'T':
- case 't':
- r = 1 << T;
- break;
+ case 'A':
+ case 'a':
+ r = 1 << A;
+ break;
+ case 'C':
+ case 'c':
+ r = 1 << C;
+ break;
+ case 'G':
+ case 'g':
+ r = 1 << G;
+ break;
+ case 'T':
+ case 't':
+ r = 1 << T;
+ break;
}
return r;
}
- /**
- * It works for path merge.
+ /**
+ * It works for path merge.
* Merge the kmer by his next, we need to make sure the @{t} is a single neighbor.
- * @param t the neighbor code in BitMap
- * @return the genecode
+ *
+ * @param t
+ * the neighbor code in BitMap
+ * @return the genecode
*/
public static byte getGeneCodeFromBitMap(byte t) {
switch (t) {
- case 1 << A:
- return A;
- case 1 << C:
- return C;
- case 1 << G:
- return G;
- case 1 << T:
- return T;
+ case 1 << A:
+ return A;
+ case 1 << C:
+ return C;
+ case 1 << G:
+ return G;
+ case 1 << T:
+ return T;
}
return -1;
}
@@ -112,8 +128,7 @@
}
}
- public static String recoverKmerFrom(int k, byte[] keyData, int keyStart,
- int keyLength) {
+ public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
StringBuilder strKmer = new StringBuilder();
int byteId = keyStart + keyLength - 1;
byte currentbyte = keyData[byteId];
@@ -277,8 +292,7 @@
if (k % 4 != 0) {
kmer[0] &= (1 << ((k % 4) << 1)) - 1;
}
- kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE
- .getCodeFromSymbol(c));
+ kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE.getCodeFromSymbol(c));
return (byte) (1 << output);
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
index dd76427..7a96d2f 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
@@ -8,8 +8,7 @@
import org.apache.hadoop.io.WritableComparator;
@Deprecated
-public class KmerBytesWritable extends BinaryComparable implements
- WritableComparable<BinaryComparable> {
+public class KmerBytesWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
private static final int LENGTH_BYTES = 4;
private static final byte[] EMPTY_BYTES = {};
private byte size;
@@ -126,8 +125,7 @@
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2,
- s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
+ return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
}
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
index 7e9d2f3..a4b1bec 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
@@ -22,16 +22,16 @@
}
/**
- * Get last kmer from kmer-chain.
+ * Get last kmer from kmer-chain.
* e.g. kmerChain is AAGCTA, if k =5, it will
* return AGCTA
+ *
* @param k
* @param kInChain
* @param kmerChain
* @return LastKmer bytes array
*/
- public static byte[] getLastKmerFromChain(int k, int kInChain,
- byte[] kmerChain, int offset, int length) {
+ public static byte[] getLastKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
if (k > kInChain) {
return null;
}
@@ -66,8 +66,7 @@
* @param kmerChain
* @return FirstKmer bytes array
*/
- public static byte[] getFirstKmerFromChain(int k, int kInChain,
- byte[] kmerChain, int offset, int length) {
+ public static byte[] getFirstKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
if (k > kInChain) {
return null;
}
@@ -94,9 +93,13 @@
* Merge kmer with next neighbor in gene-code format.
* The k of new kmer will increase by 1
* e.g. AAGCT merge with A => AAGCTA
- * @param k :input k of kmer
- * @param kmer : input bytes of kmer
- * @param nextCode: next neighbor in gene-code format
+ *
+ * @param k
+ * :input k of kmer
+ * @param kmer
+ * : input bytes of kmer
+ * @param nextCode
+ * : next neighbor in gene-code format
* @return the merged Kmer, this K of this Kmer is k+1
*/
public static byte[] mergeKmerWithNextCode(int k, byte[] kmer, int offset, int length, byte nextCode) {
@@ -120,9 +123,13 @@
* Merge kmer with previous neighbor in gene-code format.
* The k of new kmer will increase by 1
* e.g. AAGCT merge with A => AAAGCT
- * @param k :input k of kmer
- * @param kmer : input bytes of kmer
- * @param preCode: next neighbor in gene-code format
+ *
+ * @param k
+ * :input k of kmer
+ * @param kmer
+ * : input bytes of kmer
+ * @param preCode
+ * : next neighbor in gene-code format
* @return the merged Kmer,this K of this Kmer is k+1
*/
public static byte[] mergeKmerWithPreCode(int k, byte[] kmer, int offset, int length, byte preCode) {
@@ -147,10 +154,15 @@
/**
* Merge two kmer to one kmer
* e.g. ACTA + ACCGT => ACTAACCGT
- * @param preK : previous k of kmer
- * @param kmerPre : bytes array of previous kmer
- * @param nextK : next k of kmer
- * @param kmerNext : bytes array of next kmer
+ *
+ * @param preK
+ * : previous k of kmer
+ * @param kmerPre
+ * : bytes array of previous kmer
+ * @param nextK
+ * : next k of kmer
+ * @param kmerNext
+ * : bytes array of next kmer
* @return merged kmer, the new k is @preK + @nextK
*/
public static byte[] mergeTwoKmer(int preK, byte[] kmerPre, int offsetPre, int lengthPre, int nextK,
@@ -161,7 +173,7 @@
for (; i <= lengthPre; i++) {
mergedKmer[byteNum - i] = kmerPre[offsetPre + lengthPre - i];
}
- if ( i > 1){
+ if (i > 1) {
i--;
}
if (preK % 4 == 0) {
@@ -172,52 +184,58 @@
int posNeedToMove = ((preK % 4) << 1);
mergedKmer[byteNum - i] |= kmerNext[offsetNext + lengthNext - 1] << posNeedToMove;
for (int j = 1; j < lengthNext; j++) {
- mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext
- - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext + lengthNext
- - j - 1] << posNeedToMove));
+ mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext
+ + lengthNext - j - 1] << posNeedToMove));
}
- if ( (nextK % 4) * 2 + posNeedToMove > 8) {
+ if ((nextK % 4) * 2 + posNeedToMove > 8) {
mergedKmer[0] = (byte) (kmerNext[offsetNext] >> (8 - posNeedToMove));
}
}
return mergedKmer;
}
-
+
/**
* Safely shifted the kmer forward without change the input kmer
* e.g. AGCGC shift with T => GCGCT
- * @param k: kmer length
- * @param kmer: input kmer
- * @param afterCode: input genecode
+ *
+ * @param k
+ * : kmer length
+ * @param kmer
+ * : input kmer
+ * @param afterCode
+ * : input genecode
* @return new created kmer that shifted by afterCode, the K will not change
*/
- public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode){
- byte[] shifted = Arrays.copyOfRange(kmer, offset, offset+length);
+ public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode) {
+ byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
Kmer.moveKmer(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(afterCode));
return shifted;
}
-
+
/**
* Safely shifted the kmer backward without change the input kmer
* e.g. AGCGC shift with T => TAGCG
- * @param k: kmer length
- * @param kmer: input kmer
- * @param preCode: input genecode
+ *
+ * @param k
+ * : kmer length
+ * @param kmer
+ * : input kmer
+ * @param preCode
+ * : input genecode
* @return new created kmer that shifted by preCode, the K will not change
*/
- public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode){
- byte[] shifted = Arrays.copyOfRange(kmer, offset, offset+length);
+ public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode) {
+ byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
Kmer.moveKmerReverse(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(preCode));
return shifted;
}
- public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer,
- int offset, int length) {
+ public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer, int offset, int length) {
if (pos >= k) {
return -1;
}
int posByte = pos / 4;
- int shift = (pos % 4) << 1;
+ int shift = (pos % 4) << 1;
return (byte) ((kmer[offset + length - 1 - posByte] >> shift) & 0x3);
}
}
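
Note on the reflowed getGeneCodeAtPosition above: it packs four 2-bit codes per byte, with position 0 in the low bits of the last byte; posByte picks the byte counting from the end and shift selects the 2-bit slot inside it. A runnable sketch of that arithmetic:

    // Sketch of the indexing in getGeneCodeAtPosition (same packing as the file above).
    public class GeneCodeAtPositionSketch {
        static byte codeAt(int pos, byte[] kmer) {
            int posByte = pos / 4;      // which byte, counted from the end of the array
            int shift = (pos % 4) << 1; // 2-bit slot inside that byte
            return (byte) ((kmer[kmer.length - 1 - posByte] >> shift) & 0x3);
        }

        public static void main(String[] args) {
            byte[] packed = { (byte) 0x1B }; // 0b00011011: codes 3,2,1,0 at positions 0..3
            for (int pos = 0; pos < 4; pos++) {
                System.out.println("pos " + pos + " -> " + codeAt(pos, packed)); // 3, 2, 1, 0
            }
        }
    }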
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
index f21da91..faee509 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.example.kmer;
import junit.framework.Assert;
@@ -33,7 +48,7 @@
}
byte out = kmer.shiftKmerWithNextChar(array[array.length - 1]);
- Assert.assertEquals(out, GeneCode.getAdjBit((byte) 'A'));
+ Assert.assertEquals(out, GeneCode.getCodeFromSymbol((byte) 'A'));
Assert.assertEquals(kmer.toString(), "ATAGAAG");
}
@@ -59,7 +74,7 @@
}
byte out = kmer.shiftKmerWithPreChar(array[array.length - 1]);
- Assert.assertEquals(out, GeneCode.getAdjBit((byte) 'A'));
+ Assert.assertEquals(out, GeneCode.getCodeFromSymbol((byte) 'A'));
Assert.assertEquals(kmer.toString(), "GAATAGA");
}
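
The two assertion updates above track the new contract of shiftKmerWithNextChar/shiftKmerWithPreChar: they now return the raw 2-bit gene code rather than the adjacency bit, and callers that still need the bitmap wrap the result with the new GeneCode.getBitMapFromGeneCode (see the mapper and parser changes below). In short:

    // The relationship the updated assertions rely on, using the project's GeneCode.
    import edu.uci.ics.genomix.type.GeneCode;

    public class ShiftReturnSketch {
        public static void main(String[] args) {
            byte code = GeneCode.getCodeFromSymbol((byte) 'A'); // raw 2-bit code: 0
            byte adj = GeneCode.getBitMapFromGeneCode(code);    // old-style adjacency bit: 0b0001
            System.out.println(code + " / " + adj);             // 0 / 1
        }
    }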
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
index 6611752..7c4c675 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.example.kmer;
import org.junit.Assert;
@@ -31,49 +46,49 @@
for (int i = 8; i > 0; i--) {
lastKmer = kmerFactory.getLastKmerFromChain(i, kmer);
Assert.assertEquals("AGCTGACCG".substring(9 - i), lastKmer.toString());
- lastKmer = kmerFactory.getSubKmerFromChain(9-i, i, kmer);
+ lastKmer = kmerFactory.getSubKmerFromChain(9 - i, i, kmer);
Assert.assertEquals("AGCTGACCG".substring(9 - i), lastKmer.toString());
}
VKmerBytesWritable vlastKmer;
for (int i = 8; i > 0; i--) {
vlastKmer = kmerFactory.getLastKmerFromChain(i, kmer);
Assert.assertEquals("AGCTGACCG".substring(9 - i), vlastKmer.toString());
- vlastKmer = kmerFactory.getSubKmerFromChain(9-i, i, kmer);
+ vlastKmer = kmerFactory.getSubKmerFromChain(9 - i, i, kmer);
Assert.assertEquals("AGCTGACCG".substring(9 - i), vlastKmer.toString());
}
}
-
+
@Test
- public void TestGetFirstKmer(){
+ public void TestGetFirstKmer() {
KmerBytesWritable kmer = new KmerBytesWritable(9);
kmer.setByRead(array, 0);
Assert.assertEquals("AGCTGACCG", kmer.toString());
KmerBytesWritable firstKmer;
for (int i = 8; i > 0; i--) {
firstKmer = kmerFactory.getFirstKmerFromChain(i, kmer);
- Assert.assertEquals("AGCTGACCG".substring(0,i), firstKmer.toString());
- firstKmer = kmerFactory.getSubKmerFromChain(0,i,kmer);
- Assert.assertEquals("AGCTGACCG".substring(0,i), firstKmer.toString());
+ Assert.assertEquals("AGCTGACCG".substring(0, i), firstKmer.toString());
+ firstKmer = kmerFactory.getSubKmerFromChain(0, i, kmer);
+ Assert.assertEquals("AGCTGACCG".substring(0, i), firstKmer.toString());
}
VKmerBytesWritable vfirstKmer;
for (int i = 8; i > 0; i--) {
vfirstKmer = kmerFactory.getFirstKmerFromChain(i, kmer);
- Assert.assertEquals("AGCTGACCG".substring(0,i), vfirstKmer.toString());
+ Assert.assertEquals("AGCTGACCG".substring(0, i), vfirstKmer.toString());
vfirstKmer = kmerFactory.getSubKmerFromChain(0, i, kmer);
- Assert.assertEquals("AGCTGACCG".substring(0,i), vfirstKmer.toString());
+ Assert.assertEquals("AGCTGACCG".substring(0, i), vfirstKmer.toString());
}
}
-
- @Test
- public void TestGetSubKmer(){
+
+ @Test
+ public void TestGetSubKmer() {
KmerBytesWritable kmer = new KmerBytesWritable(9);
kmer.setByRead(array, 0);
Assert.assertEquals("AGCTGACCG", kmer.toString());
VKmerBytesWritable subKmer;
- for (int istart = 0; istart < kmer.getKmerLength()-1; istart++) {
- for(int isize = 1; isize + istart <= kmer.getKmerLength(); isize ++){
+ for (int istart = 0; istart < kmer.getKmerLength() - 1; istart++) {
+ for (int isize = 1; isize + istart <= kmer.getKmerLength(); isize++) {
subKmer = kmerFactory.getSubKmerFromChain(istart, isize, kmer);
- Assert.assertEquals("AGCTGACCG".substring(istart, istart+isize), subKmer.toString());
+ Assert.assertEquals("AGCTGACCG".substring(istart, istart + isize), subKmer.toString());
}
}
}
diff --git a/genomix/genomix-hadoop/actual3/conf.xml b/genomix/genomix-hadoop/actual3/conf.xml
index 32b17e1..9c045a4 100644
--- a/genomix/genomix-hadoop/actual3/conf.xml
+++ b/genomix/genomix-hadoop/actual3/conf.xml
@@ -12,7 +12,7 @@
<property><name>dfs.namenode.logging.level</name><value>info</value></property>
<property><name>dfs.datanode.address</name><value>127.0.0.1:0</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>hdfs://localhost:55289</value></property>
+<property><name>fs.default.name</name><value>hdfs://localhost:55383</value></property>
<property><name>mapred.child.tmp</name><value>./tmp</value></property>
<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
<property><name>dfs.safemode.threshold.pct</name><value>0.999f</value></property>
@@ -125,7 +125,7 @@
<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
<property><name>dfs.namenode.decommission.interval</name><value>3</value></property>
-<property><name>dfs.http.address</name><value>localhost:55291</value></property>
+<property><name>dfs.http.address</name><value>localhost:55384</value></property>
<property><name>dfs.heartbeat.interval</name><value>3</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
index d36be17..f3f4584 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
@@ -87,14 +87,14 @@
output.collect(outputKmer, outputAdjList);
/** middle kmer */
for (int i = KMER_SIZE; i < array.length - 1; i++) {
- pre = outputKmer.shiftKmerWithNextChar(array[i]);
+ pre = GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[i]));
next = GeneCode.getAdjBit(array[i + 1]);
adj = GeneCode.mergePreNextAdj(pre, next);
outputAdjList.set(adj, count);
output.collect(outputKmer, outputAdjList);
}
/** last kmer */
- pre = outputKmer.shiftKmerWithNextChar(array[array.length - 1]);
+ pre = GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[array.length - 1]));
next = 0;
adj = GeneCode.mergePreNextAdj(pre, next);
outputAdjList.set(adj, count);
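
With shiftKmerWithNextChar now returning a gene code, the mapper converts it back to an adjacency bit before merging with the successor bits. A hedged sketch of the resulting adjacency byte; the nibble layout of mergePreNextAdj (predecessors high, successors low) is assumed for illustration and is not shown in this patch:

    // Hedged sketch: how the pre/next bits combine into one adjacency byte.
    public class AdjacencySketch {
        static byte getBitMapFromGeneCode(byte t) { return (byte) (1 << t); }
        // Assumed layout: predecessor bits in the high nibble, successor bits in the low nibble.
        static byte mergePreNextAdj(byte pre, byte next) { return (byte) ((pre << 4) | (next & 0x0f)); }

        public static void main(String[] args) {
            byte preCode = 2;                          // gene code returned by shiftKmerWithNextChar ('G')
            byte pre = getBitMapFromGeneCode(preCode); // 0b0100
            byte next = (byte) (1 << 3);               // adjacency bit of the next symbol ('T')
            System.out.printf("adj = 0x%02x%n", mergePreNextAdj(pre, next)); // adj = 0x48
        }
    }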
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialMapper.java
index e7478a6..75ac0d1 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialMapper.java
@@ -91,44 +91,42 @@
succeed = (byte) (succeed & adjBitMap);
boolean inDegree = measureDegree(precursor);
boolean outDegree = measureDegree(succeed);
-
if (inDegree == false && outDegree == false) {
outputKmer.set(key);
bitFlag = (byte) 2;
outputAdjList.set(null, 0, 0, adjBitMap, bitFlag, 0);
output.collect(outputKmer, outputAdjList);
- }
- else{
- for(int i = 0 ; i < 4; i ++){
+ } else {
+ for (int i = 0; i < 4; i++) {
byte temp = 0x01;
byte shiftedCode = 0;
- temp = (byte)(temp << i);
- temp = (byte) (succeed & temp);
- if(temp != 0 ){
- byte succeedCode = GeneCode.getGeneCodeFromBitMap(temp);
- shiftedCode = key.shiftKmerWithNextCode(succeedCode);
- outputKmer.set(key);
- bitFlag = (byte)0x01;
- outputAdjList.set(null, 0, 0, (byte)0, bitFlag, 0);
- output.collect(outputKmer, outputAdjList);
- key.shiftKmerWithPreCode(shiftedCode);
- }
- }
- for(int i = 0; i < 4;i ++){
- byte temp = 0x01;
- byte shiftedCode = 0;
- temp = (byte)(temp << i);
- temp = (byte)(precursor & temp);
- if(temp != 0){
+ temp = (byte) (temp << i);
+ temp = (byte) (precursor & temp);
+ if (temp != 0) {
byte precurCode = GeneCode.getGeneCodeFromBitMap(temp);
shiftedCode = key.shiftKmerWithPreCode(precurCode);
outputKmer.set(key);
- bitFlag = (byte)0x80;
- outputAdjList.set(null, 0, 0, (byte)0, bitFlag, 0);
+ bitFlag = (byte) 0x80;
+ outputAdjList.set(null, 0, 0, (byte) 0, bitFlag, 0);
output.collect(outputKmer, outputAdjList);
key.shiftKmerWithNextCode(shiftedCode);
}
}
+ for (int i = 0; i < 4; i++) {
+ byte temp = 0x01;
+ byte shiftedCode = 0;
+ temp = (byte) (temp << i);
+ temp = (byte) (succeed & temp);
+ if (temp != 0) {
+ byte succeedCode = GeneCode.getGeneCodeFromBitMap(temp);
+ shiftedCode = key.shiftKmerWithNextCode(succeedCode);
+ outputKmer.set(key);
+ bitFlag = (byte) 0x01;
+ outputAdjList.set(null, 0, 0, (byte) 0, bitFlag, 0);
+ output.collect(outputKmer, outputAdjList);
+ key.shiftKmerWithPreCode(shiftedCode);
+ }
+ }
}
}
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialReducer.java
index 91b13c9..3d2b58c 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialReducer.java
@@ -48,14 +48,14 @@
endFlag = 0x01;
break;
case (byte) 0x02:
- targetPointFlag = 0x01;
+ targetPointFlag = 0x02;
targetAdjList = outputValue.getAdjBitMap();
break;
}
if(startFlag != 0x00 && endFlag!= 0x00 && targetPointFlag != 0x00)
break;
}
- if(targetPointFlag != 0x02) {
+ if(targetPointFlag == 0x02) {
if(startFlag == 0x01) {
outputFlag = (byte) (outputFlag | startFlag);
}
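
The reducer fix above is twofold: case 0x02 now records its own flag value, and the guard is inverted to `== 0x02`, so start/end flags are merged only when a target (0x02) record was actually present in the group. Previously targetPointFlag was never assigned 0x02, so `!= 0x02` merged unconditionally. A condensed demonstration of the predicate change:

    // Before/after behaviour of the guard when no 0x02 record appears in the group.
    public class TargetFlagSketch {
        public static void main(String[] args) {
            byte targetPointFlag = 0x00; // no target record seen
            System.out.println(targetPointFlag != 0x02); // old guard: true, merged flags anyway
            System.out.println(targetPointFlag == 0x02); // new guard: false, skips the merge
        }
    }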
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
index 7b0005b..44af11b 100755
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
@@ -71,7 +71,7 @@
SequenceFile.Reader reader = null;
Path path = new Path(RESULT_PATH + "/part-00000");
reader = new SequenceFile.Reader(dfs, path, conf);
- KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ KmerBytesWritable key = new KmerBytesWritable(SIZE_KMER);
KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
File filePathTo = new File(TEST_SOURCE_DIR);
BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
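
The test change above is needed because KmerBytesWritable now requires the kmer length at construction, so ReflectionUtils.newInstance (which needs a usable no-arg constructor) can no longer build the key. A hedged sketch of the read loop this feeds; the loop body is assumed from the surrounding test, not shown in this hunk:

    // Assumed shape of the test's read loop after the change.
    KmerBytesWritable key = new KmerBytesWritable(SIZE_KMER);
    KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
    while (reader.next(key, value)) {
        // SequenceFile.Reader reuses key and value across iterations
        bw.write(key.toString() + "\t" + value.toString());
        bw.newLine();
    }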
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
index 41e3e4b..ebf1282 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.data.std.accessors;
import java.io.DataInput;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
index c1e9804..34c29c7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.data.std.accessors;
import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
index 86ca296..8aaf380 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.data.std.accessors;
import java.nio.ByteBuffer;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
index d1d0054..83b9d12 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.data.std.accessors;
import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
index b4aa4c7..8febfa5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.data.std.primitive;
import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
index d3de2ba..ed1b926 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -9,22 +24,19 @@
/**
* used by precluster groupby
- *
*/
-public class ConnectorPolicyAssignmentPolicy implements
- IConnectorPolicyAssignmentPolicy {
- private static final long serialVersionUID = 1L;
- private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
- private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+ private static final long serialVersionUID = 1L;
+ private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
- @Override
- public IConnectorPolicy getConnectorPolicyAssignment(
- IConnectorDescriptor c, int nProducers, int nConsumers,
- int[] fanouts) {
- if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
- return senderSideMaterializePolicy;
- } else {
- return pipeliningPolicy;
- }
- }
+ @Override
+ public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+ int[] fanouts) {
+ if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+ return senderSideMaterializePolicy;
+ } else {
+ return pipeliningPolicy;
+ }
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 413c73b..405e109 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow;
import java.io.DataOutput;
@@ -22,68 +37,65 @@
@SuppressWarnings("deprecation")
public class KMerSequenceWriterFactory implements ITupleWriterFactory {
- private static final long serialVersionUID = 1L;
- private ConfFactory confFactory;
- private final int kmerlength;
+ private static final long serialVersionUID = 1L;
+ private ConfFactory confFactory;
+ private final int kmerlength;
- public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
- this.confFactory = new ConfFactory(conf);
- this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
- }
+ public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+ this.confFactory = new ConfFactory(conf);
+ this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ }
- public class TupleWriter implements ITupleWriter {
- public TupleWriter(ConfFactory cf) {
- this.cf = cf;
- }
+ public class TupleWriter implements ITupleWriter {
+ public TupleWriter(ConfFactory cf) {
+ this.cf = cf;
+ }
- ConfFactory cf;
- Writer writer = null;
+ ConfFactory cf;
+ Writer writer = null;
- KmerCountValue reEnterCount = new KmerCountValue();
- KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
+ KmerCountValue reEnterCount = new KmerCountValue();
+ KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
- /**
- * assumption is that output never change source!
- */
- @Override
- public void write(DataOutput output, ITupleReference tuple)
- throws HyracksDataException {
- try {
- byte[] kmer = tuple.getFieldData(0);
- int keyStart = tuple.getFieldStart(0);
- int keyLength = tuple.getFieldLength(0);
+ /**
+ * assumption is that output never change source!
+ */
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ try {
+ byte[] kmer = tuple.getFieldData(0);
+ int keyStart = tuple.getFieldStart(0);
+ int keyLength = tuple.getFieldLength(0);
- byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
- byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
- reEnterCount.set(bitmap, count);
- reEnterKey.set(kmer, keyStart, keyLength);
- writer.append(reEnterKey, reEnterCount);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
+ byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
+ reEnterCount.set(bitmap, count);
+ reEnterKey.set(kmer, keyStart, keyLength);
+ writer.append(reEnterKey, reEnterCount);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void open(DataOutput output) throws HyracksDataException {
- try {
- writer = SequenceFile.createWriter(cf.getConf(),
- (FSDataOutputStream) output, KmerBytesWritable.class,
- KmerCountValue.class, CompressionType.NONE, null);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ try {
+ writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
+ KmerCountValue.class, CompressionType.NONE, null);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void close(DataOutput output) throws HyracksDataException {
- // TODO Auto-generated method stub
- }
- }
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ }
+ }
- @Override
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx)
- throws HyracksDataException {
- return new TupleWriter(confFactory);
- }
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new TupleWriter(confFactory);
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
index 4ba38d0..cfa7262 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow;
import java.io.DataOutput;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
index 79ad195..4f5b4da 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow;
import java.nio.ByteBuffer;
@@ -62,13 +77,13 @@
/** middle kmer */
for (int i = k; i < array.length - 1; i++) {
- pre = kmer.shiftKmerWithNextChar(array[i]);
+ pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[i]));
next = GeneCode.getAdjBit(array[i + 1]);
InsertToFrame(kmer, pre, next, writer);
}
/** last kmer */
- pre = kmer.shiftKmerWithNextChar(array[array.length - 1]);
+ pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[array.length - 1]));
next = 0;
InsertToFrame(kmer, pre, next, writer);
@@ -80,12 +95,12 @@
InsertToFrame(kmer, pre, next, writer);
/** middle kmer */
for (int i = k; i < array.length - 1; i++) {
- next = kmer.shiftKmerWithPreChar(array[i]);
+ next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[i]));
pre = GeneCode.getAdjBit(array[i + 1]);
InsertToFrame(kmer, pre, next, writer);
}
/** last kmer */
- next = kmer.shiftKmerWithPreChar(array[array.length - 1]);
+ next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[array.length - 1]));
pre = 0;
InsertToFrame(kmer, pre, next, writer);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 62680ed..ea70fb0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow.aggregators;
import java.io.DataOutput;
@@ -15,126 +30,110 @@
/**
* sum
- *
*/
-public class DistributedMergeLmerAggregateFactory implements
- IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
+public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
- public DistributedMergeLmerAggregateFactory() {
- }
+ public DistributedMergeLmerAggregateFactory() {
+ }
- public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
- private static final int MAX = 127;
+ public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
+ private static final int MAX = 127;
- @Override
- public void reset() {
- }
+ @Override
+ public void reset() {
+ }
- @Override
- public void close() {
- // TODO Auto-generated method stub
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
- }
+ }
- @Override
- public AggregateState createAggregateStates() {
- return new AggregateState(new Object() {
- });
- }
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new Object() {
+ });
+ }
- protected byte getField(IFrameTupleAccessor accessor, int tIndex,
- int fieldId) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
- int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()
- .array(), offset);
- return data;
- }
-
- @Override
- public void init(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
+ protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+ return data;
+ }
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- }
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = getField(accessor, tIndex, 2);
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- short count = getField(accessor, tIndex, 2);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+ }
- int statetupleOffset = stateAccessor
- .getTupleStartOffset(stateTupleIndex);
- int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,
- 1);
- int countfieldStart = stateAccessor.getFieldStartOffset(
- stateTupleIndex, 2);
- int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()
- + bitfieldStart;
- int countoffset = statetupleOffset
- + stateAccessor.getFieldSlotsLength() + countfieldStart;
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ short count = getField(accessor, tIndex, 2);
- byte[] data = stateAccessor.getBuffer().array();
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+ int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
+ int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
+ int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
- bitmap |= data[bitoffset];
- count += data[countoffset];
- if (count >= MAX) {
- count = (byte) MAX;
- }
- data[bitoffset] = bitmap;
- data[countoffset] = (byte) count;
- }
+ byte[] data = stateAccessor.getBuffer().array();
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
+ bitmap |= data[bitoffset];
+ count += data[countoffset];
+ if (count >= MAX) {
+ count = (byte) MAX;
+ }
+ data[bitoffset] = bitmap;
+ data[countoffset] = (byte) count;
+ }
- }
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = getField(accessor, tIndex, 2);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
-
- }
+ }
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields,
- int[] keyFieldsInPartialResults) throws HyracksDataException {
- return new DistributeAggregatorDescriptor();
- }
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ }
+
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new DistributeAggregatorDescriptor();
+ }
}
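
In the aggregate step above, the running count is widened before the add and clamped at MAX = 127 so it always fits back into the signed count byte. A standalone sketch of that saturating add:

    // Saturating byte counter, as in DistributeAggregatorDescriptor.aggregate.
    public class SaturatingCountSketch {
        private static final int MAX = 127;

        static byte add(byte stored, byte incoming) {
            int count = stored + incoming; // widen first so the sum cannot wrap
            if (count >= MAX) {
                count = MAX;               // clamp to the largest signed byte value
            }
            return (byte) count;
        }

        public static void main(String[] args) {
            System.out.println(add((byte) 120, (byte) 50)); // 127, not a wrapped negative
        }
    }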
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
index 330f950..87c0207 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow.aggregators;
import java.io.DataOutput;
@@ -11,106 +26,93 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
public class LocalAggregatorDescriptor implements IAggregatorDescriptor {
- private static final int MAX = 127;
+ private static final int MAX = 127;
- @Override
- public void reset() {
- }
+ @Override
+ public void reset() {
+ }
- @Override
- public void close() {
- // TODO Auto-generated method stub
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
- }
+ }
- @Override
- public AggregateState createAggregateStates() {
- return new AggregateState(new Object() {
- });
- }
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new Object() {
+ });
+ }
- protected byte getField(IFrameTupleAccessor accessor, int tIndex,
- int fieldId) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
- int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()
- .array(), offset);
- return data;
- }
+ protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+ return data;
+ }
- @Override
- public void init(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = 1;
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = 1;
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- }
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+ }
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- short count = 1;
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ short count = 1;
- int statetupleOffset = stateAccessor
- .getTupleStartOffset(stateTupleIndex);
- int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,
- 1);
- int countfieldStart = stateAccessor.getFieldStartOffset(
- stateTupleIndex, 2);
- int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()
- + bitfieldStart;
- int countoffset = statetupleOffset
- + stateAccessor.getFieldSlotsLength() + countfieldStart;
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+ int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
+ int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
+ int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
- byte[] data = stateAccessor.getBuffer().array();
+ byte[] data = stateAccessor.getBuffer().array();
- bitmap |= data[bitoffset];
- count += data[countoffset];
- if (count >= MAX) {
- count = (byte) MAX;
- }
- data[bitoffset] = bitmap;
- data[countoffset] = (byte) count;
- }
+ bitmap |= data[bitoffset];
+ count += data[countoffset];
+ if (count >= MAX) {
+ count = (byte) MAX;
+ }
+ data[bitoffset] = bitmap;
+ data[countoffset] = (byte) count;
+ }
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = getField(accessor, tIndex, 2);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
- }
+ }
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ }
};
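
The getField() helper above resolves a field to an absolute byte offset inside a Hyracks frame: the tuple's start offset, plus the length of the per-field offset slots that precede the tuple's data area, plus the field's start offset within that data area. The same arithmetic pulled out into a self-contained helper, as a sketch (names are illustrative):

    import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

    final class FrameFieldReader {
        /** Absolute byte offset of field (tIndex, fieldId) within the frame buffer. */
        static int fieldOffset(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
            int tupleOffset = accessor.getTupleStartOffset(tIndex);         // tuple start in frame
            int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId); // relative to data area
            return tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
        }

        static byte readByte(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
            return accessor.getBuffer().array()[fieldOffset(accessor, tIndex, fieldId)];
        }
    }

The aggregate() method above does exactly this computation inline for fields 1 (bitmap) and 2 (count) so it can write the merged state back into the frame directly.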
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index 58ff8a2..b5eb70f 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow.aggregators;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -8,20 +23,18 @@
/**
* count
- *
*/
public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public MergeKmerAggregateFactory() {
- }
+ public MergeKmerAggregateFactory() {
+ }
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields,
- int[] keyFieldsInPartialResults) throws HyracksDataException {
- return new LocalAggregatorDescriptor();
- }
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new LocalAggregatorDescriptor();
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index 99d6c89..27d5e15 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.driver;
import java.net.URL;
@@ -22,122 +37,112 @@
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
public class Driver {
- public static enum Plan {
- BUILD_DEBRUJIN_GRAPH, GRAPH_CLEANNING, CONTIGS_GENERATION,
- }
+ public static enum Plan {
+ BUILD_DEBRUJIN_GRAPH,
+ GRAPH_CLEANNING,
+ CONTIGS_GENERATION,
+ }
- private static final String IS_PROFILING = "genomix.driver.profiling";
- private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
- private static final String applicationName = GenomixJob.JOB_NAME;
- private static final Log LOG = LogFactory.getLog(Driver.class);
- private JobGen jobGen;
- private boolean profiling;
+ private static final String IS_PROFILING = "genomix.driver.profiling";
+ private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+ private static final Log LOG = LogFactory.getLog(Driver.class);
+ private JobGen jobGen;
+ private boolean profiling;
- private int numPartitionPerMachine;
+ private int numPartitionPerMachine;
- private IHyracksClientConnection hcc;
- private Scheduler scheduler;
+ private IHyracksClientConnection hcc;
+ private Scheduler scheduler;
- public Driver(String ipAddress, int port, int numPartitionPerMachine)
- throws HyracksException {
- try {
- hcc = new HyracksConnection(ipAddress, port);
- scheduler = new Scheduler(hcc.getNodeControllerInfos());
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- this.numPartitionPerMachine = numPartitionPerMachine;
- }
+ public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
+ try {
+ hcc = new HyracksConnection(ipAddress, port);
+ scheduler = new Scheduler(hcc.getNodeControllerInfos());
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ this.numPartitionPerMachine = numPartitionPerMachine;
+ }
- public void runJob(GenomixJob job) throws HyracksException {
- runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
- }
+ public void runJob(GenomixJob job) throws HyracksException {
+ runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+ }
- public void runJob(GenomixJob job, Plan planChoice, boolean profiling)
- throws HyracksException {
- /** add hadoop configurations */
- URL hadoopCore = job.getClass().getClassLoader()
- .getResource("core-site.xml");
- job.addResource(hadoopCore);
- URL hadoopMapRed = job.getClass().getClassLoader()
- .getResource("mapred-site.xml");
- job.addResource(hadoopMapRed);
- URL hadoopHdfs = job.getClass().getClassLoader()
- .getResource("hdfs-site.xml");
- job.addResource(hadoopHdfs);
+ public void runJob(GenomixJob job, Plan planChoice, boolean profiling) throws HyracksException {
+ /** add hadoop configurations */
+ URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+ job.addResource(hadoopCore);
+ URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+ job.addResource(hadoopMapRed);
+ URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+ job.addResource(hadoopHdfs);
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
- this.profiling = profiling;
- try {
- Map<String, NodeControllerInfo> ncMap = hcc
- .getNodeControllerInfos();
- LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
- switch (planChoice) {
- case BUILD_DEBRUJIN_GRAPH:
- default:
- jobGen = new JobGenBrujinGraph(job, scheduler, ncMap,
- numPartitionPerMachine);
- break;
- }
+ this.profiling = profiling;
+ try {
+ Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+ LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
+ switch (planChoice) {
+ case BUILD_DEBRUJIN_GRAPH:
+ default:
+ jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ }
- start = System.currentTimeMillis();
- runCreate(jobGen);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("result writing finished " + time + "ms");
- LOG.info("job finished");
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
+ start = System.currentTimeMillis();
+ runCreate(jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("result writing finished " + time + "ms");
+ LOG.info("job finished");
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
- private void runCreate(JobGen jobGen) throws Exception {
- try {
- JobSpecification createJob = jobGen.generateJob();
- execute(createJob);
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
+ private void runCreate(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification createJob = jobGen.generateJob();
+ execute(createJob);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
- private void execute(JobSpecification job) throws Exception {
- job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc.startJob(
- job,
- profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
- .noneOf(JobFlag.class));
- hcc.waitForCompletion(jobId);
- }
+ private void execute(JobSpecification job) throws Exception {
+ job.setUseConnectorPolicyForScheduling(false);
+ JobId jobId = hcc
+ .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ hcc.waitForCompletion(jobId);
+ }
- public static void main(String[] args) throws Exception {
- GenomixJob jobConf = new GenomixJob();
- String[] otherArgs = new GenericOptionsParser(jobConf, args)
- .getRemainingArgs();
- if (otherArgs.length < 4) {
- System.err.println("Need <serverIP> <port> <input> <output>");
- System.exit(-1);
- }
- String ipAddress = otherArgs[0];
- int port = Integer.parseInt(otherArgs[1]);
- int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
- boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
- // FileInputFormat.setInputPaths(job, otherArgs[2]);
- {
- Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
- jobConf.set("mapred.input.dir", path.toString());
+ public static void main(String[] args) throws Exception {
+ GenomixJob jobConf = new GenomixJob();
+ String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
+ if (otherArgs.length < 4) {
+ System.err.println("Need <serverIP> <port> <input> <output>");
+ System.exit(-1);
+ }
+ String ipAddress = otherArgs[0];
+ int port = Integer.parseInt(otherArgs[1]);
+ int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+ boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+ // FileInputFormat.setInputPaths(job, otherArgs[2]);
+ {
+ Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+ jobConf.set("mapred.input.dir", path.toString());
- Path outputDir = new Path(jobConf.getWorkingDirectory(),
- otherArgs[3]);
- jobConf.set("mapred.output.dir", outputDir.toString());
- }
- // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
- // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
- Driver driver = new Driver(ipAddress, port, numOfDuplicate);
- driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
- }
+ Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
+ jobConf.set("mapred.output.dir", outputDir.toString());
+ }
+ // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+ // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+ Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+ driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+ }
}
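
End to end, the driver connects to a Hyracks cluster controller, generates a BUILD_DEBRUJIN_GRAPH plan, and blocks until the job completes. An illustrative launch from code rather than the command line — the host, port, and HDFS paths below are placeholders, not values from this patch:

    import edu.uci.ics.genomix.driver.Driver;
    import edu.uci.ics.genomix.job.GenomixJob;

    public class RunGenomix {
        public static void main(String[] args) throws Exception {
            GenomixJob job = new GenomixJob();
            job.setKmerLength(21); // JobGenBrujinGraph forces k to be odd anyway
            job.set("mapred.input.dir", "/genomix/reads");  // placeholder HDFS paths
            job.set("mapred.output.dir", "/genomix/graph");
            Driver driver = new Driver("127.0.0.1", 3099, 2); // cc host, cc port, partitions per machine
            driver.runJob(job, Driver.Plan.BUILD_DEBRUJIN_GRAPH, false); // no runtime profiling
        }
    }

This mirrors what main() above does after parsing <serverIP> <port> <input> <output> with GenericOptionsParser.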
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
index 11786f3..be82477 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.job;
import java.io.IOException;
@@ -7,73 +22,73 @@
public class GenomixJob extends JobConf {
- public static final String JOB_NAME = "genomix";
+ public static final String JOB_NAME = "genomix";
- /** Kmers length */
- public static final String KMER_LENGTH = "genomix.kmer";
- /** Frame Size */
- public static final String FRAME_SIZE = "genomix.framesize";
- /** Frame Limit, hyracks need */
- public static final String FRAME_LIMIT = "genomix.framelimit";
- /** Table Size, hyracks need */
- public static final String TABLE_SIZE = "genomix.tablesize";
- /** Groupby types */
- public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
- /** Graph outputformat */
- public static final String OUTPUT_FORMAT = "genomix.graph.output";
- /** Get reversed Kmer Sequence */

- public static final String REVERSED_KMER = "genomix.kmer.reversed";
+ /** Kmer length */
+ public static final String KMER_LENGTH = "genomix.kmer";
+ /** Frame size */
+ public static final String FRAME_SIZE = "genomix.framesize";
+ /** Frame limit, needed by Hyracks */
+ public static final String FRAME_LIMIT = "genomix.framelimit";
+ /** Table size, needed by Hyracks */
+ public static final String TABLE_SIZE = "genomix.tablesize";
+ /** Group-by type */
+ public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+ /** Graph output format */
+ public static final String OUTPUT_FORMAT = "genomix.graph.output";
+ /** Whether to also generate the reversed kmer sequence */
+ public static final String REVERSED_KMER = "genomix.kmer.reversed";
- /** Configurations used by hybrid groupby function in graph build phrase */
- public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
- public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
- public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
- public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
- public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+ /** Configurations used by the hybrid groupby function in the graph build phase */
+ public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+ public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+ public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
- public static final int DEFAULT_KMER = 21;
- public static final int DEFAULT_FRAME_SIZE = 32768;
- public static final int DEFAULT_FRAME_LIMIT = 4096;
- public static final int DEFAULT_TABLE_SIZE = 10485767;
- public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
- public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
- public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
- public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
- public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+ public static final int DEFAULT_KMER = 21;
+ public static final int DEFAULT_FRAME_SIZE = 32768;
+ public static final int DEFAULT_FRAME_LIMIT = 4096;
+ public static final int DEFAULT_TABLE_SIZE = 10485767;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+ public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
- public static final boolean DEFAULT_REVERSED = false;
+ public static final boolean DEFAULT_REVERSED = false;
- public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
- public static final String DEFAULT_OUTPUT_FORMAT = "binary";
+ public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
+ public static final String DEFAULT_OUTPUT_FORMAT = "binary";
- public GenomixJob() throws IOException {
- super(new Configuration());
- }
+ public GenomixJob() throws IOException {
+ super(new Configuration());
+ }
- public GenomixJob(Configuration conf) throws IOException {
- super(conf);
- }
+ public GenomixJob(Configuration conf) throws IOException {
+ super(conf);
+ }
- /**
- * Set the kmer length
- *
- * @param the
- * desired frame size
- */
- final public void setKmerLength(int kmerlength) {
- setInt(KMER_LENGTH, kmerlength);
- }
+ /**
+ * Set the kmer length
+ *
+ * @param kmerlength
+ * the desired kmer length
+ */
+ final public void setKmerLength(int kmerlength) {
+ setInt(KMER_LENGTH, kmerlength);
+ }
- final public void setFrameSize(int frameSize) {
- setInt(FRAME_SIZE, frameSize);
- }
+ final public void setFrameSize(int frameSize) {
+ setInt(FRAME_SIZE, frameSize);
+ }
- final public void setFrameLimit(int frameLimit) {
- setInt(FRAME_LIMIT, frameLimit);
- }
+ final public void setFrameLimit(int frameLimit) {
+ setInt(FRAME_LIMIT, frameLimit);
+ }
- final public void setTableSize(int tableSize) {
- setInt(TABLE_SIZE, tableSize);
- }
+ final public void setTableSize(int tableSize) {
+ setInt(TABLE_SIZE, tableSize);
+ }
}
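
Each configuration key above is paired with a DEFAULT_* fallback, so consumers read it through Hadoop's typed getters and never have to null-check. A sketch of the read side, matching how the job generators consume these keys (the dump() helper itself is illustrative):

    import org.apache.hadoop.conf.Configuration;

    import edu.uci.ics.genomix.job.GenomixJob;

    final class GenomixConfigDump {
        static void dump(Configuration conf) {
            int k = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
            int frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
            String groupby = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
            boolean reversed = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
            System.out.println("k=" + k + " frame=" + frameSize
                    + " groupby=" + groupby + " reversed=" + reversed);
        }
    }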
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
index 557da6b..b1f9a29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.job;
import java.util.UUID;
@@ -9,19 +24,18 @@
public abstract class JobGen {
- protected final Configuration conf;
- protected final GenomixJob genomixJob;
- protected String jobId = new UUID(System.currentTimeMillis(),
- System.nanoTime()).toString();
+ protected final Configuration conf;
+ protected final GenomixJob genomixJob;
+ protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
- public JobGen(GenomixJob job) {
- this.conf = job;
- this.genomixJob = job;
- this.initJobConfiguration();
- }
+ public JobGen(GenomixJob job) {
+ this.conf = job;
+ this.genomixJob = job;
+ this.initJobConfiguration();
+ }
- protected abstract void initJobConfiguration();
+ protected abstract void initJobConfiguration();
- public abstract JobSpecification generateJob() throws HyracksException;
+ public abstract JobSpecification generateJob() throws HyracksException;
}
\ No newline at end of file
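
JobGen fixes the contract every plan generator follows: the constructor stores the configuration and immediately calls initJobConfiguration(), and generateJob() returns the Hyracks dataflow to run. The smallest conforming subclass looks like the sketch below (the class itself is illustrative, not part of this patch):

    import edu.uci.ics.genomix.job.GenomixJob;
    import edu.uci.ics.genomix.job.JobGen;
    import edu.uci.ics.hyracks.api.exceptions.HyracksException;
    import edu.uci.ics.hyracks.api.job.JobSpecification;

    class NoopJobGen extends JobGen {
        NoopJobGen(GenomixJob job) {
            super(job); // note: super() invokes initJobConfiguration() before this body runs
        }

        @Override
        protected void initJobConfiguration() {
            // read whatever this plan needs from this.conf
        }

        @Override
        public JobSpecification generateJob() throws HyracksException {
            return new JobSpecification(); // empty but valid dataflow
        }
    }

Because initJobConfiguration() is called from the superclass constructor, subclasses should not rely on their own field initializers inside it.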
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index 9462fb0..79b38e8 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.job;
import java.util.Map;
@@ -47,292 +62,234 @@
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
public class JobGenBrujinGraph extends JobGen {
- public enum GroupbyType {
- EXTERNAL, PRECLUSTER, HYBRIDHASH,
- }
+ public enum GroupbyType {
+ EXTERNAL,
+ PRECLUSTER,
+ HYBRIDHASH,
+ }
- public enum OutputFormat {
- TEXT, BINARY,
- }
+ public enum OutputFormat {
+ TEXT,
+ BINARY,
+ }
- JobConf job;
- private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
- private Scheduler scheduler;
- private String[] ncNodeNames;
+ JobConf job;
+ private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+ private Scheduler scheduler;
+ private String[] ncNodeNames;
- private int kmers;
- private int frameLimits;
- private int frameSize;
- private int tableSize;
- private GroupbyType groupbyType;
- private OutputFormat outputFormat;
- private boolean bGenerateReversedKmer;
+ private int kmers;
+ private int frameLimits;
+ private int frameSize;
+ private int tableSize;
+ private GroupbyType groupbyType;
+ private OutputFormat outputFormat;
+ private boolean bGenerateReversedKmer;
- private AbstractOperatorDescriptor singleGrouper;
- private IConnectorDescriptor connPartition;
- private AbstractOperatorDescriptor crossGrouper;
- private RecordDescriptor readOutputRec;
- private RecordDescriptor combineOutputRec;
+ private AbstractOperatorDescriptor singleGrouper;
+ private IConnectorDescriptor connPartition;
+ private AbstractOperatorDescriptor crossGrouper;
+ private RecordDescriptor readOutputRec;
+ private RecordDescriptor combineOutputRec;
- /** works for hybrid hashing */
- private long inputSizeInRawRecords;
- private long inputSizeInUniqueKeys;
- private int recordSizeInBytes;
- private int hashfuncStartLevel;
+ /** works for hybrid hashing */
+ private long inputSizeInRawRecords;
+ private long inputSizeInUniqueKeys;
+ private int recordSizeInBytes;
+ private int hashfuncStartLevel;
- private void logDebug(String status) {
- String names = "";
- for (String str : ncNodeNames) {
- names += str + " ";
- }
- LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
- }
+ private void logDebug(String status) {
+ String names = "";
+ for (String str : ncNodeNames) {
+ names += str + " ";
+ }
+ LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
+ }
- public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
- final Map<String, NodeControllerInfo> ncMap,
- int numPartitionPerMachine) {
- super(job);
- this.scheduler = scheduler;
- String[] nodes = new String[ncMap.size()];
- ncMap.keySet().toArray(nodes);
- ncNodeNames = new String[nodes.length * numPartitionPerMachine];
- for (int i = 0; i < numPartitionPerMachine; i++) {
- System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
- nodes.length);
- }
- logDebug("initialize");
- }
+ public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) {
+ super(job);
+ this.scheduler = scheduler;
+ String[] nodes = new String[ncMap.size()];
+ ncMap.keySet().toArray(nodes);
+ ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+ for (int i = 0; i < numPartitionPerMachine; i++) {
+ System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+ }
+ logDebug("initialize");
+ }
- private ExternalGroupOperatorDescriptor newExternalGroupby(
- JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggeragater) {
- return new ExternalGroupOperatorDescriptor(
- jobSpec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(KmerPointable.FACTORY) },
- new KmerNormarlizedComputerFactory(),
- aggeragater,
- new DistributedMergeLmerAggregateFactory(),
- combineOutputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(KmerPointable.FACTORY) }),
- tableSize), true);
- }
+ private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggeragater) {
+ return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
+ combineOutputRec, new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(KmerPointable.FACTORY) }), tableSize), true);
+ }
- private HybridHashGroupOperatorDescriptor newHybridGroupby(
- JobSpecification jobSpec, int[] keyFields,
- long inputSizeInRawRecords, long inputSizeInUniqueKeys,
- int recordSizeInBytes, int hashfuncStartLevel,
- IAggregatorDescriptorFactory aggeragater)
- throws HyracksDataException {
- return new HybridHashGroupOperatorDescriptor(
- jobSpec,
- keyFields,
- frameLimits,
- inputSizeInRawRecords,
- inputSizeInUniqueKeys,
- recordSizeInBytes,
- tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(KmerPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() },
- hashfuncStartLevel, new KmerNormarlizedComputerFactory(),
- aggeragater, new DistributedMergeLmerAggregateFactory(),
- combineOutputRec, true);
- }
+ private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
+ long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
+ IAggregatorDescriptorFactory aggeragater) throws HyracksDataException {
+ return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
+ inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
+ new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
+ combineOutputRec, true);
+ }
- private void generateDescriptorbyType(JobSpecification jobSpec)
- throws HyracksDataException {
- int[] keyFields = new int[] { 0 }; // the id of grouped key
+ private void generateDescriptorbyType(JobSpecification jobSpec) throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
- switch (groupbyType) {
- case EXTERNAL:
- singleGrouper = newExternalGroupby(jobSpec, keyFields,
- new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
- new KmerHashPartitioncomputerFactory());
- crossGrouper = newExternalGroupby(jobSpec, keyFields,
- new DistributedMergeLmerAggregateFactory());
- break;
- case PRECLUSTER:
- singleGrouper = newExternalGroupby(jobSpec, keyFields,
- new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningMergingConnectorDescriptor(
- jobSpec,
- new KmerHashPartitioncomputerFactory(),
- keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(KmerPointable.FACTORY) });
- crossGrouper = new PreclusteredGroupOperatorDescriptor(
- jobSpec,
- keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(KmerPointable.FACTORY) },
- new DistributedMergeLmerAggregateFactory(),
- combineOutputRec);
- break;
- case HYBRIDHASH:
- default:
- singleGrouper = newHybridGroupby(jobSpec, keyFields,
- inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel,
- new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
- new KmerHashPartitioncomputerFactory());
+ switch (groupbyType) {
+ case EXTERNAL:
+ singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+ connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
+ crossGrouper = newExternalGroupby(jobSpec, keyFields, new DistributedMergeLmerAggregateFactory());
+ break;
+ case PRECLUSTER:
+ singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+ connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+ new KmerHashPartitioncomputerFactory(), keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
+ crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ new DistributedMergeLmerAggregateFactory(), combineOutputRec);
+ break;
+ case HYBRIDHASH:
+ default:
+ singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+ recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
+ connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
- crossGrouper = newHybridGroupby(jobSpec, keyFields,
- inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel,
- new DistributedMergeLmerAggregateFactory());
- break;
- }
- }
+ crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+ recordSizeInBytes, hashfuncStartLevel, new DistributedMergeLmerAggregateFactory());
+ break;
+ }
+ }
- public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
- throws HyracksDataException {
- try {
+ public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+ try {
- InputSplit[] splits = job.getInputFormat().getSplits(job,
- ncNodeNames.length);
+ InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
- LOG.info("HDFS read into " + splits.length + " splits");
- String[] readSchedule = scheduler.getLocationConstraints(splits);
- String log = "";
- for (String schedule : readSchedule) {
- log += schedule + " ";
- }
- LOG.info("HDFS read schedule " + log);
- return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job,
- splits, readSchedule, new ReadsKeyValueParserFactory(kmers,
- bGenerateReversedKmer));
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ LOG.info("HDFS read into " + splits.length + " splits");
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ String log = "";
+ for (String schedule : readSchedule) {
+ log += schedule + " ";
+ }
+ LOG.info("HDFS read schedule " + log);
+ return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
+ new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public JobSpecification generateJob() throws HyracksException {
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
- JobSpecification jobSpec = new JobSpecification();
- readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- null, ByteSerializerDeserializer.INSTANCE });
- combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- null, ByteSerializerDeserializer.INSTANCE,
- ByteSerializerDeserializer.INSTANCE });
- jobSpec.setFrameSize(frameSize);
+ JobSpecification jobSpec = new JobSpecification();
+ readOutputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { null, ByteSerializerDeserializer.INSTANCE });
+ combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
+ ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+ jobSpec.setFrameSize(frameSize);
- // File input
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+ // File input
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
- logDebug("Read Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- readOperator, ncNodeNames);
+ logDebug("Read Operator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
- generateDescriptorbyType(jobSpec);
- logDebug("SingleGroupby Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- singleGrouper, ncNodeNames);
+ generateDescriptorbyType(jobSpec);
+ logDebug("SingleGroupby Operator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
- jobSpec);
- jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
- logDebug("CrossGrouper Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- crossGrouper, ncNodeNames);
- jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
+ logDebug("CrossGrouper Operator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
+ jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
- // Output
- ITupleWriterFactory writer = null;
- switch (outputFormat) {
- case TEXT:
- writer = new KMerTextWriterFactory(kmers);
- break;
- case BINARY:
- default:
- writer = new KMerSequenceWriterFactory(job);
- break;
- }
- HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
- jobSpec, job, writer);
+ // Output
+ ITupleWriterFactory writer = null;
+ switch (outputFormat) {
+ case TEXT:
+ writer = new KMerTextWriterFactory(kmers);
+ break;
+ case BINARY:
+ default:
+ writer = new KMerSequenceWriterFactory(job);
+ break;
+ }
+ HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
- logDebug("WriteOperator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- writeOperator, ncNodeNames);
+ logDebug("WriteOperator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
- jobSpec);
- jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
- jobSpec.addRoot(writeOperator);
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
+ jobSpec.addRoot(writeOperator);
- if (groupbyType == GroupbyType.PRECLUSTER) {
- jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- }
- return jobSpec;
- }
+ if (groupbyType == GroupbyType.PRECLUSTER) {
+ jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ }
+ return jobSpec;
+ }
- @Override
- protected void initJobConfiguration() {
+ @Override
+ protected void initJobConfiguration() {
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
- if (kmers % 2 == 0){
- kmers--;
- conf.setInt(GenomixJob.KMER_LENGTH, kmers);
- }
- frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT,
- GenomixJob.DEFAULT_FRAME_LIMIT);
- tableSize = conf.getInt(GenomixJob.TABLE_SIZE,
- GenomixJob.DEFAULT_TABLE_SIZE);
- frameSize = conf.getInt(GenomixJob.FRAME_SIZE,
- GenomixJob.DEFAULT_FRAME_SIZE);
- inputSizeInRawRecords = conf.getLong(
- GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
- inputSizeInUniqueKeys = conf.getLong(
- GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
- recordSizeInBytes = conf.getInt(
- GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
- hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
- /** here read the different recordSize why ? */
- recordSizeInBytes = conf.getInt(
- GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
+ kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ if (kmers % 2 == 0) {
+ kmers--;
+ conf.setInt(GenomixJob.KMER_LENGTH, kmers);
+ }
+ frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
+ tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
+ frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
+ inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
+ inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
+ recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
+ hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
+ /** why does the cross group-by read a different recordSize? */
+ recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
- bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER,
- GenomixJob.DEFAULT_REVERSED);
+ bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
- String type = conf.get(GenomixJob.GROUPBY_TYPE,
- GenomixJob.DEFAULT_GROUPBY_TYPE);
- if (type.equalsIgnoreCase("external")) {
- groupbyType = GroupbyType.EXTERNAL;
- } else if (type.equalsIgnoreCase("precluster")) {
- groupbyType = GroupbyType.PRECLUSTER;
- } else {
- groupbyType = GroupbyType.HYBRIDHASH;
- }
+ String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
+ if (type.equalsIgnoreCase("external")) {
+ groupbyType = GroupbyType.EXTERNAL;
+ } else if (type.equalsIgnoreCase("precluster")) {
+ groupbyType = GroupbyType.PRECLUSTER;
+ } else {
+ groupbyType = GroupbyType.HYBRIDHASH;
+ }
- String output = conf.get(GenomixJob.OUTPUT_FORMAT,
- GenomixJob.DEFAULT_OUTPUT_FORMAT);
- if (output.equalsIgnoreCase("text")) {
- outputFormat = OutputFormat.TEXT;
- } else {
- outputFormat = OutputFormat.BINARY;
- }
- job = new JobConf(conf);
- LOG.info("Genomix Graph Build Configuration");
- LOG.info("Kmer:" + kmers);
- LOG.info("Groupby type:" + type);
- LOG.info("Output format:" + output);
- LOG.info("Frame limit" + frameLimits);
- LOG.info("Frame size" + frameSize);
- }
+ String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
+ if (output.equalsIgnoreCase("text")) {
+ outputFormat = OutputFormat.TEXT;
+ } else {
+ outputFormat = OutputFormat.BINARY;
+ }
+ job = new JobConf(conf);
+ LOG.info("Genomix Graph Build Configuration");
+ LOG.info("Kmer:" + kmers);
+ LOG.info("Groupby type:" + type);
+ LOG.info("Output format:" + output);
+ LOG.info("Frame limit" + frameLimits);
+ LOG.info("Frame size" + frameSize);
+ }
}
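
initJobConfiguration() silently decrements an even k. The usual reason for this in de Bruijn graph builders: with an odd k, a k-mer can never equal its own reverse complement (the middle base would have to pair with itself), so grouping forward and reverse-complement k-mers under one canonical form stays unambiguous. A plain-string self-check of that property, illustrative only:

    final class OddKCheck {
        static String reverseComplement(String kmer) {
            StringBuilder sb = new StringBuilder(kmer.length());
            for (int i = kmer.length() - 1; i >= 0; i--) {
                switch (kmer.charAt(i)) {
                    case 'A': sb.append('T'); break;
                    case 'T': sb.append('A'); break;
                    case 'C': sb.append('G'); break;
                    default:  sb.append('C'); break; // 'G' -> 'C'
                }
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            // even k: palindromic k-mers exist...
            System.out.println("ACGT".equals(reverseComplement("ACGT")));   // true
            // ...odd k: impossible, whatever the sequence
            System.out.println("ACGTA".equals(reverseComplement("ACGTA"))); // false
        }
    }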
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
index 39cc6fc..a88fa79 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.job;
import org.apache.hadoop.mapred.InputSplit;
@@ -21,6 +36,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
@@ -34,146 +50,122 @@
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
public class JobGenStatistic extends JobGen {
- private int kmers;
- private JobConf hadoopjob;
- private RecordDescriptor readOutputRec;
- private String[] ncNodeNames;
- private Scheduler scheduler;
- private RecordDescriptor combineOutputRec;
-
+ private int kmers;
+ private JobConf hadoopjob;
+ private RecordDescriptor readOutputRec;
+ private String[] ncNodeNames;
+ private Scheduler scheduler;
+ private RecordDescriptor combineOutputRec;
- public JobGenStatistic(GenomixJob job) {
- super(job);
- // TODO Auto-generated constructor stub
- }
+ public JobGenStatistic(GenomixJob job) {
+ super(job);
+ // TODO Auto-generated constructor stub
+ }
- @Override
- protected void initJobConfiguration() {
- // TODO Auto-generated method stub
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
- hadoopjob = new JobConf(conf);
- hadoopjob.setInputFormat(SequenceFileInputFormat.class);
- }
+ @Override
+ protected void initJobConfiguration() {
+ // TODO Auto-generated method stub
+ kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ hadoopjob = new JobConf(conf);
+ hadoopjob.setInputFormat(SequenceFileInputFormat.class);
+ }
- @Override
- public JobSpecification generateJob() throws HyracksException {
- // TODO Auto-generated method stub
- int[] degreeFields = { 0 };
- int[] countFields = { 1 };
- JobSpecification jobSpec = new JobSpecification();
- /** specify the record fields after read */
- readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- null, ByteSerializerDeserializer.INSTANCE,ByteSerializerDeserializer.INSTANCE });
- combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- null, ByteSerializerDeserializer.INSTANCE });
- /** the reader */
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- readOperator, ncNodeNames);
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+ int[] degreeFields = { 0, 1 }; // indegree, outdegree
+ int[] countFields = { 2 };
+ JobSpecification jobSpec = new JobSpecification();
+ /** specify the record fields after read */
+ readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+ combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ /** the reader */
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
- /** the combiner aggregator */
- AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(
- jobSpec, degreeFields, readOperator);
- AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(
- jobSpec, countFields, readOperator);
+ /** the combiner aggregator */
+ AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(jobSpec, degreeFields, readOperator);
+ AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(jobSpec, countFields, readOperator);
- /** the final aggregator */
- AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(
- jobSpec, degreeFields, degreeLocal);
- AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(
- jobSpec, countFields, countLocal);
-
- /** writer */
- AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(
- jobSpec, degreeFields, degreeMerger);
- AbstractFileWriteOperatorDescriptor writeCount = connectWriter(
- jobSpec, countFields, countMerger);
- jobSpec.addRoot(writeDegree);
- jobSpec.addRoot(writeCount);
- return null;
- }
+ /** the final aggregator */
+ AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(jobSpec, degreeFields, degreeLocal);
+ AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(jobSpec, countFields, countLocal);
- private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
- throws HyracksDataException {
- try {
+ /** writer */
+ AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(jobSpec, degreeFields, degreeMerger);
+ AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
+ jobSpec.addRoot(writeDegree);
+ jobSpec.addRoot(writeCount);
+ return null;
+ }
- InputSplit[] splits = hadoopjob.getInputFormat().getSplits(
- hadoopjob, ncNodeNames.length);
+ private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+ try {
- String[] readSchedule = scheduler.getLocationConstraints(splits);
- return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec,
- hadoopjob, splits, readSchedule,
- new StatReadsKeyValueParserFactory());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ InputSplit[] splits = hadoopjob.getInputFormat().getSplits(hadoopjob, ncNodeNames.length);
- private ExternalGroupOperatorDescriptor newExternalGroupby(
- JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggeragater) {
- return new ExternalGroupOperatorDescriptor(jobSpec, keyFields,
- GenomixJob.DEFAULT_FRAME_LIMIT, new IBinaryComparatorFactory[] {
- new ByteComparatorFactory() }, null, aggeragater,
- new StatSumAggregateFactory(),
- combineOutputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- new ByteComparatorFactory() }),
- GenomixJob.DEFAULT_TABLE_SIZE), true);
- }
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
+ new StatReadsKeyValueParserFactory());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
- private AbstractOperatorDescriptor connectLocalAggregateByField(
- JobSpecification jobSpec, int[] fields,
- HDFSReadOperatorDescriptor readOperator) {
- AbstractOperatorDescriptor localAggregator = newExternalGroupby(
- jobSpec, fields, new StatCountAggregateFactory());
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- localAggregator, ncNodeNames);
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
- jobSpec);
- jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
- return localAggregator;
- }
+ private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggeragater) {
+ return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, GenomixJob.DEFAULT_FRAME_LIMIT,
+ new IBinaryComparatorFactory[] { new ByteComparatorFactory(), new ByteComparatorFactory() }, null,
+ aggeragater, new StatSumAggregateFactory(), combineOutputRec, new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ new ByteComparatorFactory(), new ByteComparatorFactory() }),
+ GenomixJob.DEFAULT_TABLE_SIZE), true);
+ }
- private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec,
- int[] fields, AbstractOperatorDescriptor localAggregator) {
- AbstractOperatorDescriptor finalAggregator = newExternalGroupby(
- jobSpec, fields, new StatSumAggregateFactory());
- // only need one reducer
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- finalAggregator, ncNodeNames[fields[0] % ncNodeNames.length]);
- IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(
- jobSpec,
- new ITuplePartitionComputerFactory(){
- private static final long serialVersionUID = 1L;
- @Override
- public ITuplePartitionComputer createPartitioner() {
- return new ITuplePartitionComputer(){
- @Override
- public int partition(IFrameTupleAccessor accessor,
- int tIndex, int nParts)
- throws HyracksDataException {
- return 0;
- }
- };
- }
- },
- fields,
- new IBinaryComparatorFactory[]{new ByteComparatorFactory()});
- jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
- return finalAggregator;
- }
-
- private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int [] fields, AbstractOperatorDescriptor finalAggregator){
- LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(
- jobSpec, null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- writeOperator, ncNodeNames[fields[0] % ncNodeNames.length]);
+ private AbstractOperatorDescriptor connectLocalAggregateByField(JobSpecification jobSpec, int[] fields,
+ HDFSReadOperatorDescriptor readOperator) {
+ AbstractOperatorDescriptor localAggregator = newExternalGroupby(jobSpec, fields,
+ new StatCountAggregateFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, localAggregator, ncNodeNames);
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
+ return localAggregator;
+ }
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
- jobSpec);
- jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
- return writeOperator;
- }
+ private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec, int[] fields,
+ AbstractOperatorDescriptor localAggregator) {
+ AbstractOperatorDescriptor finalAggregator = newExternalGroupby(jobSpec, fields, new StatSumAggregateFactory());
+ // only need one reducer
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, finalAggregator, ncNodeNames[fields[0]
+ % ncNodeNames.length]);
+ IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+ new ITuplePartitionComputerFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+ throws HyracksDataException {
+ return 0;
+ }
+ };
+ }
+ }, fields, new IBinaryComparatorFactory[] { new ByteComparatorFactory() });
+ jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
+ return finalAggregator;
+ }
+
+ private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int[] fields,
+ AbstractOperatorDescriptor finalAggregator) {
+ LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(jobSpec, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames[fields[0]
+ % ncNodeNames.length]);
+
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
+ return writeOperator;
+ }
}
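
For orientation, a minimal sketch of how the three connect* helpers above are presumably chained inside the stats job; the wrapper method name and the concrete fields value are assumptions for illustration, not part of this patch:

    // Hypothetical wiring of the helpers defined above (sketch only, assumed caller):
    private void buildStatPipeline(JobSpecification jobSpec, HDFSReadOperatorDescriptor readOperator) {
        int[] fields = new int[] { 0, 1 }; // assumed group-by keys, e.g. indegree/outdegree
        AbstractOperatorDescriptor local = connectLocalAggregateByField(jobSpec, fields, readOperator);
        AbstractOperatorDescriptor global = connectFinalAggregateByField(jobSpec, fields, local);
        connectWriter(jobSpec, fields, global); // every group ends up in one line file
    }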
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
index b469be0..832966b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.util;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -7,34 +22,33 @@
public class ByteComparatorFactory implements IBinaryComparatorFactory, IBinaryHashFunctionFactory {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- @Override
- public IBinaryComparator createBinaryComparator() {
- return new IBinaryComparator(){
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
- int l2) {
- return b1[s1]-b2[s2];
- }
-
- };
- }
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return b1[s1] - b2[s2];
+ }
- @Override
- public IBinaryHashFunction createBinaryHashFunction() {
- return new IBinaryHashFunction(){
+ };
+ }
- @Override
- public int hash(byte[] bytes, int offset, int length) {
- return bytes[offset];
- }
-
- };
- }
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction() {
+ return new IBinaryHashFunction() {
+
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ return bytes[offset];
+ }
+
+ };
+ }
}
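
Both the comparator and the hash function above read only the first byte of a field (b1[s1] - b2[s2] and bytes[offset]), so this factory is only correct for single-byte columns such as the degree keys this job groups on. A standalone illustration, not part of the patch:

    IBinaryComparator cmp = new ByteComparatorFactory().createBinaryComparator();
    int order = cmp.compare(new byte[] { 3 }, 0, 1, new byte[] { 7 }, 0, 1); // -4: 3 sorts before 7
    IBinaryHashFunction hash = new ByteComparatorFactory().createBinaryHashFunction();
    int bucket = hash.hash(new byte[] { 3 }, 0, 1); // 3: the byte value is its own hash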
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
index 49a7453..65303a8 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.util;
import java.io.DataOutput;
@@ -13,109 +28,102 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class StatCountAggregateFactory implements
- IAggregatorDescriptorFactory {
+public class StatCountAggregateFactory implements IAggregatorDescriptorFactory {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public class CountAggregator implements IAggregatorDescriptor {
+ public class CountAggregator implements IAggregatorDescriptor {
+ private final int[] keyFields;
- @Override
- public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return null;
- }
+ public CountAggregator(int[] keyFields) {
+ this.keyFields = keyFields;
+ }
- @Override
- public void init(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- int count = 1;
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- }
+ @Override
+ public AggregateState createAggregateStates() {
+ // TODO Auto-generated method stub
+ return null;
+ }
- @Override
- public void reset() {
- // TODO Auto-generated method stub
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int count = 1;
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+ }
- }
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException {
- int count = 1;
+ }
- int statetupleOffset = stateAccessor
- .getTupleStartOffset(stateTupleIndex);
- int countfieldStart = stateAccessor.getFieldStartOffset(
- stateTupleIndex, 1);
- int countoffset = statetupleOffset
- + stateAccessor.getFieldSlotsLength() + countfieldStart;
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ int count = 1;
- byte[] data = stateAccessor.getBuffer().array();
- count += IntegerSerializerDeserializer.getInt(data, countoffset);
- IntegerSerializerDeserializer.putInt(count, data, countoffset);
- }
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, keyFields.length);
+ int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- int count = getCount(accessor, tIndex);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
+ byte[] data = stateAccessor.getBuffer().array();
+ count += IntegerSerializerDeserializer.getInt(data, countoffset);
+ IntegerSerializerDeserializer.putInt(count, data, countoffset);
+ }
- }
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
- protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
- int countoffset = tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart;
- byte[] data = accessor.getBuffer().array();
+ }
- return IntegerSerializerDeserializer.getInt(data, countoffset);
- }
+ protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, keyFields.length);
+ int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+ byte[] data = accessor.getBuffer().array();
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
+ return IntegerSerializerDeserializer.getInt(data, countoffset);
+ }
- @Override
- public void close() {
- // TODO Auto-generated method stub
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ }
- }
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
- }
+ }
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields,
- int[] keyFieldsInPartialResults) throws HyracksDataException {
- // TODO Auto-generated method stub
- return new CountAggregator();
- }
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return new CountAggregator(keyFields);
+ }
}
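
Replacing the hard-coded field index 1 with keyFields.length is what lets CountAggregator handle a multi-column key: the count is written immediately after the key fields, so its field index always equals the number of keys. A worked layout under the assumed two-key (indegree, outdegree) grouping:

    // Assumed tuple layout after local aggregation with keyFields = {0, 1}:
    //   field 0: indegree  (byte)   group-by key
    //   field 1: outdegree (byte)   group-by key
    //   field 2: count     (int)    index == keyFields.length
    int[] keyFields = { 0, 1 };
    int countFieldIndex = keyFields.length; // 2: the index getFieldStartOffset() must be given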
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
index 561d64a..2fcca67 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
@@ -1,13 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.util;
import java.nio.ByteBuffer;
-import org.apache.hadoop.io.BytesWritable;
-
import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.genomix.type.KmerUtil;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -17,70 +30,65 @@
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<BytesWritable,KmerCountValue> {
+public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<KmerBytesWritable, KmerCountValue> {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- @Override
- public IKeyValueParser<BytesWritable,KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
- throws HyracksDataException {
-
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
- final ByteBuffer outputBuffer = ctx.allocateFrame();
- final FrameTupleAppender outputAppender = new FrameTupleAppender(
- ctx.getFrameSize());
- outputAppender.reset(outputBuffer, true);
-
- return new IKeyValueParser<BytesWritable,KmerCountValue>(){
+ @Override
+ public IKeyValueParser<KmerBytesWritable, KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
+ throws HyracksDataException {
- @Override
- public void open(IFrameWriter writer) throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(3); // three output fields: indegree, outdegree, count
+ final ByteBuffer outputBuffer = ctx.allocateFrame();
+ final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputAppender.reset(outputBuffer, true);
- @Override
- public void parse(BytesWritable key, KmerCountValue value,
- IFrameWriter writer) throws HyracksDataException {
- byte adjMap = value.getAdjBitMap();
- byte count = value.getCount();
- InsertToFrame((byte) (GeneCode.inDegree(adjMap)*10+GeneCode.outDegree(adjMap)),count,writer);
- }
+ return new IKeyValueParser<KmerBytesWritable, KmerCountValue>() {
- @Override
- public void close(IFrameWriter writer) throws HyracksDataException {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
-
- private void InsertToFrame(byte degree, byte count,
- IFrameWriter writer) {
- try {
- tupleBuilder.reset();
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,degree);
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,count);
+ @Override
+ public void open(IFrameWriter writer) throws HyracksDataException {
+ // TODO Auto-generated method stub
- if (!outputAppender.append(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record size is too large.");
- }
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- };
- }
+ }
+
+ @Override
+ public void parse(KmerBytesWritable key, KmerCountValue value, IFrameWriter writer)
+ throws HyracksDataException {
+ byte adjMap = value.getAdjBitMap();
+ byte count = value.getCount();
+ insertToFrame((byte) GeneCode.inDegree(adjMap), (byte) GeneCode.outDegree(adjMap), count, writer);
+ }
+
+ @Override
+ public void close(IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+
+ private void insertToFrame(byte indegree, byte outdegree, byte count, IFrameWriter writer) {
+ try {
+ tupleBuilder.reset();
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, indegree);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, outdegree);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, count);
+
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record size is too large.");
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+ }
}
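
parse() now emits indegree and outdegree as two separate byte fields instead of the old packed inDegree*10+outDegree value, so each degree gets its own group-by column. Under the adjacency-map convention documented in the vertex classes removed below (successor bits in the low nibble, precursor bits in the high nibble), the two degrees are simply nibble popcounts; a sketch of the semantics assumed for GeneCode.inDegree/outDegree:

    // Assumed decoding of an adjacency byte (illustrative, not the GeneCode source):
    byte adjMap = 0x21;                                    // 0010 0001: precursor C, successor A
    int outDegree = Integer.bitCount(adjMap & 0x0F);       // 1: successor bits, low nibble
    int inDegree = Integer.bitCount((adjMap >> 4) & 0x0F); // 1: precursor bits, high nibble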
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
index ce00667..39ac60a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.util;
import java.io.DataOutput;
@@ -15,108 +30,102 @@
public class StatSumAggregateFactory implements IAggregatorDescriptorFactory {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public class DistributeAggregatorDescriptor implements
- IAggregatorDescriptor {
+ public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
- @Override
- public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return null;
- }
+ private final int[] keyFields;
- protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
- int countoffset = tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart;
- byte[] data = accessor.getBuffer().array();
- return IntegerSerializerDeserializer.getInt(data, countoffset);
- }
+ public DistributeAggregatorDescriptor(int[] keyFields) {
+ this.keyFields = keyFields;
+ }
- @Override
- public void init(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- int count = getCount(accessor, tIndex);
+ @Override
+ public AggregateState createAggregateStates() {
+ // TODO Auto-generated method stub
+ return null;
+ }
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- }
+ protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, keyFields.length); // the count follows the key fields
+ int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+ byte[] data = accessor.getBuffer().array();
+ return IntegerSerializerDeserializer.getInt(data, countoffset);
+ }
- @Override
- public void reset() {
- // TODO Auto-generated method stub
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
- }
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+ }
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException {
- int count = getCount(accessor, tIndex);
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
- int statetupleOffset = stateAccessor
- .getTupleStartOffset(stateTupleIndex);
- int countfieldStart = stateAccessor.getFieldStartOffset(
- stateTupleIndex, 1);
- int countoffset = statetupleOffset
- + stateAccessor.getFieldSlotsLength() + countfieldStart;
+ }
- byte[] data = stateAccessor.getBuffer().array();
- count += IntegerSerializerDeserializer.getInt(data, countoffset);
- IntegerSerializerDeserializer.putInt(count, data, countoffset);
- }
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- int count = getCount(accessor, tIndex);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, keyFields.length);
+ int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
- }
+ byte[] data = stateAccessor.getBuffer().array();
+ count += IntegerSerializerDeserializer.getInt(data, countoffset);
+ IntegerSerializerDeserializer.putInt(count, data, countoffset);
+ }
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
- }
+ }
- @Override
- public void close() {
- // TODO Auto-generated method stub
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
+ }
- }
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields,
- int[] keyFieldsInPartialResults) throws HyracksDataException {
- // TODO Auto-generated method stub
- return new DistributeAggregatorDescriptor();
- }
+ }
+
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return new DistributeAggregatorDescriptor(keyFields);
+ }
}
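
StatSumAggregateFactory is the global half of a two-phase count: the local CountAggregator starts every group at 1 and increments in place, and this aggregator then adds up the partial counts arriving from all partitions, so the final value equals a direct count of the input records. The arithmetic in miniature, outside Hyracks:

    // Two-phase counting with hypothetical per-partition partial counts:
    int[] partialCounts = { 4, 2, 5 }; // one group's counts from three partitions
    int total = 0;
    for (int c : partialCounts) {
        total += c;                    // what aggregate() does via getInt/putInt
    }
    // total == 11, the same as counting the 11 original records in one pass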
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index bef13b5..847272a 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.example.jobrun;
import java.io.BufferedWriter;
@@ -194,9 +209,10 @@
continue;
}
SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
-
-// KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER));
+
+ // KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
+ GenomixJob.DEFAULT_KMER));
KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
index 22688e0..aa1f791 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
@@ -24,39 +24,40 @@
public class TestUtils {
/**
* Compare with the sorted expected file.
- * The actual file may not be sorted;
+ * The actual file may not be sorted.
+ *
* @param expectedFile
* @param actualFile
*/
- public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception{
+ public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception {
BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
ArrayList<String> actualLines = new ArrayList<String>();
String lineExpected, lineActual;
- try{
- while ( (lineActual = readerActual.readLine())!=null){
- actualLines.add(lineActual);
- }
- Collections.sort(actualLines);
- int num = 1;
- for(String actualLine : actualLines){
- lineExpected = readerExpected.readLine();
- if (lineExpected == null){
- throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
- }
- if ( !equalStrings(lineExpected, actualLine)){
- throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
- + actualLine);
- }
- ++num;
- }
+ try {
+ while ((lineActual = readerActual.readLine()) != null) {
+ actualLines.add(lineActual);
+ }
+ Collections.sort(actualLines);
+ int num = 1;
+ for (String actualLine : actualLines) {
lineExpected = readerExpected.readLine();
- if (lineExpected != null) {
- throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+ if (lineExpected == null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
}
- } finally{
- readerActual.close();
- readerExpected.close();
+ if (!equalStrings(lineExpected, actualLine)) {
+ throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
+ + actualLine);
+ }
+ ++num;
+ }
+ lineExpected = readerExpected.readLine();
+ if (lineExpected != null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+ }
+ } finally {
+ readerActual.close();
+ readerExpected.close();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java
deleted file mode 100644
index 88c5c35..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java
+++ /dev/null
@@ -1,309 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
-import edu.uci.ics.genomix.type.old.KmerUtil;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ValueStateWritable
- * edgeValue: NullWritable
- * message: LogAlgorithmMessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class LogAlgorithmForMergeGraphVertex extends Vertex<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
- public static final String KMER_SIZE = "LogAlgorithmForMergeGraphVertex.kmerSize";
- public static final String ITERATIONS = "MergeGraphVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private byte[] tmpVertexId;
- private byte[] tmpDestVertexId;
- private BytesWritable destVertexId = new BytesWritable();
- private byte[] mergeChainVertexId;
- private int lengthOfMergeChainVertex;
- private byte tmpVertexValue;
- private ValueStateWritable tmpVal = new ValueStateWritable();
- private LogAlgorithmMessageWritable tmpMsg = new LogAlgorithmMessageWritable();
- /**
- * Log Algorithm for path merge graph
- */
- /**
- * Load KmerSize, MaxIteration
- */
- @Override
- public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
- if(kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
- tmpVertexId = GraphVertexOperation.generateValidDataFromBytesWritable(getVertexId());
- tmpVal = getVertexValue();
- if (getSuperstep() == 1) {
- tmpMsg.setChainVertexId(new byte[0]);
- if(GraphVertexOperation.isHeadVertex(tmpVal.getValue())){
- tmpMsg.setMessage(Message.START);
- for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
- if((tmpVal.getValue() & (1 << x)) != 0){
- tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, tmpVertexId, 0, tmpVertexId.length, x);
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- }
- voteToHalt();
- }
- if(GraphVertexOperation.isRearVertex(tmpVal.getValue())){
- tmpMsg.setMessage(Message.END);
-
- for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
- if(((tmpVal.getValue()>> 4) & (1 << x)) != 0){
- tmpDestVertexId = KmerUtil.shiftKmerWithPreCode(kmerSize, tmpVertexId, 0, tmpVertexId.length, x);
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- }
- voteToHalt();
- }
- if(GraphVertexOperation.isPathVertex(tmpVal.getValue())){
- tmpVal.setState(State.MID_VERTEX);
- setVertexValue(tmpVal);
- }
- /*if(!GraphVertexOperation.isHeadVertex(tmpVal.getValue())
- && !GraphVertexOperation.isRearVertex(tmpVal.getValue())
- && !GraphVertexOperation.isRearVertex(tmpVal.getValue()))
- voteToHalt();*/
- }
- else if(getSuperstep() == 2 && getSuperstep() <= maxIteration){
- while(msgIterator.hasNext()){
- if(!GraphVertexOperation.isPathVertex(tmpVal.getValue())){
- msgIterator.next();
- voteToHalt();
- }
- else{
- tmpMsg = msgIterator.next();
- if(tmpMsg.getMessage() == Message.START &&
- (tmpVal.getState() == State.MID_VERTEX || tmpVal.getState() == State.END_VERTEX)){
- tmpVal.setState(State.START_VERTEX);
- setVertexValue(tmpVal);
- }
- else if(tmpMsg.getMessage() == Message.END && tmpVal.getState() == State.MID_VERTEX){
- tmpVal.setState(State.END_VERTEX);
- setVertexValue(tmpVal);
- voteToHalt();
- }
- else
- voteToHalt();
- }
- }
- }
- //head node sends message to path node
- else if(getSuperstep()%3 == 0 && getSuperstep() <= maxIteration){
- if(tmpVal.getState() == State.TODELETE || tmpVal.getState() == State.KILL_SELF)
- voteToHalt();
- else{
- if(getSuperstep() == 3){
- tmpMsg = new LogAlgorithmMessageWritable();
- if(Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)) == -1)
- voteToHalt();
- else{
- tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, tmpVertexId,
- 0, tmpVertexId.length,
- Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)));
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- if(tmpVal.getState() == State.START_VERTEX){
- tmpMsg.setMessage(Message.START);
- tmpMsg.setSourceVertexId(getVertexId().getBytes());
- sendMsg(destVertexId, tmpMsg);
- voteToHalt();
- }
- else if(tmpVal.getState() != State.END_VERTEX && tmpVal.getState() != State.FINAL_DELETE){
- tmpMsg.setMessage(Message.NON);
- tmpMsg.setSourceVertexId(getVertexId().getBytes());
- sendMsg(destVertexId,tmpMsg);
- voteToHalt();
- }
- }
- }
- else{
- if(msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
- byte[] lastKmer = KmerUtil.getLastKmerFromChain(kmerSize,
- tmpVal.getLengthOfMergeChain(),
- tmpVal.getMergeChain(),
- 0, tmpVal.getMergeChain().length);
- if(Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)) == -1 || lastKmer == null)
- voteToHalt();
- else{
- tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, lastKmer,
- 0, lastKmer.length,
- Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)));
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- if(tmpVal.getState() == State.START_VERTEX){
- tmpMsg.setMessage(Message.START);
- tmpMsg.setSourceVertexId(getVertexId().getBytes());
- sendMsg(destVertexId, tmpMsg);
- voteToHalt();
- }
- else if(tmpVal.getState() != State.END_VERTEX && tmpVal.getState() != State.FINAL_DELETE){
- tmpMsg.setMessage(Message.NON);
- tmpMsg.setSourceVertexId(getVertexId().getBytes());
- sendMsg(destVertexId,tmpMsg);
- }
- }
- }
- }
- }
- }
-
- //path node sends message back to head node
- else if(getSuperstep()%3 == 1 && getSuperstep() <= maxIteration){
- if(tmpVal.getState() == State.TODELETE || tmpVal.getState() == State.KILL_SELF)
- voteToHalt();
- else{
- if(msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
- int message = tmpMsg.getMessage();
- if(tmpVal.getLengthOfMergeChain() == 0){
- tmpVal.setLengthOfMergeChain(kmerSize);
- tmpVal.setMergeChain(tmpVertexId);
- setVertexValue(tmpVal);
- }
- tmpMsg.setLengthOfChain(tmpVal.getLengthOfMergeChain());
- tmpMsg.setChainVertexId(tmpVal.getMergeChain());
-
- tmpMsg.setNeighberInfo(tmpVal.getValue()); //set neighber
- tmpMsg.setSourceVertexState(tmpVal.getState());
-
- //kill Message because it has been merged by the head
- if(tmpVal.getState() == State.END_VERTEX || tmpVal.getState() == State.FINAL_DELETE){
- tmpMsg.setMessage(Message.END);
- tmpVal.setState(State.FINAL_DELETE);
- setVertexValue(tmpVal);
- //deleteVertex(getVertexId());
- }
- else
- tmpMsg.setMessage(Message.NON);
-
- if(message == Message.START){
- tmpVal.setState(State.TODELETE);
- setVertexValue(tmpVal);
- }
- destVertexId.set(tmpMsg.getSourceVertexId(), 0, tmpMsg.getSourceVertexId().length);
- sendMsg(destVertexId,tmpMsg);
- //voteToHalt();
- }
- else{
- if(getVertexValue().getState() != State.START_VERTEX //&& getVertexValue().getState() != State.NON_EXIST
- && getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
- tmpVal.setState(State.KILL_SELF);
- setVertexValue(tmpVal);
- voteToHalt();
- //deleteVertex(getVertexId()); //killSelf because it doesn't receive any message
- }
- }
- }
- }
- else if(getSuperstep()%3 == 2 && getSuperstep() <= maxIteration){
- if(tmpVal.getState() == State.TODELETE || tmpVal.getState() == State.KILL_SELF)
- voteToHalt(); //deleteVertex(getVertexId()); //killSelf
- else{
- if(msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
-
- if(tmpMsg.getMessage() == Message.END){
- if(tmpVal.getState() != State.START_VERTEX)
- tmpVal.setState(State.END_VERTEX);
- else
- tmpVal.setState(State.FINAL_VERTEX);
- }
-
- if(getSuperstep() == 5){
- lengthOfMergeChainVertex = kmerSize;
- mergeChainVertexId = tmpVertexId;
- }
- else{
- lengthOfMergeChainVertex = tmpVal.getLengthOfMergeChain();
- mergeChainVertexId = tmpVal.getMergeChain();
- }
- byte[] tmplastKmer = KmerUtil.getLastKmerFromChain(tmpMsg.getLengthOfChain() - kmerSize + 1,
- tmpMsg.getLengthOfChain(), tmpMsg.getChainVertexId(),0, tmpMsg.getChainVertexId().length);
- mergeChainVertexId = KmerUtil.mergeTwoKmer(lengthOfMergeChainVertex,
- mergeChainVertexId,
- 0, mergeChainVertexId.length,
- tmpMsg.getLengthOfChain() - kmerSize + 1,
- tmplastKmer, 0, tmplastKmer.length);
- lengthOfMergeChainVertex = lengthOfMergeChainVertex + tmpMsg.getLengthOfChain()
- - kmerSize + 1;
- tmpVal.setLengthOfMergeChain(lengthOfMergeChainVertex);
- tmpVal.setMergeChain(mergeChainVertexId);
-
- tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpMsg.getNeighberInfo());
- tmpVal.setValue(tmpVertexValue);
- if(tmpMsg.getMessage() != Message.END){
- setVertexValue(tmpVal);
- tmpMsg = new LogAlgorithmMessageWritable(); //reset
- tmpMsg.setNeighberInfo(tmpVertexValue);
- sendMsg(getVertexId(),tmpMsg);
- }
- }
- if(tmpVal.getState() == State.END_VERTEX || tmpVal.getState() == State.FINAL_DELETE)
- voteToHalt();
- if(tmpVal.getState() == State.FINAL_VERTEX){
- //String source = Kmer.recoverKmerFrom(tmpVal.getLengthOfMergeChain(), tmpVal.getMergeChain(), 0, tmpVal.getMergeChain().length);
- voteToHalt();
- }
- }
- }
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(LogAlgorithmForMergeGraphVertex.class.getSimpleName());
- job.setVertexClass(LogAlgorithmForMergeGraphVertex.class);
- /**
- * BinaryInput and BinaryOutput~/
- */
- job.setVertexInputFormatClass(LogAlgorithmForMergeGraphInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- job.setDynamicVertexValueSize(true);
- Client.run(args, job);
- }
-}
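
The header comment of the file removed above records the on-disk convention the replacement operators still rely on: two bits per base (A=00, C=01, G=10, T=11) plus one adjacency byte per vertex. A worked decoding of the comment's own sample line, 00,01,10 0001,0010, assuming bit i of each nibble marks gene code i:

    // Hypothetical decoder for the sample line in the removed format comment:
    char[] symbol = { 'A', 'C', 'G', 'T' };   // index == 2-bit gene code
    int[] kmerCodes = { 0, 1, 2 };            // "00,01,10"
    StringBuilder vertexId = new StringBuilder();
    for (int code : kmerCodes) {
        vertexId.append(symbol[code]);        // builds "ACG"
    }
    byte adjMap = 0x21;                       // "0001,0010": successors low nibble, precursors high
    for (int code = 0; code < 4; code++) {
        if ((adjMap & (1 << code)) != 0)
            System.out.println("successor " + symbol[code]);  // prints: successor A
        if ((adjMap & (1 << (code + 4))) != 0)
            System.out.println("precursor " + symbol[code]);  // prints: precursor C
    }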
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java
deleted file mode 100644
index ec45019..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java
+++ /dev/null
@@ -1,191 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.genomix.type.old.Kmer;
-import edu.uci.ics.genomix.type.old.KmerUtil;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.State;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class MergeGraphVertex extends Vertex<BytesWritable, ValueStateWritable, NullWritable, MessageWritable>{
-
- public static final String KMER_SIZE = "MergeGraphVertex.kmerSize";
- public static final String ITERATIONS = "MergeGraphVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private byte[] tmpVertexId;
- private byte[] tmpDestVertexId;
- private BytesWritable destVertexId = new BytesWritable();
- private BytesWritable tmpChainVertexId = new BytesWritable();
- private ValueStateWritable tmpVertexValue = new ValueStateWritable();
- private MessageWritable tmpMsg = new MessageWritable();
- /**
- * Naive Algorithm for path merge graph
- * @throws Exception
- * @throws
- */
- /**
- * Load KmerSize, MaxIteration
- */
- @Override
- public void compute(Iterator<MessageWritable> msgIterator) {
- if(kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
- tmpVertexId = GraphVertexOperation.generateValidDataFromBytesWritable(getVertexId());
- if (getSuperstep() == 1) {
- if(GraphVertexOperation.isHeadVertex(getVertexValue().getValue())){
- tmpMsg.setSourceVertexId(tmpVertexId);
- tmpMsg.setHead(tmpVertexId);
- tmpMsg.setLengthOfChain(0);
- tmpMsg.setChainVertexId(tmpChainVertexId.getBytes());
- for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
- if((getVertexValue().getValue() & (1 << x)) != 0){
- tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, tmpVertexId, 0, tmpVertexId.length, x);
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- }
- }
- }
-
- //path node sends message back to head node
- else if(getSuperstep()%2 == 0 && getSuperstep() <= maxIteration){
- if(msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
-
- if(!tmpMsg.isRear()){
- if(getSuperstep() == 2)
- tmpMsg.setHead(tmpVertexId);
- if(GraphVertexOperation.isPathVertex(getVertexValue().getValue())){
- tmpDestVertexId = tmpMsg.getSourceVertexId();
- tmpMsg.setNeighberInfo(getVertexValue().getValue()); //set neighber
- if(tmpMsg.getLengthOfChain() == 0){
- tmpMsg.setLengthOfChain(kmerSize);
- tmpMsg.setChainVertexId(tmpVertexId);
- }
- else{
- String source = Kmer.recoverKmerFrom(kmerSize, tmpVertexId, 0, tmpVertexId.length);
- tmpMsg.setChainVertexId(KmerUtil.mergeKmerWithNextCode(
- tmpMsg.getLengthOfChain(),
- tmpMsg.getChainVertexId(),
- 0, tmpMsg.getChainVertexId().length,
- Kmer.GENE_CODE.getCodeFromSymbol((byte)source.charAt(source.length() - 1))));
- tmpMsg.incrementLength();
- deleteVertex(getVertexId());
- }
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- else if(GraphVertexOperation.isRearVertex(getVertexValue().getValue())){
- if(getSuperstep() == 2)
- voteToHalt();
- else{
- tmpDestVertexId = tmpMsg.getSourceVertexId();
- tmpMsg.setSourceVertexId(tmpVertexId);
- tmpMsg.setRear(true);
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- }
- }
- else{
- tmpVertexValue.setState(State.START_VERTEX);
- tmpVertexValue.setValue(GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().getValue(),
- tmpMsg.getSourceVertexId(), kmerSize));
- tmpVertexValue.setLengthOfMergeChain(tmpMsg.getLengthOfChain());
- tmpVertexValue.setMergeChain(tmpMsg.getChainVertexId());
- setVertexValue(tmpVertexValue);
- //String source = Kmer.recoverKmerFrom(tmpMsg.getLengthOfChain(), tmpMsg.getChainVertexId(), 0, tmpMsg.getChainVertexId().length);
- //System.out.print("");
- /*try {
-
- GraphVertexOperation.flushChainToFile(tmpMsg.getChainVertexId(),
- tmpMsg.getLengthOfChain(),tmpVertexId);
- } catch (IOException e) { e.printStackTrace(); }*/
- }
- }
- }
- //head node sends message to path node
- else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
- while (msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
- if(!tmpMsg.isRear()){
- byte[] lastKmer = KmerUtil.getLastKmerFromChain(kmerSize,
- tmpMsg.getLengthOfChain(),
- tmpMsg.getChainVertexId(),
- 0, tmpMsg.getChainVertexId().length);
- tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, lastKmer,
- 0, lastKmer.length,
- Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpMsg.getNeighberInfo() & 0x0F)));
-
- tmpMsg.setSourceVertexId(tmpVertexId);
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- else{
- tmpDestVertexId = tmpMsg.getHead();
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- }
- }
- voteToHalt();
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(MergeGraphVertex.class.getSimpleName());
- job.setVertexClass(MergeGraphVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
index 823a984..e1868b1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
@@ -3,7 +3,6 @@
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -14,12 +13,14 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerCountValue;
public class BinaryVertexInputFormat <I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
extends VertexInputFormat<I, V, E, M>{
/** Uses the SequenceFileInputFormat to do everything */
+ @SuppressWarnings("rawtypes")
protected SequenceFileInputFormat binaryInputFormat = new SequenceFileInputFormat();
/**
@@ -37,7 +38,7 @@
public static abstract class BinaryVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
- private final RecordReader<BytesWritable,KmerCountValue> lineRecordReader;
+ private final RecordReader<KmerBytesWritable,KmerCountValue> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
@@ -47,7 +48,7 @@
* @param recordReader
* Line record reader from SequenceFileInputFormat
*/
- public BinaryVertexReader(RecordReader<BytesWritable, KmerCountValue> recordReader) {
+ public BinaryVertexReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
this.lineRecordReader = recordReader;
}
@@ -73,7 +74,7 @@
*
* @return Record reader to be used for reading.
*/
- protected RecordReader<BytesWritable,KmerCountValue> getRecordReader() {
+ protected RecordReader<KmerBytesWritable,KmerCountValue> getRecordReader() {
return lineRecordReader;
}
@@ -87,7 +88,8 @@
}
}
- @Override
+ @SuppressWarnings("unchecked")
+ @Override
public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
// Ignore the hint of numWorkers here since we are using SequenceFileInputFormat
// to do this for us
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
index f497f21..1435770 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
@@ -2,7 +2,6 @@
import java.io.IOException;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
@@ -12,6 +11,7 @@
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -49,7 +49,7 @@
/** Context passed to initialize */
private TaskAttemptContext context;
/** Internal line record writer */
- private final RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter;
+ private final RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter;
/**
* Initialize with the LineRecordWriter.
@@ -57,7 +57,7 @@
* @param lineRecordWriter
* Line record writer from SequenceFileOutputFormat
*/
- public BinaryVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
+ public BinaryVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
this.lineRecordWriter = lineRecordWriter;
}
@@ -76,7 +76,7 @@
*
* @return Record writer to be used for writing.
*/
- public RecordWriter<BytesWritable, ValueStateWritable> getRecordWriter() {
+ public RecordWriter<KmerBytesWritable, ValueStateWritable> getRecordWriter() {
return lineRecordWriter;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 2d70fdf..60342a7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -10,8 +10,8 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphVertex;
-import edu.uci.ics.genomix.pregelix.MergeGraphVertex;
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
@@ -60,11 +60,11 @@
for (int i = 1; i < inputs.length; i++)
FileInputFormat.addInputPaths(job, inputs[0]);
FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
- job.getConfiguration().setInt(MergeGraphVertex.KMER_SIZE, options.sizeKmer);
- job.getConfiguration().setInt(LogAlgorithmForMergeGraphVertex.KMER_SIZE, options.sizeKmer);
+ job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
+ job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
if (options.numIteration > 0){
- job.getConfiguration().setLong(MergeGraphVertex.ITERATIONS, options.numIteration);
- //job.getConfiguration().setLong(LogAlgorithmForMergeGraphVertex.ITERATIONS, options.numIteration);
+ job.getConfiguration().setLong(NaiveAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
+ job.getConfiguration().setLong(LogAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
}
return options;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java
deleted file mode 100644
index 2b87379..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package edu.uci.ics.genomix.pregelix.format;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-
-public class BinaryLoadGraphOutputFormat extends
- BinaryVertexOutputFormat<BytesWritable, ValueStateWritable, NullWritable> {
-
- @Override
- public VertexWriter<BytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<BytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
- return new BinaryLoadGraphVertexWriter(recordWriter);
- }
-
- /**
- * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
- */
- public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<BytesWritable, ValueStateWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(Vertex<BytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
- InterruptedException {
- getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
- }
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
similarity index 66%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
index 0e74c2d..7898d35 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -2,7 +2,6 @@
import java.io.IOException;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -12,31 +11,32 @@
import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerCountValue;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
-public class LogAlgorithmForMergeGraphInputFormat extends
- BinaryVertexInputFormat<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
-
+public class LogAlgorithmForPathMergeInputFormat extends
+ BinaryVertexInputFormat<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
/**
* Format INPUT
*/
- @Override
- public VertexReader<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> createVertexReader(
+ @SuppressWarnings("unchecked")
+ @Override
+ public VertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
- BinaryVertexReader<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+ BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
private Vertex vertex;
- private BytesWritable vertexId = new BytesWritable();
+ private KmerBytesWritable vertexId = null;
private ValueStateWritable vertexValue = new ValueStateWritable();
- public BinaryLoadGraphReader(RecordReader<BytesWritable,KmerCountValue> recordReader) {
+ public BinaryLoadGraphReader(RecordReader<KmerBytesWritable,KmerCountValue> recordReader) {
super(recordReader);
}
@@ -47,7 +47,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> getCurrentVertex() throws IOException,
+ public Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> getCurrentVertex() throws IOException,
InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
@@ -55,19 +55,19 @@
vertex.getMsgList().clear();
vertex.getEdges().clear();
-
if(getRecordReader() != null){
/**
* set the src vertex id
*/
-
- vertexId = getRecordReader().getCurrentKey();
+ if(vertexId == null)
+ vertexId = new KmerBytesWritable(getRecordReader().getCurrentKey().getKmerLength());
+ vertexId.set(getRecordReader().getCurrentKey());
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
- vertexValue.setValue(kmerCountValue.getAdjBitMap());
+ vertexValue.setAdjMap(kmerCountValue.getAdjBitMap());
vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
similarity index 60%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
index 865a787..29c5ccb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -2,27 +2,26 @@
import java.io.IOException;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import edu.uci.ics.genomix.pregelix.GraphVertexOperation;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
-public class LogAlgorithmForMergeGraphOutputFormat extends
- BinaryVertexOutputFormat<BytesWritable, ValueStateWritable, NullWritable> {
+public class LogAlgorithmForPathMergeOutputFormat extends
+ BinaryVertexOutputFormat<KmerBytesWritable, ValueStateWritable, NullWritable> {
@Override
- public VertexWriter<BytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+ public VertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
- RecordWriter<BytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ @SuppressWarnings("unchecked")
+ RecordWriter<KmerBytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
return new BinaryLoadGraphVertexWriter(recordWriter);
}
@@ -30,19 +29,20 @@
* Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
*/
public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<BytesWritable, ValueStateWritable, NullWritable> {
+ BinaryVertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
+ public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
super(lineRecordWriter);
}
@Override
- public void writeVertex(Vertex<BytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
+ public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
InterruptedException {
if(vertex.getVertexValue().getState() != State.FINAL_DELETE
&& vertex.getVertexValue().getState() != State.END_VERTEX
&& vertex.getVertexValue().getState() != State.TODELETE
- && vertex.getVertexValue().getState() != State.KILL_SELF)
+ && vertex.getVertexValue().getState() != State.KILL_SELF
+ && vertex.getVertexValue().getState() != State.NON_EXIST)
getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
similarity index 63%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
index 4cd22ac..ca134c0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
@@ -1,35 +1,30 @@
package edu.uci.ics.genomix.pregelix.format;
import java.io.IOException;
-import java.util.logging.FileHandler;
-import java.util.logging.Logger;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerCountValue;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.pregelix.GraphVertexOperation;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueWritable;
-import edu.uci.ics.genomix.pregelix.log.DataLoadLogFormatter;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat.BinaryVertexReader;
-public class BinaryLoadGraphInputFormat extends
- BinaryVertexInputFormat<BytesWritable, ValueStateWritable, NullWritable, MessageWritable>{
+public class NaiveAlgorithmForPathMergeInputFormat extends
+ BinaryVertexInputFormat<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
/**
* Format INPUT
*/
- @Override
- public VertexReader<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
+ @SuppressWarnings("unchecked")
+ @Override
+ public VertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@@ -37,12 +32,12 @@
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
- BinaryVertexReader<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> {
- private Vertex vertex;
- private BytesWritable vertexId = new BytesWritable();
+ BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+ private Vertex vertex;
+ private KmerBytesWritable vertexId = null;
private ValueStateWritable vertexValue = new ValueStateWritable();
- public BinaryLoadGraphReader(RecordReader<BytesWritable,KmerCountValue> recordReader) {
+ public BinaryLoadGraphReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
super(recordReader);
}
@@ -53,7 +48,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex() throws IOException,
+ public Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> getCurrentVertex() throws IOException,
InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
@@ -66,13 +61,15 @@
/**
* set the src vertex id
*/
+ if(vertexId == null)
+ vertexId = new KmerBytesWritable(getRecordReader().getCurrentKey().getKmerLength());
vertexId.set(getRecordReader().getCurrentKey());
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
- vertexValue.setValue(kmerCountValue.getAdjBitMap());
+ vertexValue.setAdjMap(kmerCountValue.getAdjBitMap());
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
new file mode 100644
index 0000000..482014e
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+public class NaiveAlgorithmForPathMergeOutputFormat extends
+ BinaryVertexOutputFormat<KmerBytesWritable, ValueStateWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ @SuppressWarnings("unchecked")
+ RecordWriter<KmerBytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ return new BinaryLoadGraphVertexWriter(recordWriter);
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+ */
+ public static class BinaryLoadGraphVertexWriter extends
+ BinaryVertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ if(vertex.getVertexValue().getState() != State.NON_EXIST)
+ getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
+ }
+ }
+}
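
Editor's note, not part of the patch: for orientation, the renamed input/output formats plug into a Pregelix driver the same way the main() methods later in this patch do. The sketch below is a hedged illustration; buildJob, its parameters, and the class name NaiveJobSketch are hypothetical, and it assumes the KMER_SIZE/ITERATIONS keys are read back by the operator's initVertex() as shown further down.

import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;

public class NaiveJobSketch {
    // hypothetical helper: assemble a job for the naive path-merge operator
    public static PregelixJob buildJob(int kmerSize, long numIteration) throws Exception {
        PregelixJob job = new PregelixJob(NaiveAlgorithmForPathMergeVertex.class.getSimpleName());
        job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
        job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
        job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
        job.setDynamicVertexValueSize(true); // ValueStateWritable now embeds a variable-length kmer chain
        job.setOutputKeyClass(KmerBytesWritable.class);
        job.setOutputValueClass(ValueStateWritable.class);
        // the same configuration keys the operator reads back in initVertex()
        job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, kmerSize);
        job.getConfiguration().setLong(NaiveAlgorithmForPathMergeVertex.ITERATIONS, numIteration);
        return job;
    }
}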
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
index 3d07c5b..ac84d8e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
@@ -2,11 +2,12 @@
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.File;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphVertex;
+
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class LogAlgorithmMessageWritable implements WritableComparable<LogAlgorithmMessageWritable>{
/**
@@ -15,65 +16,55 @@
* chainVertexId stores the chains of connected DNA
* file stores the point to the file that stores the chains of connected DNA
*/
- private byte[] sourceVertexId;
- private byte neighberInfo;
- private int lengthOfChain;
- private byte[] chainVertexId;
- private File file;
+ private VKmerBytesWritable sourceVertexId;
+ private VKmerBytesWritable chainVertexId;
+ private byte adjMap;
private int message;
private int sourceVertexState;
public LogAlgorithmMessageWritable(){
- sourceVertexId = new byte[(LogAlgorithmForMergeGraphVertex.kmerSize-1)/4 + 1];
+ sourceVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
}
- public void set(byte[] sourceVertexId,byte neighberInfo, byte[] chainVertexId, File file){
- this.sourceVertexId = sourceVertexId;
- this.chainVertexId = chainVertexId;
- this.file = file;
- this.message = 0;
- this.lengthOfChain = 0;
+ public void set(VKmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, byte adjMap, int message, int sourceVertexState){
+ this.sourceVertexId.set(sourceVertexId);
+ this.chainVertexId.set(chainVertexId);
+ this.adjMap = adjMap;
+ this.message = message;
+ this.sourceVertexState = sourceVertexState;
}
public void reset(){
- sourceVertexId = new byte[(LogAlgorithmForMergeGraphVertex.kmerSize-1)/4 + 1];
- neighberInfo = (Byte) null;
- lengthOfChain = 0;
- chainVertexId = null;
+ //sourceVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
+ adjMap = (byte)0;
message = 0;
sourceVertexState = 0;
}
- public byte[] getSourceVertexId() {
+ public VKmerBytesWritable getSourceVertexId() {
return sourceVertexId;
}
- public void setSourceVertexId(byte[] sourceVertexId) {
- this.sourceVertexId = sourceVertexId;
+ public void setSourceVertexId(VKmerBytesWritable sourceVertexId) {
+ this.sourceVertexId.set(sourceVertexId);
}
- public byte getNeighberInfo() {
- return neighberInfo;
+ public byte getAdjMap() {
+ return adjMap;
}
- public void setNeighberInfo(byte neighberInfo) {
- this.neighberInfo = neighberInfo;
+ public void setAdjMap(byte adjMap) {
+ this.adjMap = adjMap;
}
- public byte[] getChainVertexId() {
+ public VKmerBytesWritable getChainVertexId() {
return chainVertexId;
}
- public void setChainVertexId(byte[] chainVertexId) {
- this.chainVertexId = chainVertexId;
- }
-
- public File getFile() {
- return file;
- }
-
- public void setFile(File file) {
- this.file = file;
+ public void setChainVertexId(VKmerBytesWritable chainVertexId) {
+ this.chainVertexId.set(chainVertexId);
}
public int getMessage() {
@@ -93,84 +84,48 @@
}
public int getLengthOfChain() {
- return lengthOfChain;
- }
-
- public void setLengthOfChain(int lengthOfChain) {
- this.lengthOfChain = lengthOfChain;
- }
-
- public void incrementLength(){
- this.lengthOfChain++;
+ return chainVertexId.getKmerLength();
}
@Override
public void write(DataOutput out) throws IOException {
- // TODO Auto-generated method stub
- out.writeInt(lengthOfChain);
- if(lengthOfChain != 0)
- out.write(chainVertexId);
-
+ sourceVertexId.write(out);
+ chainVertexId.write(out);
+ out.write(adjMap);
out.writeInt(message);
out.writeInt(sourceVertexState);
-
- out.write(sourceVertexId);
- out.write(neighberInfo);
}
@Override
public void readFields(DataInput in) throws IOException {
- // TODO Auto-generated method stub
- lengthOfChain = in.readInt();
- if(lengthOfChain > 0){
- chainVertexId = new byte[(lengthOfChain-1)/4 + 1];
- in.readFully(chainVertexId);
- }
- else
- chainVertexId = new byte[0];
-
+ sourceVertexId.readFields(in);
+ chainVertexId.readFields(in);
+ adjMap = in.readByte();
message = in.readInt();
sourceVertexState = in.readInt();
-
- sourceVertexId = new byte[(LogAlgorithmForMergeGraphVertex.kmerSize-1)/4 + 1];
- in.readFully(sourceVertexId);
- neighberInfo = in.readByte();
}
- @Override
+ @Override
public int hashCode() {
- int hashCode = 0;
- for(int i = 0; i < chainVertexId.length; i++)
- hashCode = (int)chainVertexId[i];
- return hashCode;
+ return chainVertexId.hashCode();
}
+
@Override
public boolean equals(Object o) {
- if (o instanceof LogAlgorithmMessageWritable) {
+		if (o instanceof LogAlgorithmMessageWritable) {
LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
- return chainVertexId == tp.chainVertexId && file == tp.file;
+ return chainVertexId.equals(tp.chainVertexId);
}
return false;
}
+
@Override
public String toString() {
- return chainVertexId.toString() + "\t" + file.getAbsolutePath();
+ return chainVertexId.toString();
}
- @Override
+
+ @Override
public int compareTo(LogAlgorithmMessageWritable tp) {
- // TODO Auto-generated method stub
- int cmp;
- if (chainVertexId == tp.chainVertexId)
- cmp = 0;
- else
- cmp = 1;
- if (cmp != 0)
- return cmp;
- if (file == tp.file)
- return 0;
- else
- return 1;
+ return chainVertexId.compareTo(tp.chainVertexId);
}
-
-
}
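
Editor's note, not part of the patch: write()/readFields() above define a new serialized layout (sourceVertexId, chainVertexId, adjMap, message, sourceVertexState), so a write/read round trip is a cheap sanity check on the field order. A minimal sketch, assuming the static kmerSize has been assigned before the no-arg constructor sizes its kmers; the class name RoundTripCheck is hypothetical.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;

public class RoundTripCheck {
    public static void main(String[] args) throws IOException {
        LogAlgorithmForPathMergeVertex.kmerSize = 5; // must be set before the message constructor runs
        LogAlgorithmMessageWritable sent = new LogAlgorithmMessageWritable();
        sent.setAdjMap((byte) 0x21);
        sent.setMessage(1);

        // serialize in write() order, then read back into a fresh instance
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        sent.write(new DataOutputStream(bos));
        LogAlgorithmMessageWritable received = new LogAlgorithmMessageWritable();
        received.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println("adjMap preserved: " + (received.getAdjMap() == sent.getAdjMap()));
        System.out.println("message preserved: " + (received.getMessage() == sent.getMessage()));
    }
}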
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
deleted file mode 100644
index 3872514..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package edu.uci.ics.genomix.pregelix.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.pregelix.MergeGraphVertex;
-
-public class MessageWritable implements WritableComparable<MessageWritable>{
- /**
- * sourceVertexId stores source vertexId when headVertex sends the message
- * stores neighber vertexValue when pathVertex sends the message
- * chainVertexId stores the chains of connected DNA
- * file stores the point to the file that stores the chains of connected DNA
- */
- private byte[] sourceVertexId;
- private byte neighberInfo;
- private byte[] chainVertexId;
- private File file;
- private boolean isRear;
- private int lengthOfChain;
- private byte[] head;
-
- public MessageWritable(){
- }
-
- public void set(byte[] sourceVertexId, byte neighberInfo, byte[] chainVertexId, File file, byte[] head){
- this.sourceVertexId = sourceVertexId;
- this.neighberInfo = neighberInfo;
- this.chainVertexId = chainVertexId;
- this.file = file;
- this.isRear = false;
- this.lengthOfChain = 0;
- this.head = head;
- }
-
- public byte[] getSourceVertexId() {
- return sourceVertexId;
- }
-
- public void setSourceVertexId(byte[] sourceVertexId) {
- this.sourceVertexId = sourceVertexId;
- }
-
- public byte getNeighberInfo() {
- return neighberInfo;
- }
-
- public void setNeighberInfo(byte neighberInfo) {
- this.neighberInfo = neighberInfo;
- }
-
- public byte[] getChainVertexId() {
- return chainVertexId;
- }
-
- public void setChainVertexId(byte[] chainVertexId) {
- this.chainVertexId = chainVertexId;
- }
-
- public File getFile() {
- return file;
- }
-
- public void setFile(File file) {
- this.file = file;
- }
-
- public boolean isRear() {
- return isRear;
- }
-
- public void setRear(boolean isRear) {
- this.isRear = isRear;
- }
-
- public int getLengthOfChain() {
- return lengthOfChain;
- }
-
- public void setLengthOfChain(int lengthOfChain) {
- this.lengthOfChain = lengthOfChain;
- }
-
-
- public byte[] getHead() {
- return head;
- }
-
- public void setHead(byte[] head) {
- this.head = head;
- }
-
- public void incrementLength(){
- this.lengthOfChain++;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- // TODO Auto-generated method stub
- out.writeInt(lengthOfChain);
- if(lengthOfChain != 0)
- out.write(chainVertexId);
- out.write(sourceVertexId);
- out.write(head);
- out.write(neighberInfo);
- out.writeBoolean(isRear);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- // TODO Auto-generated method stub
- lengthOfChain = in.readInt();
- if(lengthOfChain > 0){
- chainVertexId = new byte[(lengthOfChain-1)/4 + 1];
- in.readFully(chainVertexId);
- }
- else
- chainVertexId = new byte[0];
- sourceVertexId = new byte[(MergeGraphVertex.kmerSize-1)/4 + 1];
- in.readFully(sourceVertexId);
- head = new byte[(MergeGraphVertex.kmerSize-1)/4 + 1];
- in.readFully(head);
- neighberInfo = in.readByte();
- isRear = in.readBoolean();
-
- }
-
- @Override
- public int hashCode() {
- int hashCode = 0;
- for(int i = 0; i < chainVertexId.length; i++)
- hashCode = (int)chainVertexId[i];
- return hashCode;
- }
- @Override
- public boolean equals(Object o) {
- if (o instanceof MessageWritable) {
- MessageWritable tp = (MessageWritable) o;
- return chainVertexId == tp.chainVertexId && file == tp.file;
- }
- return false;
- }
- @Override
- public String toString() {
- return chainVertexId.toString() + "\t" + file.getAbsolutePath();
- }
-
- @Override
- public int compareTo(MessageWritable tp) {
- // TODO Auto-generated method stub
- int cmp;
- if (chainVertexId == tp.chainVertexId)
- cmp = 0;
- else
- cmp = 1;
- if (cmp != 0)
- return cmp;
- if (file == tp.file)
- return 0;
- else
- return 1;
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
new file mode 100644
index 0000000..55a626d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
@@ -0,0 +1,125 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+
+public class NaiveAlgorithmMessageWritable implements WritableComparable<NaiveAlgorithmMessageWritable>{
+ /**
+	 * sourceVertexId stores the source vertexId when headVertex sends the message,
+	 * 		and the neighbor's vertexValue when pathVertex sends the message
+	 * chainVertexId stores the chain of connected DNA
+	 * headVertexId stores the vertexId of the head vertex of the chain
+ */
+ private KmerBytesWritable sourceVertexId;
+ private VKmerBytesWritable chainVertexId;
+ private KmerBytesWritable headVertexId;
+ private byte adjMap;
+ private boolean isRear;
+
+ public NaiveAlgorithmMessageWritable(){
+ sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ headVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ }
+
+ public void set(KmerBytesWritable sourceVertex, VKmerBytesWritable chainVertex, KmerBytesWritable headVertex , byte adjMap, boolean isRear){
+ this.sourceVertexId.set(sourceVertex);
+ this.chainVertexId.set(chainVertex);
+ this.headVertexId.set(headVertex);
+ this.adjMap = adjMap;
+ this.isRear = isRear;
+ }
+
+ public KmerBytesWritable getSourceVertexId() {
+ return sourceVertexId;
+ }
+
+ public void setSourceVertexId(KmerBytesWritable source) {
+ this.sourceVertexId.set(source);
+ }
+
+ public byte getAdjMap() {
+ return adjMap;
+ }
+
+ public void setAdjMap(byte adjMap) {
+ this.adjMap = adjMap;
+ }
+
+ public void setChainVertexId(VKmerBytesWritable chainVertex) {
+ this.chainVertexId.set(chainVertex);
+ }
+
+ public VKmerBytesWritable getChainVertexId() {
+ return chainVertexId;
+ }
+
+ public boolean isRear() {
+ return isRear;
+ }
+
+ public void setRear(boolean isRear) {
+ this.isRear = isRear;
+ }
+
+ public int getLengthOfChain() {
+ return this.chainVertexId.getKmerLength();
+ }
+
+
+ public KmerBytesWritable getHeadVertexId() {
+ return headVertexId;
+ }
+
+ public void setHeadVertexId(KmerBytesWritable headVertexId) {
+ this.headVertexId.set(headVertexId);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ sourceVertexId.write(out);
+ headVertexId.write(out);
+ chainVertexId.write(out);
+ out.write(adjMap);
+ out.writeBoolean(isRear);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ sourceVertexId.readFields(in);
+ headVertexId.readFields(in);
+ chainVertexId.readFields(in);
+ adjMap = in.readByte();
+ isRear = in.readBoolean();
+ }
+
+ @Override
+ public int hashCode() {
+ return chainVertexId.hashCode();
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof NaiveAlgorithmMessageWritable) {
+ NaiveAlgorithmMessageWritable tp = (NaiveAlgorithmMessageWritable) o;
+ return chainVertexId.equals( tp.chainVertexId);
+ }
+ return false;
+ }
+ @Override
+ public String toString() {
+ return chainVertexId.toString();
+ }
+
+ @Override
+ public int compareTo(NaiveAlgorithmMessageWritable tp) {
+ return chainVertexId.compareTo(tp.chainVertexId);
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index ffc1d38..769277c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -4,35 +4,42 @@
import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class ValueStateWritable implements WritableComparable<ValueStateWritable> {
- private byte value;
+ private byte adjMap;
private int state;
- private int lengthOfMergeChain;
- private byte[] mergeChain;
+ private VKmerBytesWritable mergeChain;
public ValueStateWritable() {
state = State.NON_VERTEX;
- lengthOfMergeChain = 0;
+ mergeChain = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
}
- public ValueStateWritable(byte value, int state, int lengthOfMergeChain, byte[] mergeChain) {
- this.value = value;
+ public ValueStateWritable(byte adjMap, int state, VKmerBytesWritable mergeChain) {
+ this.adjMap = adjMap;
this.state = state;
- this.lengthOfMergeChain = lengthOfMergeChain;
- this.mergeChain = mergeChain;
+		this.mergeChain = new VKmerBytesWritable(mergeChain.getKmerLength());
+		this.mergeChain.set(mergeChain);
+ }
+
+ public void set(byte adjMap, int state, VKmerBytesWritable mergeChain){
+ this.adjMap = adjMap;
+ this.state = state;
+ this.mergeChain.set(mergeChain);
}
- public byte getValue() {
- return value;
+ public byte getAdjMap() {
+ return adjMap;
}
- public void setValue(byte value) {
- this.value = value;
+ public void setAdjMap(byte adjMap) {
+ this.adjMap = adjMap;
}
public int getState() {
@@ -44,43 +51,33 @@
}
public int getLengthOfMergeChain() {
- return lengthOfMergeChain;
+ return mergeChain.getKmerLength();
}
- public void setLengthOfMergeChain(int lengthOfMergeChain) {
- this.lengthOfMergeChain = lengthOfMergeChain;
- }
-
- public byte[] getMergeChain() {
+ public VKmerBytesWritable getMergeChain() {
return mergeChain;
}
- public void setMergeChain(byte[] mergeChain) {
- this.mergeChain = mergeChain;
+ public void setMergeChain(KmerBytesWritable mergeChain) {
+ this.mergeChain.set(mergeChain);
+ }
+
+ public void setMergeChain(VKmerBytesWritable mergeChain) {
+ this.mergeChain.set(mergeChain);
}
@Override
public void readFields(DataInput in) throws IOException {
- value = in.readByte();
+ adjMap = in.readByte();
state = in.readInt();
- lengthOfMergeChain = in.readInt();
- if(lengthOfMergeChain < 0)
- System.out.println();
- if(lengthOfMergeChain != 0){
- mergeChain = new byte[(lengthOfMergeChain-1)/4 + 1];
- in.readFully(mergeChain);
- }
- else
- mergeChain = new byte[0];
+ mergeChain.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeByte(value);
+ out.writeByte(adjMap);
out.writeInt(state);
- out.writeInt(lengthOfMergeChain);
- if(lengthOfMergeChain != 0)
- out.write(mergeChain);
+ mergeChain.write(out);
}
@Override
@@ -91,11 +88,11 @@
@Override
public String toString() {
- if(lengthOfMergeChain == 0)
- return Kmer.GENE_CODE.getSymbolFromBitMap(value);
- return Kmer.GENE_CODE.getSymbolFromBitMap(value) + "\t" +
- lengthOfMergeChain + "\t" +
- Kmer.recoverKmerFrom(lengthOfMergeChain, mergeChain, 0, mergeChain.length) + "\t" +
+ if(mergeChain.getKmerLength() == 0)
+ return GeneCode.getSymbolFromBitMap(adjMap);
+ return GeneCode.getSymbolFromBitMap(adjMap) + "\t" +
+ getLengthOfMergeChain() + "\t" +
+ mergeChain.toString() + "\t" +
state;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java
deleted file mode 100644
index a3f0b9f..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package edu.uci.ics.genomix.pregelix.io;
-
-import java.io.*;
-
-import org.apache.hadoop.io.WritableComparable;
-
-public class ValueWritable implements WritableComparable<ValueWritable> {
-
- private byte value;
- private int lengthOfMergeChain;
- private byte[] mergeChain;
-
- public ValueWritable() {
- lengthOfMergeChain = 0;
- }
-
- public ValueWritable(byte value, int lengthOfMergeChain, byte[] mergeChain) {
- this.value = value;
- this.lengthOfMergeChain = lengthOfMergeChain;
- this.mergeChain = mergeChain;
- }
-
- public byte getValue() {
- return value;
- }
-
- public void setValue(byte value) {
- this.value = value;
- }
-
- public int getLengthOfMergeChain() {
- return lengthOfMergeChain;
- }
-
- public void setLengthOfMergeChain(int lengthOfMergeChain) {
- this.lengthOfMergeChain = lengthOfMergeChain;
- }
-
- public byte[] getMergeChain() {
- return mergeChain;
- }
-
- public void setMergeChain(byte[] mergeChain) {
- this.mergeChain = mergeChain;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- value = in.readByte();
- lengthOfMergeChain = in.readInt();
- if(lengthOfMergeChain != 0){
- mergeChain = new byte[(lengthOfMergeChain-1)/4 + 1];
- in.readFully(mergeChain);
- }
- else
- mergeChain = new byte[0];
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeByte(value);
- out.writeInt(lengthOfMergeChain);
- if(lengthOfMergeChain != 0)
- out.write(mergeChain);
- }
-
- @Override
- public int compareTo(ValueWritable o) {
- // TODO Auto-generated method stub
- return 0;
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
index 7e56a66..6105f18 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
@@ -4,27 +4,22 @@
import java.util.logging.Handler;
import java.util.logging.LogRecord;
-import org.apache.hadoop.io.BytesWritable;
-
-import edu.uci.ics.genomix.type.old.Kmer;
import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class DataLoadLogFormatter extends Formatter{
- private BytesWritable key;
+	private VKmerBytesWritable key = new VKmerBytesWritable(1);
private KmerCountValue value;
- private int k;
- public void set(BytesWritable key,
- KmerCountValue value, int k){
- this.key = key;
+ public void set(VKmerBytesWritable key,
+ KmerCountValue value){
+ this.key.set(key);
this.value = value;
- this.k = k;
}
public String format(LogRecord record) {
- StringBuilder builder = new StringBuilder(1000);
+ StringBuilder builder = new StringBuilder(1000);
- builder.append(Kmer.recoverKmerFrom(k, key.getBytes(), 0,
- key.getLength())
+ builder.append(key.toString()
+ "\t" + value.toString() + "\r\n");
if(!formatMessage(record).equals(""))
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index 3af9b6f..d4f03ee 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -5,7 +5,7 @@
import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class LogAlgorithmLogFormatter extends Formatter {
//
@@ -13,13 +13,11 @@
//
//private static final DateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");
private long step;
- private byte[] sourceVertexId;
- private byte[] destVertexId;
- private LogAlgorithmMessageWritable msg;
+ private VKmerBytesWritable sourceVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
private int state;
- private int k;
- private byte[] mergeChain;
- private int lengthOfMergeChain;
+	private VKmerBytesWritable mergeChain = new VKmerBytesWritable(1);
//private boolean testDelete = false;
/** 0: general operation
* 1: testDelete
@@ -27,60 +25,57 @@
* 3: testVoteToHalt
*/
private int operation;
+
+ public LogAlgorithmLogFormatter(){
+ }
- public void set(long step, byte[] sourceVertexId,
- byte[] destVertexId, LogAlgorithmMessageWritable msg, int state, int k){
+ public void set(long step, VKmerBytesWritable sourceVertexId,
+ VKmerBytesWritable destVertexId, LogAlgorithmMessageWritable msg, int state){
this.step = step;
- this.sourceVertexId = sourceVertexId;
- this.destVertexId = destVertexId;
+ this.sourceVertexId.set(sourceVertexId);
+ this.destVertexId.set(destVertexId);
this.msg = msg;
this.state = state;
- this.k = k;
this.operation = 0;
}
- public void setMergeChain(long step, byte[] sourceVertexId,
- int lengthOfMergeChain, byte[] mergeChain, int k){
+ public void setMergeChain(long step, VKmerBytesWritable sourceVertexId,
+ VKmerBytesWritable mergeChain){
this.reset();
this.step = step;
- this.sourceVertexId = sourceVertexId;
- this.lengthOfMergeChain = lengthOfMergeChain;
- this.mergeChain = mergeChain;
- this.k = k;
+ this.sourceVertexId.set(sourceVertexId);
+ this.mergeChain.set(mergeChain);
this.operation = 2;
}
- public void setVotoToHalt(long step, byte[] sourceVertexId, int k){
+ public void setVotoToHalt(long step, VKmerBytesWritable sourceVertexId){
this.reset();
this.step = step;
- this.sourceVertexId = sourceVertexId;
- this.k = k;
+ this.sourceVertexId.set(sourceVertexId);
this.operation = 3;
}
public void reset(){
- this.sourceVertexId = null;
- this.destVertexId = null;
- this.msg = null;
+ this.sourceVertexId = new VKmerBytesWritable(1);
+ this.destVertexId = new VKmerBytesWritable(1);
+ this.msg = new LogAlgorithmMessageWritable();
this.state = 0;
- this.k = 0;
- this.mergeChain = null;
- this.lengthOfMergeChain = 0;
+ this.mergeChain = new VKmerBytesWritable(1);
}
public String format(LogRecord record) {
StringBuilder builder = new StringBuilder(1000);
- String source = Kmer.recoverKmerFrom(k, sourceVertexId, 0, sourceVertexId.length);
+ String source = sourceVertexId.toString();
String chain = "";
builder.append("Step: " + step + "\r\n");
builder.append("Source Code: " + source + "\r\n");
if(operation == 0){
- if(destVertexId != null){
- String dest = Kmer.recoverKmerFrom(k, destVertexId, 0, destVertexId.length);
+ if(destVertexId.getKmerLength() != -1){
+ String dest = destVertexId.toString();
builder.append("Send message to " + "\r\n");
builder.append("Destination Code: " + dest + "\r\n");
}
builder.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getMessage()) + "\r\n");
- if(msg.getLengthOfChain() != 0){
- chain = Kmer.recoverKmerFrom(msg.getLengthOfChain(), msg.getChainVertexId(), 0, msg.getChainVertexId().length);
+ if(msg.getLengthOfChain() != -1){
+ chain = msg.getChainVertexId().toString();
builder.append("Chain Message: " + chain + "\r\n");
builder.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
}
@@ -88,9 +83,9 @@
builder.append("State is: " + State.STATE_CONTENT.getContentFromCode(state) + "\r\n");
}
if(operation == 2){
- chain = Kmer.recoverKmerFrom(lengthOfMergeChain, mergeChain, 0, mergeChain.length);
+ chain = mergeChain.toString();
builder.append("Merge Chain: " + chain + "\r\n");
- builder.append("Merge Chain Length: " + lengthOfMergeChain + "\r\n");
+ builder.append("Merge Chain Length: " + mergeChain.getKmerLength() + "\r\n");
}
if(operation == 3)
builder.append("Vote to halt!");
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
index c337a16..332d6d0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
@@ -2,8 +2,8 @@
import java.util.logging.*;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class NaiveAlgorithmLogFormatter extends Formatter {
//
@@ -11,22 +11,20 @@
//
//private static final DateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");
private long step;
- private byte[] sourceVertexId;
- private byte[] destVertexId;
- private MessageWritable msg;
- private int k;
+	private VKmerBytesWritable sourceVertexId = new VKmerBytesWritable(1);
+	private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ private NaiveAlgorithmMessageWritable msg;
- public void set(long step, byte[] sourceVertexId,
- byte[] destVertexId, MessageWritable msg, int k){
+ public void set(long step, VKmerBytesWritable sourceVertexId,
+ VKmerBytesWritable destVertexId, NaiveAlgorithmMessageWritable msg){
this.step = step;
- this.sourceVertexId = sourceVertexId;
- this.destVertexId = destVertexId;
+ this.sourceVertexId.set(sourceVertexId);
+ this.destVertexId.set(destVertexId);
this.msg = msg;
- this.k = k;
}
public String format(LogRecord record) {
StringBuilder builder = new StringBuilder(1000);
- String source = Kmer.recoverKmerFrom(k, sourceVertexId, 0, sourceVertexId.length);
+ String source = sourceVertexId.toString();
String chain = "";
@@ -35,11 +33,11 @@
if(destVertexId != null){
builder.append("Send message to " + "\r\n");
- String dest = Kmer.recoverKmerFrom(k, destVertexId, 0, destVertexId.length);
+ String dest = destVertexId.toString();
builder.append("Destination Code: " + dest + "\r\n");
}
if(msg.getLengthOfChain() != 0){
- chain = Kmer.recoverKmerFrom(msg.getLengthOfChain(), msg.getChainVertexId(), 0, msg.getChainVertexId().length);
+ chain = msg.getChainVertexId().toString();
builder.append("Chain Message: " + chain + "\r\n");
builder.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java
similarity index 62%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java
index 580e1fe..6fef3a6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java
@@ -1,16 +1,15 @@
-package edu.uci.ics.genomix.pregelix;
+package edu.uci.ics.genomix.pregelix.operator;
import java.util.Iterator;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -18,7 +17,7 @@
* vertexId: BytesWritable
* vertexValue: ByteWritable
* edgeValue: NullWritable
- * message: MessageWritable
+ * message: NaiveAlgorithmMessageWritable
*
* DNA:
* A: 00
@@ -42,14 +41,13 @@
* The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
* The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
-public class LoadGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
+public class LoadGraphVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
/**
* For test, just output original file
*/
@Override
- public void compute(Iterator<MessageWritable> msgIterator) {
- deleteVertex(getVertexId());
+ public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
voteToHalt();
}
@@ -57,15 +55,15 @@
* @param args
*/
public static void main(String[] args) throws Exception {
- //final int k = Integer.parseInt(args[0]);
PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
job.setVertexClass(LoadGraphVertex.class);
/**
* BinaryInput and BinaryOutput
*/
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
- job.setOutputKeyClass(BytesWritable.class);
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
new file mode 100644
index 0000000..fa77d43
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
@@ -0,0 +1,371 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+
+/*
+ * vertexId: KmerBytesWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: LogAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * successor node (low 4 bits of adjMap)
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * predecessor node (high 4 bits of adjMap)
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in the input file: 00,01,10	0001,0010,
+ * means the vertexId is ACG, its successor node is A, and its predecessor node is C.
+ * The successor and predecessor nodes are stored in vertexValue; edgeValue is unused.
+ * Message details are in edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable.
+ */
+public class LogAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
+
+ public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
+	public static final String ITERATIONS = "LogAlgorithmForPathMergeVertex.iteration";
+ public static int kmerSize = -1;
+ private int maxIteration = -1;
+
+ private ValueStateWritable vertexVal = new ValueStateWritable();
+
+ private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+ private VKmerBytesWritable vertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+ /**
+ * initiate kmerSize, maxIteration
+ */
+ public void initVertex(){
+ if(kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
+ vertexId.set(getVertexId());
+ vertexVal = getVertexValue();
+ }
+ /**
+ * get destination vertex
+ */
+ public VKmerBytesWritable getNextDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getPreDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getNextDestVertexIdFromBitmap(VKmerBytesWritable chainVertexId, byte adjMap){
+ return getDestVertexIdFromChain(chainVertexId, adjMap);//GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)
+ }
+
+ public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
+ lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
+ return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+ }
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(VKmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if((adjMap & (1 << x)) != 0){
+ destVertexId.set(getNextDestVertexId(vertexId, x));
+ sendMsg(destVertexId, msg);
+ }
+ }
+ }
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendMsgToAllPreviousNodes(VKmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if(((adjMap >> 4) & (1 << x)) != 0){
+ destVertexId.set(getPreDestVertexId(vertexId, x));
+ sendMsg(destVertexId, msg);
+ }
+ }
+ }
+
+ /**
+ * set vertex state
+ */
+ public void setState(){
+ if(msg.getMessage() == Message.START &&
+ (vertexVal.getState() == State.MID_VERTEX || vertexVal.getState() == State.END_VERTEX)){
+ vertexVal.setState(State.START_VERTEX);
+ setVertexValue(vertexVal);
+ }
+ else if(msg.getMessage() == Message.END && vertexVal.getState() == State.MID_VERTEX){
+ vertexVal.setState(State.END_VERTEX);
+ setVertexValue(vertexVal);
+ voteToHalt();
+ }
+ else
+ voteToHalt();
+ }
+ /**
+ * send start message to next node
+ */
+ public void sendStartMsgToNextNode(){
+ msg.setMessage(Message.START);
+ msg.setSourceVertexId(vertexId);
+ sendMsg(destVertexId, msg);
+ voteToHalt();
+ }
+ /**
+ * send end message to next node
+ */
+ public void sendEndMsgToNextNode(){
+ msg.setMessage(Message.END);
+ msg.setSourceVertexId(vertexId);
+ sendMsg(destVertexId, msg);
+ voteToHalt();
+ }
+ /**
+ * send non message to next node
+ */
+ public void sendNonMsgToNextNode(){
+ msg.setMessage(Message.NON);
+ msg.setSourceVertexId(vertexId);
+ sendMsg(destVertexId, msg);
+ }
+ /**
+ * head send message to path
+ */
+ public void sendMsgToPathVertex(VKmerBytesWritable chainVertexId, byte adjMap){
+ if(GeneCode.getGeneCodeFromBitMap((byte)(vertexVal.getAdjMap() & 0x0F)) == -1) //|| lastKmer == null
+ voteToHalt();
+ else{
+ destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId, adjMap));
+ if(vertexVal.getState() == State.START_VERTEX){
+ sendStartMsgToNextNode();
+ }
+ else if(vertexVal.getState() != State.END_VERTEX && vertexVal.getState() != State.FINAL_DELETE){
+ sendEndMsgToNextNode();
+ }
+ }
+ }
+ /**
+ * path send message to head
+ */
+ public void responseMsgToHeadVertex(){
+ if(vertexVal.getLengthOfMergeChain() == -1){
+ vertexVal.setMergeChain(vertexId);
+ setVertexValue(vertexVal);
+ }
+ msg.set(msg.getSourceVertexId(), vertexVal.getMergeChain(), vertexVal.getAdjMap(), msg.getMessage(), vertexVal.getState());
+ setMessageType(msg.getMessage());
+ destVertexId.set(msg.getSourceVertexId());
+ sendMsg(destVertexId,msg);
+ }
+ /**
+ * set message type
+ */
+ public void setMessageType(int message){
+ //kill Message because it has been merged by the head
+ if(vertexVal.getState() == State.END_VERTEX || vertexVal.getState() == State.FINAL_DELETE){
+ msg.setMessage(Message.END);
+ vertexVal.setState(State.FINAL_DELETE);
+ setVertexValue(vertexVal);
+ //deleteVertex(getVertexId());
+ }
+ else
+ msg.setMessage(Message.NON);
+
+ if(message == Message.START){
+ vertexVal.setState(State.TODELETE);
+ setVertexValue(vertexVal);
+ }
+ }
+ /**
+ * set vertexValue's state chainVertexId, value
+ */
+ public void setVertexValueAttributes(){
+ if(msg.getMessage() == Message.END){
+ if(vertexVal.getState() != State.START_VERTEX)
+ vertexVal.setState(State.END_VERTEX);
+ else
+ vertexVal.setState(State.FINAL_VERTEX);
+ }
+
+ if(getSuperstep() == 5)
+ chainVertexId.set(vertexId);
+ else
+ chainVertexId.set(vertexVal.getMergeChain());
+ lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain() - kmerSize + 1, msg.getChainVertexId()));
+ chainVertexId.set(kmerFactory.mergeTwoKmer(chainVertexId, lastKmer));
+ vertexVal.setMergeChain(chainVertexId);
+
+ byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(), msg.getAdjMap());
+ vertexVal.setAdjMap(tmpVertexValue);
+ }
+ /**
+ * send message to self
+ */
+ public void sendMsgToSelf(){
+ if(msg.getMessage() != Message.END){
+ setVertexValue(vertexVal);
+ msg.reset(); //reset
+ msg.setAdjMap(vertexVal.getAdjMap());
+ sendMsg(vertexId,msg);
+ }
+ }
+ /**
+ * start sending message
+ */
+ public void startSendMsg(){
+ if(GraphVertexOperation.isHeadVertex(vertexVal.getAdjMap())){
+ msg.set(vertexId, chainVertexId, (byte)0, Message.START, State.NON_VERTEX); //msg.set(null, (byte)0, chainVertexId, Message.START, State.NON_VERTEX);
+ sendMsgToAllNextNodes(vertexId, vertexVal.getAdjMap());
+ voteToHalt();
+ }
+ if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap())){
+ msg.set(vertexId, chainVertexId, (byte)0, Message.END, State.NON_VERTEX);
+ sendMsgToAllPreviousNodes(vertexId, vertexVal.getAdjMap());
+ voteToHalt();
+ }
+ if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ vertexVal.setState(State.MID_VERTEX);
+ setVertexValue(vertexVal);
+ }
+ }
+ /**
+ * initiate head, rear and path node
+ */
+ public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ while(msgIterator.hasNext()){
+ if(!GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ msgIterator.next();
+ voteToHalt();
+ }
+ else{
+ msg = msgIterator.next();
+ setState();
+ }
+ }
+ }
+ /**
+ * head send message to path
+ */
+ public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(getSuperstep() == 3){
+ msg.reset();
+ sendMsgToPathVertex(vertexId, vertexVal.getAdjMap());
+ }
+ else{
+ if(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ sendMsgToPathVertex(vertexVal.getMergeChain(), msg.getAdjMap());
+ }
+ }
+ }
+ /**
+ * path response message to head
+ */
+ public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ responseMsgToHeadVertex();
+ //voteToHalt();
+ }
+ else{
+ if(getVertexValue().getState() != State.START_VERTEX
+ && getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
+ //vertexVal.setState(State.KILL_SELF);
+ //setVertexValue(vertexVal);
+ //voteToHalt();
+ deleteVertex(getVertexId());//killSelf because it doesn't receive any message
+ }
+ }
+ }
+ /**
+ * merge chainVertex and store in vertexVal.chainVertexId
+ */
+ public void mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ setVertexValueAttributes();
+ sendMsgToSelf();
+ }
+ if(vertexVal.getState() == State.END_VERTEX || vertexVal.getState() == State.FINAL_DELETE){
+ voteToHalt();
+ }
+ if(vertexVal.getState() == State.FINAL_VERTEX){
+ //String source = vertexVal.getMergeChain().toString();
+ voteToHalt();
+ }
+ }
+ @Override
+ public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ initVertex();
+ if(vertexVal.getState() != State.NON_EXIST && vertexVal.getState() != State.KILL_SELF){
+ if (getSuperstep() == 1)
+ startSendMsg();
+ else if(getSuperstep() == 2)
+ initState(msgIterator);
+ else if(getSuperstep()%3 == 0 && getSuperstep() <= maxIteration){
+ sendMsgToPathVertex(msgIterator);
+ }
+ else if(getSuperstep()%3 == 1 && getSuperstep() <= maxIteration){
+ responseMsgToHeadVertex(msgIterator);
+ }
+ else if(getSuperstep()%3 == 2 && getSuperstep() <= maxIteration){
+ if(vertexVal.getState() == State.TODELETE){ //|| vertexVal.getState() == State.KILL_SELF)
+ //vertexVal.setState(State.NON_EXIST);
+ //setVertexValue(vertexVal);
+ //voteToHalt();
+ deleteVertex(getVertexId()); //killSelf
+ }
+ else{
+ mergeChainVertex(msgIterator);
+ }
+ }
+ }
+ }
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(LogAlgorithmForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ job.setDynamicVertexValueSize(true);
+ Client.run(args, job);
+ }
+}
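The main() above leaves KMER_SIZE and ITERATIONS at the defaults read in initVertex. A minimal sketch of pinning both explicitly before submission, mirroring the generateLogAlgorithmForMergeGraphJob setup in JobGenerator further down; the ITERATIONS constant is assumed to parallel the one declared on the naive vertex below:

    // Sketch: same wiring as main() above, plus explicit k-mer size and
    // iteration cap on the job configuration.
    PregelixJob job = new PregelixJob(LogAlgorithmForPathMergeVertex.class.getSimpleName());
    job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
    job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
    job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
    job.setOutputKeyClass(KmerBytesWritable.class);
    job.setOutputValueClass(ValueStateWritable.class);
    job.setDynamicVertexValueSize(true);
    job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
    job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.ITERATIONS, 100); // assumed constant
    Client.run(args, job);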
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
new file mode 100644
index 0000000..8a70bd5
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -0,0 +1,211 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: NaiveAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * successor node
+ * A 00000001 1
+ * C 00000010 2
+ * G 00000100 4
+ * T 00001000 8
+ * predecessor node
+ * A 00010000 16
+ * C 00100000 32
+ * G 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means the vertexId is ACG, its successor node is A and its predecessor node is C.
+ * The successor and predecessor nodes are stored in vertexValue; edgeValue is unused.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class NaiveAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
+
+ public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
+ public static int kmerSize = -1;
+ private int maxIteration = -1;
+
+ private ValueStateWritable vertexVal = new ValueStateWritable();
+
+ private NaiveAlgorithmMessageWritable msg = new NaiveAlgorithmMessageWritable();
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+ private VKmerBytesWritable vertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+
+ /**
+ * initialize kmerSize and maxIteration
+ */
+ public void initVertex(){
+ if(kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
+ vertexId.set(getVertexId());
+ vertexVal = getVertexValue();
+ }
+ public void findDestination(){
+ destVertexId.set(msg.getSourceVertexId());
+ }
+ /**
+ * get the destination vertex id by shifting in the next gene code
+ */
+ public VKmerBytesWritable getDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
+ lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
+ return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+ }
+ /**
+ * head sends messages to all next nodes
+ */
+ public void sendMsgToAllNextNodes(VKmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if((adjMap & (1 << x)) != 0){
+ destVertexId.set(getDestVertexId(vertexId, x));
+ sendMsg(destVertexId, msg);
+ }
+ }
+ }
+ /**
+ * initialize chain vertex
+ */
+ public void initChainVertex(){
+ if(!msg.isRear()){
+ findDestination();
+ if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ chainVertexId.set(vertexId);
+ msg.set(vertexId, chainVertexId, vertexId, vertexVal.getAdjMap(), false);
+ sendMsg(destVertexId,msg);
+ }else if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap()))
+ voteToHalt();
+ }
+ }
+ /**
+ * head node sends message to path node
+ */
+ public void sendMsgToPathVertex(){
+ if(!msg.isRear()){
+ destVertexId.set(getDestVertexIdFromChain(msg.getChainVertexId(), msg.getAdjMap()));
+ }else{
+ destVertexId.set(msg.getHeadVertexId());
+ }
+ msg.set(vertexId, msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, msg.isRear());
+ sendMsg(destVertexId,msg);
+ }
+ /**
+ * path node sends message back to head node
+ */
+ public void responseMsgToHeadVertex(){
+ if(!msg.isRear()){
+ findDestination();
+ if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ chainVertexId = kmerFactory.mergeKmerWithNextCode(msg.getChainVertexId(),
+ vertexId.getGeneCodeAtPosition(kmerSize - 1));
+ deleteVertex(getVertexId());
+ msg.set(vertexId, chainVertexId, msg.getHeadVertexId(), vertexVal.getAdjMap(), false);
+ sendMsg(destVertexId,msg);
+ }
+ else if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap())){
+ msg.set(vertexId, msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, true);
+ sendMsg(destVertexId,msg);
+ }
+ }else{ // rear reached: head records the merged chain
+ chainVertexId.set(msg.getSourceVertexId());
+ vertexVal.set(GraphVertexOperation.updateRightNeighberByVertexId(vertexVal.getAdjMap(), chainVertexId, kmerSize),
+ State.START_VERTEX, msg.getChainVertexId());
+ setVertexValue(vertexVal);
+ }
+ }
+
+ @Override
+ public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ initVertex();
+ if(vertexVal.getState() != State.NON_EXIST){
+ if (getSuperstep() == 1) {
+ if(GraphVertexOperation.isHeadVertex(vertexVal.getAdjMap())){
+ msg.set(vertexId, chainVertexId, vertexId, (byte)0, false);
+ sendMsgToAllNextNodes(vertexId, vertexVal.getAdjMap());
+ }
+ }
+ else if(getSuperstep() == 2){
+ if(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ initChainVertex();
+ }
+ }
+ //head node sends message to path node
+ else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
+ while (msgIterator.hasNext()){
+ msg = msgIterator.next();
+ sendMsgToPathVertex();
+ }
+ }
+ //path node sends message back to head node
+ else if(getSuperstep()%2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration){
+ while(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ responseMsgToHeadVertex();
+ }
+ }
+ }
+ voteToHalt();
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(NaiveAlgorithmForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ Client.run(args, job);
+ }
+}
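The class comment above fixes the adjacency-map layout: the low nibble holds successor bits, the high nibble predecessor bits, one bit per gene code. A minimal, self-contained sketch of decoding such a byte, assuming codes A=0, C=1, G=2, T=3 and using plain ints rather than the GeneCode constants:

    // Decodes an adjacency-map byte: bit `code` marks a successor,
    // bit `code + 4` a predecessor, for gene codes A=0, C=1, G=2, T=3.
    public class AdjMapDemo {
        static final char[] CODE_TO_CHAR = { 'A', 'C', 'G', 'T' };

        static String successors(byte adjMap) {
            StringBuilder sb = new StringBuilder();
            for (int code = 0; code < 4; code++)
                if ((adjMap & (1 << code)) != 0)
                    sb.append(CODE_TO_CHAR[code]);
            return sb.toString();
        }

        static String predecessors(byte adjMap) {
            StringBuilder sb = new StringBuilder();
            for (int code = 0; code < 4; code++)
                if ((adjMap & (1 << (code + 4))) != 0)
                    sb.append(CODE_TO_CHAR[code]);
            return sb.toString();
        }

        public static void main(String[] args) {
            byte adjMap = 0b00100001; // the ACG example: successor A, predecessor C
            System.out.println("successors=" + successors(adjMap)
                    + " predecessors=" + predecessors(adjMap));
        }
    }

On the ACG example from the class comment (adjMap 0b00100001) this prints successors=A predecessors=C.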
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
index d9f0efe..c7349dd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
@@ -5,14 +5,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerCountValue;
-
public class CombineSequenceFile {
/**
@@ -25,24 +23,23 @@
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
- Path p = new Path("data/SinglePath_55");
- Path p2 = new Path("data/result");
- Path outFile = new Path(p2, "output");
+ Path p = new Path("output");
+ //Path p2 = new Path("data/result");
+ Path outFile = new Path("output");
SequenceFile.Reader reader;
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
- outFile, BytesWritable.class, KmerCountValue.class,
+ outFile, KmerBytesWritable.class, KmerCountValue.class,
CompressionType.NONE);
- BytesWritable key = new BytesWritable();
+ KmerBytesWritable key = new KmerBytesWritable(kmerSize);
KmerCountValue value = new KmerCountValue();
- File dir = new File("data/SinglePath_55");
+ File dir = new File("output");
for(File child : dir.listFiles()){
String name = child.getAbsolutePath();
Path inFile = new Path(p, name);
reader = new SequenceFile.Reader(fileSys, inFile, conf);
while (reader.next(key, value)) {
- System.out.println(Kmer.recoverKmerFrom(kmerSize, key.getBytes(), 0,
- key.getLength())
+ System.out.println(key.toString()
+ "\t" + value.toString());
writer.append(key, value);
}
@@ -53,8 +50,7 @@
reader = new SequenceFile.Reader(fileSys, outFile, conf);
while (reader.next(key, value)) {
- System.err.println(Kmer.recoverKmerFrom(kmerSize, key.getBytes(), 0,
- key.getLength())
+ System.err.println(key.toString()
+ "\t" + value.toString());
}
reader.close();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
new file mode 100644
index 0000000..d97a2fd
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -0,0 +1,48 @@
+package edu.uci.ics.genomix.pregelix.sequencefile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerCountValue;
+
+public class GenerateSmallFile {
+
+ public static void generateNumOfLinesFromBigFile(Path inFile, Path outFile, int numOfLines) throws IOException{
+ Configuration conf = new Configuration();
+ FileSystem fileSys = FileSystem.get(conf);
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ outFile, KmerBytesWritable.class, KmerCountValue.class,
+ CompressionType.NONE);
+ KmerBytesWritable outKey = new KmerBytesWritable(55);
+ KmerCountValue outValue = new KmerCountValue();
+ for(int i = 0; i < numOfLines; i++){
+ if(!reader.next(outKey, outValue))
+ break; // input had fewer than numOfLines records
+ writer.append(outKey, outValue);
+ }
+ writer.close();
+ reader.close();
+ }
+ /**
+ * @param args
+ * @throws IOException
+ */
+ public static void main(String[] args) throws IOException {
+ Path dir = new Path("data");
+ Path inFile = new Path(dir, "part-0");
+ Path outFile = new Path(dir, "part-0-out-5000000");
+ generateNumOfLinesFromBigFile(inFile,outFile,5000000);
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index 18214a8..c759261 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -7,30 +7,28 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
public class GenerateTextFile {
public static void generate() throws IOException{
- BufferedWriter bw = new BufferedWriter(new FileWriter("text/new_SimplePath"));
+ BufferedWriter bw = new BufferedWriter(new FileWriter("text/log_TreePath"));
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
for(int i = 0; i < 2; i++){
Path path = new Path("output/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- BytesWritable key = new BytesWritable();
+ KmerBytesWritable key = new KmerBytesWritable(5);
ValueStateWritable value = new ValueStateWritable();
while(reader.next(key, value)){
if (key == null || value == null){
break;
}
- bw.write(Kmer.recoverKmerFrom(5, key.getBytes(), 0,
- key.getLength())
+ bw.write(key.toString()
+ "\t" + value.toString());
bw.newLine();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
index 61bcb0c..ae3c621 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
@@ -9,6 +9,7 @@
public static final int FINAL_VERTEX = 5;
public static final int FINAL_DELETE = 6;
public static final int KILL_SELF = 7;
+ public static final int NON_EXIST = 8;
public final static class STATE_CONTENT{
@@ -39,6 +40,9 @@
case KILL_SELF:
r = "KILL_SELF";
break;
+ case NON_EXIST:
+ r = "NON_EXIST";
+ break;
}
return r;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java
similarity index 64%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java
index 553de45..5cb0c2b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java
@@ -1,16 +1,14 @@
-package edu.uci.ics.genomix.pregelix;
+package edu.uci.ics.genomix.pregelix.util;
-import org.apache.hadoop.io.BytesWritable;
-
-import edu.uci.ics.genomix.type.old.Kmer;
-import edu.uci.ics.genomix.type.old.KmerUtil;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class GraphVertexOperation {
/**
- * generate the valid data(byte[]) from BytesWritable
+ * generate the valid data (byte[]) from VKmerBytesWritable
*/
- public static byte[] generateValidDataFromBytesWritable(BytesWritable bw){
+ public static byte[] generateValidDataFromBytesWritable(VKmerBytesWritable bw){
byte[] wholeBytes = bw.getBytes();
int validNum = bw.getLength();
byte[] validBytes = new byte[validNum];
@@ -23,7 +21,7 @@
* @param vertexValue
*/
public static boolean isPathVertex(byte value){
- if(KmerUtil.inDegree(value) == 1 && KmerUtil.outDegree(value) == 1)
+ if(GeneCode.inDegree(value) == 1 && GeneCode.outDegree(value) == 1)
return true;
return false;
}
@@ -32,7 +30,7 @@
* @param vertexValue
*/
public static boolean isHeadVertex(byte value){
- if(KmerUtil.outDegree(value) > 0 && !isPathVertex(value))
+ if(GeneCode.outDegree(value) > 0 && !isPathVertex(value))
return true;
return false;
}
@@ -41,18 +39,17 @@
* @param vertexValue
*/
public static boolean isRearVertex(byte value){
- if(KmerUtil.inDegree(value) > 0 && !isPathVertex(value))
+ if(GeneCode.inDegree(value) > 0 && !isPathVertex(value))
return true;
return false;
}
/**
* update right neighbor based on next vertexId
*/
- public static byte updateRightNeighberByVertexId(byte oldVertexValue, byte[] neighberVertexId, int k){
+ public static byte updateRightNeighberByVertexId(byte oldVertexValue, VKmerBytesWritable neighberVertex, int k){
+ byte geneCode = neighberVertex.getGeneCodeAtPosition(k-1);
- String neighberVertex = Kmer.recoverKmerFrom(k, neighberVertexId, 0, neighberVertexId.length);
-
- byte newBit = Kmer.GENE_CODE.getAdjBit((byte)neighberVertex.charAt(neighberVertex.length() - 1));
+ byte newBit = GeneCode.getAdjBit(geneCode);
return (byte) ((byte)(oldVertexValue & 0xF0) | (byte) (newBit & 0x0F));
}
/**
@@ -61,4 +58,5 @@
public static byte updateRightNeighber(byte oldVertexValue, byte newVertexValue){
return (byte) ((byte)(oldVertexValue & 0xF0) | (byte) (newVertexValue & 0x0F));
}
+
}
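The three predicates above reduce to popcounts on the two nibbles of the adjacency map. A hedged sketch, assuming GeneCode.outDegree counts successor bits (low nibble) and GeneCode.inDegree counts predecessor bits (high nibble):

    // Degree predicates on the adjacency-map byte, written with Integer.bitCount.
    static int outDegree(byte adjMap) { return Integer.bitCount(adjMap & 0x0F); }
    static int inDegree(byte adjMap)  { return Integer.bitCount((adjMap >> 4) & 0x0F); }

    static boolean isPathVertex(byte v) { return inDegree(v) == 1 && outDegree(v) == 1; }
    static boolean isHeadVertex(byte v) { return outDegree(v) > 0 && !isPathVertex(v); }
    static boolean isRearVertex(byte v) { return inDegree(v) > 0 && !isPathVertex(v); }

On the ACG example (adjMap 0b00100001) both degrees are 1, so the vertex is a path vertex.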
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 763d46e..f03a4ab 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -5,19 +5,18 @@
import java.io.IOException;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import edu.uci.ics.genomix.pregelix.LoadGraphVertex;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphVertex;
-import edu.uci.ics.genomix.pregelix.MergeGraphVertex;
+import edu.uci.ics.genomix.pregelix.operator.LoadGraphVertex;
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -30,10 +29,11 @@
private static void generateLoadGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(LoadGraphVertex.class);
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ByteWritable.class);
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -45,15 +45,15 @@
private static void generateMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(MergeGraphVertex.class);
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
+ job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
- job.getConfiguration().setInt(MergeGraphVertex.KMER_SIZE, 55);
+ job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -63,15 +63,15 @@
private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(LogAlgorithmForMergeGraphVertex.class);
- job.setVertexInputFormatClass(LogAlgorithmForMergeGraphInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class);
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
- job.getConfiguration().setInt(LogAlgorithmForMergeGraphVertex.KMER_SIZE, 5);
+ job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -88,8 +88,6 @@
//genLoadGraph();
//genMergeGraph();
genLogAlgorithmForMergeGraph();
- //genSequenceLoadGraph();
- //genBasicBinaryLoadGraph();
}
}
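Each generator ends by serializing the job configuration to XML with writeXml. A hedged sketch of rehydrating one in a harness via Hadoop's Configuration.addResource; the file path here is illustrative only:

    // Re-load a generated job configuration and read back a parameter.
    Configuration conf = new Configuration();
    conf.addResource(new Path("src/test/resources/jobs/LogAlgorithmForMergeGraph.xml"));
    int k = conf.getInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, -1); // 5, per the generator above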
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
index ffc1a25..fd0749a 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
@@ -40,7 +40,7 @@
private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
private static final String FILE_EXTENSION_OF_RESULTS = "result";
- private static final String DATA_PATH = "data/result/TreePath";// sequenceShortFileMergeTest
+ private static final String DATA_PATH = "data/sequencefile/LongPath";
private static final String HDFS_PATH = "/webmap/";
private static final String HYRACKS_APP_NAME = "pregelix";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java
index 4ea3c1d..8ac1b09 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java
@@ -20,6 +20,13 @@
public class TestUtils {
+ public static void compareWithResultDir(File expectedFileDir, File actualFileDir) throws Exception {
+ String[] fileNames = expectedFileDir.list();
+ for (String fileName : fileNames) {
+ compareWithResult(new File(expectedFileDir, fileName), new File(actualFileDir, fileName));
+ }
+ }
+
public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
@@ -28,7 +35,6 @@
try {
while ((lineExpected = readerExpected.readLine()) != null) {
lineActual = readerActual.readLine();
- // Assert.assertEquals(lineExpected, lineActual);
if (lineActual == null) {
throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
}
@@ -62,8 +68,10 @@
if (row1.equals(row2))
continue;
- String[] fields1 = row1.split(" ");
- String[] fields2 = row2.split(" ");
+ boolean spaceOrTab = row1.contains(" ");
+ String[] fields1 = spaceOrTab ? row1.split(" ") : row1.split("\t");
+ String[] fields2 = spaceOrTab ? row2.split(" ") : row2.split("\t");
for (int j = 0; j < fields1.length; j++) {
if (fields1[j].equals(fields2[j])) {
@@ -76,7 +84,7 @@
float float1 = (float) double1.doubleValue();
float float2 = (float) double2.doubleValue();
- if (Math.abs(float1 - float2) == 0)
+ if (Math.abs(float1 - float2) < 1.0e-7)
continue;
else {
return false;