finished framework
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
deleted file mode 100644
index 7aa05f5..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.type.old;
-
-@Deprecated
-public class Kmer {
-
- public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
-
- public final static class GENE_CODE {
-
- /**
- * make sure this 4 ids equal to the sequence id of char in {@GENE_SYMBOL}
- */
- public static final byte A = 0;
- public static final byte C = 1;
- public static final byte G = 2;
- public static final byte T = 3;
-
- public static byte getCodeFromSymbol(byte ch) {
- byte r = 0;
- switch (ch) {
- case 'A':
- case 'a':
- r = A;
- break;
- case 'C':
- case 'c':
- r = C;
- break;
- case 'G':
- case 'g':
- r = G;
- break;
- case 'T':
- case 't':
- r = T;
- break;
- }
- return r;
- }
-
- public static byte getSymbolFromCode(byte code) {
- if (code > 3) {
- return '!';
- }
- return GENE_SYMBOL[code];
- }
-
- public static byte getAdjBit(byte t) {
- byte r = 0;
- switch (t) {
- case 'A':
- case 'a':
- r = 1 << A;
- break;
- case 'C':
- case 'c':
- r = 1 << C;
- break;
- case 'G':
- case 'g':
- r = 1 << G;
- break;
- case 'T':
- case 't':
- r = 1 << T;
- break;
- }
- return r;
- }
-
- /**
- * It works for path merge.
- * Merge the kmer by his next, we need to make sure the @{t} is a single neighbor.
- *
- * @param t
- * the neighbor code in BitMap
- * @return the genecode
- */
- public static byte getGeneCodeFromBitMap(byte t) {
- switch (t) {
- case 1 << A:
- return A;
- case 1 << C:
- return C;
- case 1 << G:
- return G;
- case 1 << T:
- return T;
- }
- return -1;
- }
-
- public static byte mergePreNextAdj(byte pre, byte next) {
- return (byte) (pre << 4 | (next & 0x0f));
- }
-
- public static String getSymbolFromBitMap(byte code) {
- int left = (code >> 4) & 0x0F;
- int right = code & 0x0F;
- StringBuilder str = new StringBuilder();
- for (int i = A; i <= T; i++) {
- if ((left & (1 << i)) != 0) {
- str.append((char) GENE_SYMBOL[i]);
- }
- }
- str.append('|');
- for (int i = A; i <= T; i++) {
- if ((right & (1 << i)) != 0) {
- str.append((char) GENE_SYMBOL[i]);
- }
- }
- return str.toString();
- }
- }
-
- public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
- StringBuilder strKmer = new StringBuilder();
- int byteId = keyStart + keyLength - 1;
- byte currentbyte = keyData[byteId];
- for (int geneCount = 0; geneCount < k; geneCount++) {
- if (geneCount % 4 == 0 && geneCount > 0) {
- currentbyte = keyData[--byteId];
- }
- strKmer.append((char) GENE_SYMBOL[(currentbyte >> ((geneCount % 4) * 2)) & 0x03]);
- }
- return strKmer.toString();
- }
-
- public static int getByteNumFromK(int k) {
- int x = k / 4;
- if (k % 4 != 0) {
- x += 1;
- }
- return x;
- }
-
- /**
- * Compress Kmer into bytes array AATAG will compress as [0x000G, 0xATAA]
- *
- * @param kmer
- * @param input
- * array
- * @param start
- * position
- * @return initialed kmer array
- */
- public static byte[] compressKmer(int k, byte[] array, int start) {
- final int byteNum = getByteNumFromK(k);
- byte[] bytes = new byte[byteNum];
-
- byte l = 0;
- int bytecount = 0;
- int bcount = byteNum - 1;
- for (int i = start; i < start + k; i++) {
- byte code = GENE_CODE.getCodeFromSymbol(array[i]);
- l |= (byte) (code << bytecount);
- bytecount += 2;
- if (bytecount == 8) {
- bytes[bcount--] = l;
- l = 0;
- bytecount = 0;
- }
- }
- if (bcount >= 0) {
- bytes[0] = l;
- }
- return bytes;
- }
-
- /**
- * Shift Kmer to accept new input
- *
- * @param kmer
- * @param bytes
- * Kmer Array
- * @param c
- * Input new gene character
- * @return the shiftout gene, in gene code format
- */
- public static byte moveKmer(int k, byte[] kmer, byte c) {
- int byteNum = kmer.length;
- byte output = (byte) (kmer[byteNum - 1] & 0x03);
- for (int i = byteNum - 1; i > 0; i--) {
- byte in = (byte) (kmer[i - 1] & 0x03);
- kmer[i] = (byte) (((kmer[i] >>> 2) & 0x3f) | (in << 6));
- }
- int pos = ((k - 1) % 4) << 1;
- byte code = (byte) (GENE_CODE.getCodeFromSymbol(c) << pos);
- kmer[0] = (byte) (((kmer[0] >>> 2) & 0x3f) | code);
- return (byte) (1 << output);
- }
-
- public static byte reverseKmerByte(byte k) {
- int x = (((k >> 2) & 0x33) | ((k << 2) & 0xcc));
- return (byte) (((x >> 4) & 0x0f) | ((x << 4) & 0xf0));
- }
-
- public static byte[] reverseKmer(int k, byte[] kmer) {
- byte[] reverseKmer = new byte[kmer.length];
-
- int curPosAtKmer = ((k - 1) % 4) << 1;
- int curByteAtKmer = 0;
-
- int curPosAtReverse = 0;
- int curByteAtReverse = reverseKmer.length - 1;
- reverseKmer[curByteAtReverse] = 0;
- for (int i = 0; i < k; i++) {
- byte gene = (byte) ((kmer[curByteAtKmer] >> curPosAtKmer) & 0x03);
- reverseKmer[curByteAtReverse] |= gene << curPosAtReverse;
- curPosAtReverse += 2;
- if (curPosAtReverse >= 8) {
- curPosAtReverse = 0;
- reverseKmer[--curByteAtReverse] = 0;
- }
- curPosAtKmer -= 2;
- if (curPosAtKmer < 0) {
- curPosAtKmer = 6;
- curByteAtKmer++;
- }
- }
-
- return reverseKmer;
- }
-
- /**
- * Compress Reversed Kmer into bytes array AATAG will compress as
- * [0x000A,0xATAG]
- *
- * @param kmer
- * @param input
- * array
- * @param start
- * position
- * @return initialed kmer array
- */
- public static byte[] compressKmerReverse(int k, byte[] array, int start) {
- final int byteNum = getByteNumFromK(k);
- byte[] bytes = new byte[byteNum];
-
- byte l = 0;
- int bytecount = 0;
- int bcount = byteNum - 1;
- for (int i = start + k - 1; i >= 0; i--) {
- byte code = GENE_CODE.getCodeFromSymbol(array[i]);
- l |= (byte) (code << bytecount);
- bytecount += 2;
- if (bytecount == 8) {
- bytes[bcount--] = l;
- l = 0;
- bytecount = 0;
- }
- }
- if (bcount >= 0) {
- bytes[0] = l;
- }
- return bytes;
- }
-
- /**
- * Shift Kmer to accept new input
- *
- * @param kmer
- * @param bytes
- * Kmer Array
- * @param c
- * Input new gene character
- * @return the shiftout gene, in gene code format
- */
- public static byte moveKmerReverse(int k, byte[] kmer, byte c) {
- int pos = ((k - 1) % 4) << 1;
- byte output = (byte) ((kmer[0] >> pos) & 0x03);
- for (int i = 0; i < kmer.length - 1; i++) {
- byte in = (byte) ((kmer[i + 1] >> 6) & 0x03);
- kmer[i] = (byte) ((kmer[i] << 2) | in);
- }
- // (k%4) * 2
- if (k % 4 != 0) {
- kmer[0] &= (1 << ((k % 4) << 1)) - 1;
- }
- kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE.getCodeFromSymbol(c));
- return (byte) (1 << output);
- }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
deleted file mode 100644
index 7a96d2f..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package edu.uci.ics.genomix.type.old;
-
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-@Deprecated
-public class KmerBytesWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
- private static final int LENGTH_BYTES = 4;
- private static final byte[] EMPTY_BYTES = {};
- private byte size;
- private byte[] bytes;
-
- public KmerBytesWritable() {
- this(EMPTY_BYTES);
- }
-
- public KmerBytesWritable(byte[] bytes) {
- this.bytes = bytes;
- this.size = (byte) bytes.length;
- }
-
- @Override
- public byte[] getBytes() {
- return bytes;
- }
-
- @Deprecated
- public byte[] get() {
- return getBytes();
- }
-
- @Override
- public int getLength() {
- return (int) size;
- }
-
- @Deprecated
- public int getSize() {
- return getLength();
- }
-
- public void setSize(byte size) {
- if ((int) size > getCapacity()) {
- setCapacity((byte) (size * 3 / 2));
- }
- this.size = size;
- }
-
- public int getCapacity() {
- return bytes.length;
- }
-
- public void setCapacity(byte new_cap) {
- if (new_cap != getCapacity()) {
- byte[] new_data = new byte[new_cap];
- if (new_cap < size) {
- size = new_cap;
- }
- if (size != 0) {
- System.arraycopy(bytes, 0, new_data, 0, size);
- }
- bytes = new_data;
- }
- }
-
- public void set(KmerBytesWritable newData) {
- set(newData.bytes, (byte) 0, newData.size);
- }
-
- public void set(byte[] newData, byte offset, byte length) {
- setSize((byte) 0);
- setSize(length);
- System.arraycopy(newData, offset, bytes, 0, size);
- }
-
- public void readFields(DataInput in) throws IOException {
- setSize((byte) 0); // clear the old data
- setSize(in.readByte());
- in.readFully(bytes, 0, size);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeByte(size);
- out.write(bytes, 0, size);
- }
-
- @Override
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override
- public boolean equals(Object right_obj) {
- if (right_obj instanceof KmerBytesWritable)
- return super.equals(right_obj);
- return false;
- }
-
- @Override
- public String toString() {
- StringBuffer sb = new StringBuffer(3 * size);
- for (int idx = 0; idx < (int) size; idx++) {
- // if not the first, put a blank separator in
- if (idx != 0) {
- sb.append(' ');
- }
- String num = Integer.toHexString(0xff & bytes[idx]);
- // if it is only one digit, add a leading 0.
- if (num.length() < 2) {
- sb.append('0');
- }
- sb.append(num);
- }
- return sb.toString();
- }
-
- public static class Comparator extends WritableComparator {
- public Comparator() {
- super(KmerBytesWritable.class);
- }
-
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
- }
- }
-
- static { // register this comparator
- WritableComparator.define(KmerBytesWritable.class, new Comparator());
- }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
deleted file mode 100644
index a4b1bec..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
+++ /dev/null
@@ -1,241 +0,0 @@
-package edu.uci.ics.genomix.type.old;
-
-import java.util.Arrays;
-
-@Deprecated
-public class KmerUtil {
-
- public static int countNumberOfBitSet(int i) {
- int c = 0;
- for (; i != 0; c++) {
- i &= i - 1;
- }
- return c;
- }
-
- public static int inDegree(byte bitmap) {
- return countNumberOfBitSet((bitmap >> 4) & 0x0f);
- }
-
- public static int outDegree(byte bitmap) {
- return countNumberOfBitSet(bitmap & 0x0f);
- }
-
- /**
- * Get last kmer from kmer-chain.
- * e.g. kmerChain is AAGCTA, if k =5, it will
- * return AGCTA
- *
- * @param k
- * @param kInChain
- * @param kmerChain
- * @return LastKmer bytes array
- */
- public static byte[] getLastKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
- if (k > kInChain) {
- return null;
- }
- if (k == kInChain) {
- return kmerChain.clone();
- }
- int byteNum = Kmer.getByteNumFromK(k);
- byte[] kmer = new byte[byteNum];
-
- /** from end to start */
- int byteInChain = length - 1 - (kInChain - k) / 4;
- int posInByteOfChain = ((kInChain - k) % 4) << 1; // *2
- int byteInKmer = byteNum - 1;
- for (; byteInKmer >= 0 && byteInChain > 0; byteInKmer--, byteInChain--) {
- kmer[byteInKmer] = (byte) ((0xff & kmerChain[offset + byteInChain]) >> posInByteOfChain);
- kmer[byteInKmer] |= ((kmerChain[offset + byteInChain - 1] << (8 - posInByteOfChain)));
- }
-
- /** last kmer byte */
- if (byteInKmer == 0) {
- kmer[0] = (byte) ((kmerChain[offset] & 0xff) >> posInByteOfChain);
- }
- return kmer;
- }
-
- /**
- * Get first kmer from kmer-chain e.g. kmerChain is AAGCTA, if k=5, it will
- * return AAGCT
- *
- * @param k
- * @param kInChain
- * @param kmerChain
- * @return FirstKmer bytes array
- */
- public static byte[] getFirstKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
- if (k > kInChain) {
- return null;
- }
- if (k == kInChain) {
- return kmerChain.clone();
- }
- int byteNum = Kmer.getByteNumFromK(k);
- byte[] kmer = new byte[byteNum];
-
- int i = 1;
- for (; i < kmer.length; i++) {
- kmer[kmer.length - i] = kmerChain[offset + length - i];
- }
- int posInByteOfChain = (k % 4) << 1; // *2
- if (posInByteOfChain == 0) {
- kmer[0] = kmerChain[offset + length - i];
- } else {
- kmer[0] = (byte) (kmerChain[offset + length - i] & ((1 << posInByteOfChain) - 1));
- }
- return kmer;
- }
-
- /**
- * Merge kmer with next neighbor in gene-code format.
- * The k of new kmer will increase by 1
- * e.g. AAGCT merge with A => AAGCTA
- *
- * @param k
- * :input k of kmer
- * @param kmer
- * : input bytes of kmer
- * @param nextCode
- * : next neighbor in gene-code format
- * @return the merged Kmer, this K of this Kmer is k+1
- */
- public static byte[] mergeKmerWithNextCode(int k, byte[] kmer, int offset, int length, byte nextCode) {
- int byteNum = length;
- if (k % 4 == 0) {
- byteNum++;
- }
- byte[] mergedKmer = new byte[byteNum];
- for (int i = 1; i <= length; i++) {
- mergedKmer[mergedKmer.length - i] = kmer[offset + length - i];
- }
- if (mergedKmer.length > length) {
- mergedKmer[0] = (byte) (nextCode & 0x3);
- } else {
- mergedKmer[0] = (byte) (kmer[offset] | ((nextCode & 0x3) << ((k % 4) << 1)));
- }
- return mergedKmer;
- }
-
- /**
- * Merge kmer with previous neighbor in gene-code format.
- * The k of new kmer will increase by 1
- * e.g. AAGCT merge with A => AAAGCT
- *
- * @param k
- * :input k of kmer
- * @param kmer
- * : input bytes of kmer
- * @param preCode
- * : next neighbor in gene-code format
- * @return the merged Kmer,this K of this Kmer is k+1
- */
- public static byte[] mergeKmerWithPreCode(int k, byte[] kmer, int offset, int length, byte preCode) {
- int byteNum = length;
- byte[] mergedKmer = null;
- int byteInMergedKmer = 0;
- if (k % 4 == 0) {
- byteNum++;
- mergedKmer = new byte[byteNum];
- mergedKmer[0] = (byte) ((kmer[offset] >> 6) & 0x3);
- byteInMergedKmer++;
- } else {
- mergedKmer = new byte[byteNum];
- }
- for (int i = 0; i < length - 1; i++, byteInMergedKmer++) {
- mergedKmer[byteInMergedKmer] = (byte) ((kmer[offset + i] << 2) | ((kmer[offset + i + 1] >> 6) & 0x3));
- }
- mergedKmer[byteInMergedKmer] = (byte) ((kmer[offset + length - 1] << 2) | (preCode & 0x3));
- return mergedKmer;
- }
-
- /**
- * Merge two kmer to one kmer
- * e.g. ACTA + ACCGT => ACTAACCGT
- *
- * @param preK
- * : previous k of kmer
- * @param kmerPre
- * : bytes array of previous kmer
- * @param nextK
- * : next k of kmer
- * @param kmerNext
- * : bytes array of next kmer
- * @return merged kmer, the new k is @preK + @nextK
- */
- public static byte[] mergeTwoKmer(int preK, byte[] kmerPre, int offsetPre, int lengthPre, int nextK,
- byte[] kmerNext, int offsetNext, int lengthNext) {
- int byteNum = Kmer.getByteNumFromK(preK + nextK);
- byte[] mergedKmer = new byte[byteNum];
- int i = 1;
- for (; i <= lengthPre; i++) {
- mergedKmer[byteNum - i] = kmerPre[offsetPre + lengthPre - i];
- }
- if (i > 1) {
- i--;
- }
- if (preK % 4 == 0) {
- for (int j = 1; j <= lengthNext; j++) {
- mergedKmer[byteNum - i - j] = kmerNext[offsetNext + lengthNext - j];
- }
- } else {
- int posNeedToMove = ((preK % 4) << 1);
- mergedKmer[byteNum - i] |= kmerNext[offsetNext + lengthNext - 1] << posNeedToMove;
- for (int j = 1; j < lengthNext; j++) {
- mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext
- + lengthNext - j - 1] << posNeedToMove));
- }
- if ((nextK % 4) * 2 + posNeedToMove > 8) {
- mergedKmer[0] = (byte) (kmerNext[offsetNext] >> (8 - posNeedToMove));
- }
- }
- return mergedKmer;
- }
-
- /**
- * Safely shifted the kmer forward without change the input kmer
- * e.g. AGCGC shift with T => GCGCT
- *
- * @param k
- * : kmer length
- * @param kmer
- * : input kmer
- * @param afterCode
- * : input genecode
- * @return new created kmer that shifted by afterCode, the K will not change
- */
- public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode) {
- byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
- Kmer.moveKmer(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(afterCode));
- return shifted;
- }
-
- /**
- * Safely shifted the kmer backward without change the input kmer
- * e.g. AGCGC shift with T => TAGCG
- *
- * @param k
- * : kmer length
- * @param kmer
- * : input kmer
- * @param preCode
- * : input genecode
- * @return new created kmer that shifted by preCode, the K will not change
- */
- public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode) {
- byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
- Kmer.moveKmerReverse(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(preCode));
- return shifted;
- }
-
- public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer, int offset, int length) {
- if (pos >= k) {
- return -1;
- }
- int posByte = pos / 4;
- int shift = (pos % 4) << 1;
- return (byte) ((kmer[offset + length - 1 - posByte] >> shift) & 0x3);
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
deleted file mode 100644
index ebf1282..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.data.std.accessors;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class ByteSerializerDeserializer implements ISerializerDeserializer<Byte> {
-
- private static final long serialVersionUID = 1L;
-
- public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer();
-
- private ByteSerializerDeserializer() {
- }
-
- @Override
- public Byte deserialize(DataInput in) throws HyracksDataException {
- try {
- return in.readByte();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void serialize(Byte instance, DataOutput out) throws HyracksDataException {
- try {
- out.writeByte(instance.intValue());
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- public static byte getByte(byte[] bytes, int offset) {
- return bytes[offset];
- }
-
- public static void putByte(byte val, byte[] bytes, int offset) {
- bytes[offset] = val;
- }
-
-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
deleted file mode 100644
index 34c29c7..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.data.std.accessors;
-
-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-
-public class KmerBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
- private static final long serialVersionUID = 1L;
-
- @Override
- public IBinaryHashFunction createBinaryHashFunction(final int seed) {
-
- return new IBinaryHashFunction() {
- private KmerPointable p = new KmerPointable();
-
- @Override
- public int hash(byte[] bytes, int offset, int length) {
- if (length + offset >= bytes.length)
- throw new IllegalStateException("out of bound");
- p.set(bytes, offset, length);
- int hash = p.hash() * (seed + 1);
- if (hash < 0) {
- hash = -(hash + 1);
- }
- return hash;
- }
- };
- }
-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
deleted file mode 100644
index 8aaf380..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.data.std.accessors;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-
-public class KmerHashPartitioncomputerFactory implements ITuplePartitionComputerFactory {
-
- private static final long serialVersionUID = 1L;
-
- public static int hashBytes(byte[] bytes, int offset, int length) {
- int hash = 1;
- for (int i = offset; i < offset + length; i++)
- hash = (31 * hash) + (int) bytes[i];
- return hash;
- }
-
- @Override
- public ITuplePartitionComputer createPartitioner() {
- return new ITuplePartitionComputer() {
- @Override
- public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
- int startOffset = accessor.getTupleStartOffset(tIndex);
- int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
- int slotLength = accessor.getFieldSlotsLength();
- int fieldLength = accessor.getFieldLength(tIndex, 0);
-
- ByteBuffer buf = accessor.getBuffer();
-
- int hash = hashBytes(buf.array(), startOffset + fieldOffset + slotLength, fieldLength);
- if (hash < 0) {
- hash = -(hash + 1);
- }
-
- return hash % nParts;
- }
- };
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
deleted file mode 100644
index 83b9d12..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.data.std.accessors;
-
-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-
-public class KmerNormarlizedComputerFactory implements INormalizedKeyComputerFactory {
- private static final long serialVersionUID = 1L;
-
- @Override
- public INormalizedKeyComputer createNormalizedKeyComputer() {
- return new INormalizedKeyComputer() {
- /**
- * read one int from Kmer, make sure this int is consistent whith Kmer compartor
- */
- @Override
- public int normalize(byte[] bytes, int start, int length) {
- return KmerPointable.getIntReverse(bytes, start, length);
- }
- };
- }
-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
deleted file mode 100644
index 8febfa5..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.data.std.primitive;
-
-import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
-import edu.uci.ics.hyracks.data.std.api.IComparable;
-import edu.uci.ics.hyracks.data.std.api.IHashable;
-import edu.uci.ics.hyracks.data.std.api.INumeric;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
-import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
-
-public final class KmerPointable extends AbstractPointable implements IHashable, IComparable, INumeric {
- public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isFixedLength() {
- return false;
- }
-
- @Override
- public int getFixedLength() {
- return -1;
- }
- };
-
- public static final IPointableFactory FACTORY = new IPointableFactory() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public IPointable createPointable() {
- return new KmerPointable();
- }
-
- @Override
- public ITypeTraits getTypeTraits() {
- return TYPE_TRAITS;
- }
- };
-
- public static short getShortReverse(byte[] bytes, int offset, int length) {
- if (length < 2) {
- return (short) (bytes[offset] & 0xff);
- }
- return (short) (((bytes[offset + length - 1] & 0xff) << 8) + (bytes[offset + length - 2] & 0xff));
- }
-
- public static int getIntReverse(byte[] bytes, int offset, int length) {
- int shortValue = getShortReverse(bytes, offset, length) & 0xffff;
-
- if (length < 3) {
- return shortValue;
- }
- if (length == 3) {
- return (((bytes[offset + 2] & 0xff) << 16) + ((bytes[offset + 1] & 0xff) << 8) + ((bytes[offset] & 0xff)));
- }
- return ((bytes[offset + length - 1] & 0xff) << 24) + ((bytes[offset + length - 2] & 0xff) << 16)
- + ((bytes[offset + length - 3] & 0xff) << 8) + ((bytes[offset + length - 4] & 0xff) << 0);
- }
-
- public static long getLongReverse(byte[] bytes, int offset, int length) {
- if (length < 8) {
- return ((long) getIntReverse(bytes, offset, length)) & 0x0ffffffffL;
- }
- return (((long) (bytes[offset + length - 1] & 0xff)) << 56)
- + (((long) (bytes[offset + length - 2] & 0xff)) << 48)
- + (((long) (bytes[offset + length - 3] & 0xff)) << 40)
- + (((long) (bytes[offset + length - 4] & 0xff)) << 32)
- + (((long) (bytes[offset + length - 5] & 0xff)) << 24)
- + (((long) (bytes[offset + length - 6] & 0xff)) << 16)
- + (((long) (bytes[offset + length - 7] & 0xff)) << 8) + (((long) (bytes[offset + length - 8] & 0xff)));
- }
-
- @Override
- public int compareTo(IPointable pointer) {
- return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());
- }
-
- @Override
- public int compareTo(byte[] bytes, int offset, int length) {
-
- if (this.length != length) {
- return this.length - length;
- }
- for (int i = length - 1; i >= 0; i--) {
- int cmp = (this.bytes[this.start + i] & 0xff) - (bytes[offset + i] & 0xff);
- if (cmp != 0) {
- return cmp;
- }
- }
-
- return 0;
- }
-
- @Override
- public int hash() {
- int hash = KmerHashPartitioncomputerFactory.hashBytes(bytes, start, length);
- return hash;
- }
-
- @Override
- public byte byteValue() {
- return bytes[start + length - 1];
- }
-
- @Override
- public short shortValue() {
- return getShortReverse(bytes, start, length);
- }
-
- @Override
- public int intValue() {
- return getIntReverse(bytes, start, length);
- }
-
- @Override
- public long longValue() {
- return getLongReverse(bytes, start, length);
- }
-
- @Override
- public float floatValue() {
- return Float.intBitsToFloat(intValue());
- }
-
- @Override
- public double doubleValue() {
- return Double.longBitsToDouble(longValue());
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
deleted file mode 100644
index ed1b926..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow;
-
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-
-/**
- * used by precluster groupby
- */
-public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
- private static final long serialVersionUID = 1L;
- private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
- private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
-
- @Override
- public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
- int[] fanouts) {
- if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
- return senderSideMaterializePolicy;
- } else {
- return pipeliningPolicy;
- }
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
deleted file mode 100644
index 405e109..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
-
-@SuppressWarnings("deprecation")
-public class KMerSequenceWriterFactory implements ITupleWriterFactory {
-
- private static final long serialVersionUID = 1L;
- private ConfFactory confFactory;
- private final int kmerlength;
-
- public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
- this.confFactory = new ConfFactory(conf);
- this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
- }
-
- public class TupleWriter implements ITupleWriter {
- public TupleWriter(ConfFactory cf) {
- this.cf = cf;
- }
-
- ConfFactory cf;
- Writer writer = null;
-
- KmerCountValue reEnterCount = new KmerCountValue();
- KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
-
- /**
- * assumption is that output never change source!
- */
- @Override
- public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
- try {
- byte[] kmer = tuple.getFieldData(0);
- int keyStart = tuple.getFieldStart(0);
- int keyLength = tuple.getFieldLength(0);
-
- byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
- byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
- reEnterCount.set(bitmap, count);
- reEnterKey.set(kmer, keyStart, keyLength);
- writer.append(reEnterKey, reEnterCount);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void open(DataOutput output) throws HyracksDataException {
- try {
- writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
- KmerCountValue.class, CompressionType.NONE, null);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void close(DataOutput output) throws HyracksDataException {
- // TODO Auto-generated method stub
- }
- }
-
- @Override
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
- return new TupleWriter(confFactory);
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
deleted file mode 100644
index cfa7262..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-
-public class KMerTextWriterFactory implements ITupleWriterFactory {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- private KmerBytesWritable kmer;
-
- public KMerTextWriterFactory(int k) {
- kmer = new KmerBytesWritable(k);
- }
-
- public class TupleWriter implements ITupleWriter {
- @Override
- public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
- try {
- kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0), tuple.getFieldLength(0));
- output.write(kmer.toString().getBytes());
- output.writeByte('\t');
- output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
- output.writeByte('\t');
- output.write(String.valueOf((int) tuple.getFieldData(2)[tuple.getFieldStart(2)]).getBytes());
- output.writeByte('\n');
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void open(DataOutput output) throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void close(DataOutput output) throws HyracksDataException {
- // TODO Auto-generated method stub
- }
- }
-
- @Override
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
- // TODO Auto-generated method stub
- return new TupleWriter();
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
deleted file mode 100644
index 409b434..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-
-;
-
-public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
- private static final long serialVersionUID = 1L;
-
- private KmerBytesWritable kmer;
- private boolean bReversed;
-
- public ReadsKeyValueParserFactory(int k, boolean bGenerateReversed) {
- bReversed = bGenerateReversed;
- kmer = new KmerBytesWritable(k);
- }
-
- @Override
- public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
- final ByteBuffer outputBuffer = ctx.allocateFrame();
- final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputAppender.reset(outputBuffer, true);
-
- return new IKeyValueParser<LongWritable, Text>() {
-
- @Override
- public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
- String geneLine = value.toString(); // Read the Real Gene Line
- Pattern genePattern = Pattern.compile("[AGCT]+");
- Matcher geneMatcher = genePattern.matcher(geneLine);
- boolean isValid = geneMatcher.matches();
- if (isValid) {
- SplitReads(geneLine.getBytes(), writer);
- }
- }
-
- private void SplitReads(byte[] array, IFrameWriter writer) {
- /** first kmer */
- int k = kmer.getKmerLength();
- if (k >= array.length){
- return;
- }
- kmer.setByRead(array, 0);
- byte pre = 0;
- byte next = GeneCode.getAdjBit(array[k]);
- InsertToFrame(kmer, pre, next, writer);
-
- /** middle kmer */
- for (int i = k; i < array.length - 1; i++) {
- pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[i]));
- next = GeneCode.getAdjBit(array[i + 1]);
- InsertToFrame(kmer, pre, next, writer);
- }
-
- /** last kmer */
- pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[array.length - 1]));
- next = 0;
- InsertToFrame(kmer, pre, next, writer);
-
- if (bReversed) {
- /** first kmer */
- kmer.setByReadReverse(array, 0);
- next = 0;
- pre = GeneCode.getAdjBit(array[k]);
- InsertToFrame(kmer, pre, next, writer);
- /** middle kmer */
- for (int i = k; i < array.length - 1; i++) {
- next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[i]));
- pre = GeneCode.getAdjBit(array[i + 1]);
- InsertToFrame(kmer, pre, next, writer);
- }
- /** last kmer */
- next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[array.length - 1]));
- pre = 0;
- InsertToFrame(kmer, pre, next, writer);
- }
- }
-
- /**
- * At this graph building phase, we assume the kmer length are all
- * the same Thus we didn't output those Kmer length
- *
- * @param kmer
- * :input kmer
- * @param pre
- * : pre neighbor code
- * @param next
- * : next neighbor code
- * @param writer
- * : output writer
- */
- private void InsertToFrame(KmerBytesWritable kmer, byte pre, byte next, IFrameWriter writer) {
- try {
- byte adj = GeneCode.mergePreNextAdj(pre, next);
- tupleBuilder.reset();
- tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, adj);
-
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record size is too large.");
- }
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public void open(IFrameWriter writer) throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void close(IFrameWriter writer) throws HyracksDataException {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
- };
- }
-
-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
deleted file mode 100644
index ea70fb0..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-/**
- * sum
- */
-public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
-
- public DistributedMergeLmerAggregateFactory() {
- }
-
- public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
- private static final int MAX = 127;
-
- @Override
- public void reset() {
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public AggregateState createAggregateStates() {
- return new AggregateState(new Object() {
- });
- }
-
- protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
- int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
- return data;
- }
-
- @Override
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
-
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when initializing the aggregator.");
- }
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
- int stateTupleIndex, AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- short count = getField(accessor, tIndex, 2);
-
- int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
- int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
- int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
- int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
- int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
-
- byte[] data = stateAccessor.getBuffer().array();
-
- bitmap |= data[bitoffset];
- count += data[countoffset];
- if (count >= MAX) {
- count = (byte) MAX;
- }
- data[bitoffset] = bitmap;
- data[countoffset] = (byte) count;
- }
-
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
- }
-
- }
-
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
-
- }
-
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
- throws HyracksDataException {
- return new DistributeAggregatorDescriptor();
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
deleted file mode 100644
index 87c0207..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-
-public class LocalAggregatorDescriptor implements IAggregatorDescriptor {
- private static final int MAX = 127;
-
- @Override
- public void reset() {
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public AggregateState createAggregateStates() {
- return new AggregateState(new Object() {
- });
- }
-
- protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
- int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
- return data;
- }
-
- @Override
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = 1;
-
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when initializing the aggregator.");
- }
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
- int stateTupleIndex, AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- short count = 1;
-
- int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
- int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
- int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
- int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
- int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
-
- byte[] data = stateAccessor.getBuffer().array();
-
- bitmap |= data[bitoffset];
- count += data[countoffset];
- if (count >= MAX) {
- count = (byte) MAX;
- }
- data[bitoffset] = bitmap;
- data[countoffset] = (byte) count;
- }
-
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
- }
-
- }
-
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
-
-};
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
deleted file mode 100644
index b5eb70f..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow.aggregators;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-/**
- * count
- */
-public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
-
- public MergeKmerAggregateFactory() {
- }
-
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
- throws HyracksDataException {
- return new LocalAggregatorDescriptor();
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
deleted file mode 100644
index 27d5e15..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.driver;
-
-import java.net.URL;
-import java.util.EnumSet;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.job.JobGen;
-import edu.uci.ics.genomix.job.JobGenBrujinGraph;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class Driver {
- public static enum Plan {
- BUILD_DEBRUJIN_GRAPH,
- GRAPH_CLEANNING,
- CONTIGS_GENERATION,
- }
-
- private static final String IS_PROFILING = "genomix.driver.profiling";
- private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
- private static final Log LOG = LogFactory.getLog(Driver.class);
- private JobGen jobGen;
- private boolean profiling;
-
- private int numPartitionPerMachine;
-
- private IHyracksClientConnection hcc;
- private Scheduler scheduler;
-
- public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
- try {
- hcc = new HyracksConnection(ipAddress, port);
- scheduler = new Scheduler(hcc.getNodeControllerInfos());
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- this.numPartitionPerMachine = numPartitionPerMachine;
- }
-
- public void runJob(GenomixJob job) throws HyracksException {
- runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
- }
-
- public void runJob(GenomixJob job, Plan planChoice, boolean profiling) throws HyracksException {
- /** add hadoop configurations */
- URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
- job.addResource(hadoopCore);
- URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
- job.addResource(hadoopMapRed);
- URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
- job.addResource(hadoopHdfs);
-
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
-
- this.profiling = profiling;
- try {
- Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
- LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
- switch (planChoice) {
- case BUILD_DEBRUJIN_GRAPH:
- default:
- jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
- break;
- }
-
- start = System.currentTimeMillis();
- runCreate(jobGen);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("result writing finished " + time + "ms");
- LOG.info("job finished");
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- private void runCreate(JobGen jobGen) throws Exception {
- try {
- JobSpecification createJob = jobGen.generateJob();
- execute(createJob);
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
-
- private void execute(JobSpecification job) throws Exception {
- job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc
- .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
- hcc.waitForCompletion(jobId);
- }
-
- public static void main(String[] args) throws Exception {
- GenomixJob jobConf = new GenomixJob();
- String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
- if (otherArgs.length < 4) {
- System.err.println("Need <serverIP> <port> <input> <output>");
- System.exit(-1);
- }
- String ipAddress = otherArgs[0];
- int port = Integer.parseInt(otherArgs[1]);
- int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
- boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
- // FileInputFormat.setInputPaths(job, otherArgs[2]);
- {
- Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
- jobConf.set("mapred.input.dir", path.toString());
-
- Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
- jobConf.set("mapred.output.dir", outputDir.toString());
- }
- // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
- // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
- Driver driver = new Driver(ipAddress, port, numOfDuplicate);
- driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
new file mode 100644
index 0000000..4d00731
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class ReadIDNormarlizedComputeFactory implements INormalizedKeyComputerFactory{
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer(){
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ return IntegerSerializerDeserializer.getInt(bytes, start);
+ }
+
+ };
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java
new file mode 100644
index 0000000..44b8f55
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class NodeSequenceWriterFactory implements ITupleWriterFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public NodeSequenceWriterFactory(JobConf job) {
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java
new file mode 100644
index 0000000..21098a9
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class NodeTextWriterFactory implements ITupleWriterFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
similarity index 96%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
index 2ed4c9b..cb9db16 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
@@ -9,7 +9,7 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class AggerateReadIDAggregateFactory implements IAggregatorDescriptorFactory{
+public class AggregateReadIDAggregateFactory implements IAggregatorDescriptorFactory{
/**
*
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index b83aaf7..a20b60d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -1,5 +1,75 @@
package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
-public class MergeReadIDAggregateFactory {
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory{
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+        // TODO implement the read-ID merge aggregation: every callback of the descriptor returned below is still an empty stub
+ return new IAggregatorDescriptor(){
+
+ @Override
+ public AggregateState createAggregateStates() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ };
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 6eb5bac..4357d7a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -22,31 +22,39 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import edu.uci.ics.genomix.hyracks.data.accessors.KmerBinaryHashFunctionFamily;
import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDNormarlizedComputeFactory;
import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.hyracks.dataflow.KMerSequenceWriterFactory;
import edu.uci.ics.genomix.hyracks.dataflow.KMerTextWriterFactory;
import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.NodeSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.NodeTextWriterFactory;
import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -55,7 +63,6 @@
import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
@@ -79,7 +86,7 @@
private Scheduler scheduler;
private String[] ncNodeNames;
- private int kmers;
+ private int kmerSize;
private int frameLimits;
private int frameSize;
private int tableSize;
@@ -87,17 +94,14 @@
private OutputFormat outputFormat;
private boolean bGenerateReversedKmer;
- private AbstractOperatorDescriptor singleGrouper;
- private IConnectorDescriptor connPartition;
- private AbstractOperatorDescriptor crossGrouper;
- private RecordDescriptor readOutputRec;
- private RecordDescriptor combineOutputRec;
-
/** works for hybrid hashing */
private long inputSizeInRawRecords;
private long inputSizeInUniqueKeys;
private int recordSizeInBytes;
private int hashfuncStartLevel;
+ private ExternalGroupOperatorDescriptor readLocalAggregator;
+ private MToNPartitioningConnectorDescriptor readConnPartition;
+ private ExternalGroupOperatorDescriptor readCrossAggregator;
private void logDebug(String status) {
LOG.debug(status + " nc nodes:" + ncNodeNames.length);
@@ -117,154 +121,154 @@
}
private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggeragater) {
+ IAggregatorDescriptorFactory aggeragater, IAggregatorDescriptorFactory merger,
+ ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+ IPointableFactory pointable, RecordDescriptor outRed) {
return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- new KmerNormarlizedComputerFactory(), aggeragater, new MergeKmerAggregateFactory(), combineOutputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(KmerPointable.FACTORY) }), tableSize), true);
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
+ aggeragater, merger, outRed, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
+ tableSize), true);
}
- private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
- long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
- IAggregatorDescriptorFactory aggeragater) throws HyracksDataException {
- return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
- inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
- new KmerNormarlizedComputerFactory(), aggeragater, new MergeKmerAggregateFactory(), combineOutputRec, true);
- }
-
- private void generateKmerAggeragateDescriptorbyType(JobSpecification jobSpec) throws HyracksDataException {
+ private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec,
+ IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+ ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+ IPointableFactory pointable, RecordDescriptor outRed) throws HyracksDataException {
int[] keyFields = new int[] { 0 }; // the id of grouped key
+ Object[] obj = new Object[3];
switch (groupbyType) {
case EXTERNAL:
- singleGrouper = newExternalGroupby(jobSpec, keyFields, new AggregateKmerAggregateFactory());
- connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
- crossGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+ obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+ outRed);
+ obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
+ obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
+ outRed);
break;
case PRECLUSTER:
default:
- singleGrouper = newExternalGroupby(jobSpec, keyFields, new AggregateKmerAggregateFactory());
- connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
- new KmerHashPartitioncomputerFactory(), keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
- crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- new MergeKmerAggregateFactory(), combineOutputRec);
- break;
- case HYBRIDHASH:
- singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel, new AggregateKmerAggregateFactory());
- connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
-
- crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
+ obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+ outRed);
+ obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
+ obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
+ outRed);
break;
}
+ return obj;
}
- public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+ public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec, RecordDescriptor outRec)
+ throws HyracksDataException {
try {
InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
LOG.info("HDFS read into " + splits.length + " splits");
String[] readSchedule = scheduler.getLocationConstraints(splits);
- return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
- new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
+ return new HDFSReadOperatorDescriptor(jobSpec, outRec, job, splits, readSchedule,
+ new ReadsKeyValueParserFactory(kmerSize, bGenerateReversedKmer));
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
- private AbstractOperatorDescriptor newGroupByReadOperator(JobSpecification jobSpec, RecordDescriptor nodeOutputRec) {
- // TODO Auto-generated method stub
- return null;
+ private void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+ IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+ jobSpec.connect(conn, preOp, 0, nextOp, 0);
}
-
+
@Override
public JobSpecification generateJob() throws HyracksException {
JobSpecification jobSpec = new JobSpecification();
- readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
- null, null});
- combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+ RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null });
+ RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
jobSpec.setFrameSize(frameSize);
// File input
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-
logDebug("ReadKmer Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec, readKmerOutputRec);
- generateKmerAggeragateDescriptorbyType(jobSpec);
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateKmerAggregateFactory(),
+ new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
+ new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec);
+ AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
logDebug("LocalKmerGroupby Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
-
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
- jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
+ connectOperators(jobSpec, readOperator, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
logDebug("CrossKmerGroupby Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
- jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
+ IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
+ AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+ connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
logDebug("Map Kmer to Read Operator");
- //Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,Kmer,{OtherReadID,...})
- RecordDescriptor mapToReadOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- null, null,null, null });
- AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, mapToReadOutputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, mapKmerToRead, ncNodeNames);
- IConnectorDescriptor mapReadConn = new OneToOneConnectorDescriptor(jobSpec);
- jobSpec.connect(mapReadConn, crossGrouper, 0, mapKmerToRead, 0);
+ //Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherPosition,...},Kmer)
+ RecordDescriptor readIDOutputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { null, null, null, null });
+ AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, readIDOutputRec);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
logDebug("Group by Read Operator");
- // (ReadID,PosInRead,Kmer,{OtherReadID,...})
- RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- null, null, null, null });
- AbstractOperatorDescriptor groupbyReadOperator = newGroupByReadOperator(jobSpec,nodeOutputRec);
- IConnectorDescriptor readPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new ReadIDPartitionComputerFactory());
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, groupbyReadOperator, ncNodeNames);
- jobSpec.connect(readPartition, mapKmerToRead, 0, groupbyReadOperator, 0);
+        // (ReadID, {(PosInRead,{OtherPosition,...},Kmer),...})
+ RecordDescriptor nodeCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+ objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
+ new MergeReadIDAggregateFactory(), new ReadIDPartitionComputerFactory(),
+ new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, nodeCombineRec);
+ AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+ connectOperators(jobSpec, mapKmerToRead, ncNodeNames, readLocalAggregator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
- // Output
- ITupleWriterFactory writer = null;
+ IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
+ AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+ connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
+
+ // Output Kmer
+ ITupleWriterFactory kmerWriter = null;
+ ITupleWriterFactory nodeWriter = null;
switch (outputFormat) {
case TEXT:
- writer = new KMerTextWriterFactory(kmers);
+ kmerWriter = new KMerTextWriterFactory(kmerSize);
+ nodeWriter = new NodeTextWriterFactory();
break;
case BINARY:
default:
- writer = new KMerSequenceWriterFactory(job);
+ kmerWriter = new KMerSequenceWriterFactory(job);
+ nodeWriter = new NodeSequenceWriterFactory(job);
break;
}
- HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
-
logDebug("WriteOperator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, kmerWriter);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ jobSpec.addRoot(writeKmerOperator);
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
- jobSpec.connect(printConn, groupbyReadOperator, 0, writeOperator, 0);
- jobSpec.addRoot(writeOperator);
-
+ // Output Node
+ HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, nodeWriter);
+ connectOperators(jobSpec, readCrossAggregator, ncNodeNames, writeNodeOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ jobSpec.addRoot(writeNodeOperator);
+
if (groupbyType == GroupbyType.PRECLUSTER) {
jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
}
return jobSpec;
}
-
-
@Override
protected void initJobConfiguration() {
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
- if (kmers % 2 == 0) {
- kmers--;
- conf.setInt(GenomixJob.KMER_LENGTH, kmers);
+ kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ if (kmerSize % 2 == 0) {
+ kmerSize--;
+ conf.setInt(GenomixJob.KMER_LENGTH, kmerSize);
}
frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
@@ -300,7 +304,7 @@
}
job = new JobConf(conf);
LOG.info("Genomix Graph Build Configuration");
- LOG.info("Kmer:" + kmers);
+ LOG.info("Kmer:" + kmerSize);
LOG.info("Groupby type:" + type);
LOG.info("Output format:" + output);
LOG.info("Frame limit" + frameLimits);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
deleted file mode 100644
index be82477..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.job;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-
-public class GenomixJob extends JobConf {
-
- public static final String JOB_NAME = "genomix";
-
- /** Kmers length */
- public static final String KMER_LENGTH = "genomix.kmer";
- /** Frame Size */
- public static final String FRAME_SIZE = "genomix.framesize";
- /** Frame Limit, hyracks need */
- public static final String FRAME_LIMIT = "genomix.framelimit";
- /** Table Size, hyracks need */
- public static final String TABLE_SIZE = "genomix.tablesize";
- /** Groupby types */
- public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
- /** Graph outputformat */
- public static final String OUTPUT_FORMAT = "genomix.graph.output";
- /** Get reversed Kmer Sequence */
- public static final String REVERSED_KMER = "genomix.kmer.reversed";
-
- /** Configurations used by hybrid groupby function in graph build phrase */
- public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
- public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
- public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
- public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
- public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
-
- public static final int DEFAULT_KMER = 21;
- public static final int DEFAULT_FRAME_SIZE = 32768;
- public static final int DEFAULT_FRAME_LIMIT = 4096;
- public static final int DEFAULT_TABLE_SIZE = 10485767;
- public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
- public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
- public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
- public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
- public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
-
- public static final boolean DEFAULT_REVERSED = false;
-
- public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
- public static final String DEFAULT_OUTPUT_FORMAT = "binary";
-
- public GenomixJob() throws IOException {
- super(new Configuration());
- }
-
- public GenomixJob(Configuration conf) throws IOException {
- super(conf);
- }
-
- /**
- * Set the kmer length
- *
- * @param the
- * desired frame size
- */
- final public void setKmerLength(int kmerlength) {
- setInt(KMER_LENGTH, kmerlength);
- }
-
- final public void setFrameSize(int frameSize) {
- setInt(FRAME_SIZE, frameSize);
- }
-
- final public void setFrameLimit(int frameLimit) {
- setInt(FRAME_LIMIT, frameLimit);
- }
-
- final public void setTableSize(int tableSize) {
- setInt(TABLE_SIZE, tableSize);
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
deleted file mode 100644
index b1f9a29..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.job;
-
-import java.util.UUID;
-
-import org.apache.hadoop.conf.Configuration;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public abstract class JobGen {
-
- protected final Configuration conf;
- protected final GenomixJob genomixJob;
- protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
-
- public JobGen(GenomixJob job) {
- this.conf = job;
- this.genomixJob = job;
- this.initJobConfiguration();
- }
-
- protected abstract void initJobConfiguration();
-
- public abstract JobSpecification generateJob() throws HyracksException;
-
-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
deleted file mode 100644
index 79b38e8..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.job;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.data.std.accessors.KmerBinaryHashFunctionFamily;
-import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;
-import edu.uci.ics.genomix.data.std.accessors.KmerNormarlizedComputerFactory;
-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
-import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
-import edu.uci.ics.genomix.dataflow.KMerTextWriterFactory;
-import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
-import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class JobGenBrujinGraph extends JobGen {
- public enum GroupbyType {
- EXTERNAL,
- PRECLUSTER,
- HYBRIDHASH,
- }
-
- public enum OutputFormat {
- TEXT,
- BINARY,
- }
-
- JobConf job;
- private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
- private Scheduler scheduler;
- private String[] ncNodeNames;
-
- private int kmers;
- private int frameLimits;
- private int frameSize;
- private int tableSize;
- private GroupbyType groupbyType;
- private OutputFormat outputFormat;
- private boolean bGenerateReversedKmer;
-
- private AbstractOperatorDescriptor singleGrouper;
- private IConnectorDescriptor connPartition;
- private AbstractOperatorDescriptor crossGrouper;
- private RecordDescriptor readOutputRec;
- private RecordDescriptor combineOutputRec;
-
- /** works for hybrid hashing */
- private long inputSizeInRawRecords;
- private long inputSizeInUniqueKeys;
- private int recordSizeInBytes;
- private int hashfuncStartLevel;
-
- private void logDebug(String status) {
- String names = "";
- for (String str : ncNodeNames) {
- names += str + " ";
- }
- LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
- }
-
- public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
- int numPartitionPerMachine) {
- super(job);
- this.scheduler = scheduler;
- String[] nodes = new String[ncMap.size()];
- ncMap.keySet().toArray(nodes);
- ncNodeNames = new String[nodes.length * numPartitionPerMachine];
- for (int i = 0; i < numPartitionPerMachine; i++) {
- System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
- }
- logDebug("initialize");
- }
-
- private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggeragater) {
- return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
- combineOutputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(KmerPointable.FACTORY) }), tableSize), true);
- }
-
- private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
- long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
- IAggregatorDescriptorFactory aggeragater) throws HyracksDataException {
- return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
- inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
- new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
- combineOutputRec, true);
- }
-
- private void generateDescriptorbyType(JobSpecification jobSpec) throws HyracksDataException {
- int[] keyFields = new int[] { 0 }; // the id of grouped key
-
- switch (groupbyType) {
- case EXTERNAL:
- singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
- crossGrouper = newExternalGroupby(jobSpec, keyFields, new DistributedMergeLmerAggregateFactory());
- break;
- case PRECLUSTER:
- singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
- new KmerHashPartitioncomputerFactory(), keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
- crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- new DistributedMergeLmerAggregateFactory(), combineOutputRec);
- break;
- case HYBRIDHASH:
- default:
- singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
-
- crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel, new DistributedMergeLmerAggregateFactory());
- break;
- }
- }
-
- public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
- try {
-
- InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
-
- LOG.info("HDFS read into " + splits.length + " splits");
- String[] readSchedule = scheduler.getLocationConstraints(splits);
- String log = "";
- for (String schedule : readSchedule) {
- log += schedule + " ";
- }
- LOG.info("HDFS read schedule " + log);
- return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
- new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public JobSpecification generateJob() throws HyracksException {
-
- JobSpecification jobSpec = new JobSpecification();
- readOutputRec = new RecordDescriptor(
- new ISerializerDeserializer[] { null, ByteSerializerDeserializer.INSTANCE });
- combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
- ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
- jobSpec.setFrameSize(frameSize);
-
- // File input
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-
- logDebug("Read Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
-
- generateDescriptorbyType(jobSpec);
- logDebug("SingleGroupby Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
-
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
- jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
-
- logDebug("CrossGrouper Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
- jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
-
- // Output
- ITupleWriterFactory writer = null;
- switch (outputFormat) {
- case TEXT:
- writer = new KMerTextWriterFactory(kmers);
- break;
- case BINARY:
- default:
- writer = new KMerSequenceWriterFactory(job);
- break;
- }
- HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
-
- logDebug("WriteOperator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
-
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
- jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
- jobSpec.addRoot(writeOperator);
-
- if (groupbyType == GroupbyType.PRECLUSTER) {
- jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- }
- return jobSpec;
- }
-
- @Override
- protected void initJobConfiguration() {
-
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
- if (kmers % 2 == 0) {
- kmers--;
- conf.setInt(GenomixJob.KMER_LENGTH, kmers);
- }
- frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
- tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
- frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
- inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
- inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
- recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
- hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
- /** here read the different recordSize why ? */
- recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
-
- bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
-
- String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
- if (type.equalsIgnoreCase("external")) {
- groupbyType = GroupbyType.EXTERNAL;
- } else if (type.equalsIgnoreCase("precluster")) {
- groupbyType = GroupbyType.PRECLUSTER;
- } else {
- groupbyType = GroupbyType.HYBRIDHASH;
- }
-
- String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
- if (output.equalsIgnoreCase("text")) {
- outputFormat = OutputFormat.TEXT;
- } else {
- outputFormat = OutputFormat.BINARY;
- }
- job = new JobConf(conf);
- LOG.info("Genomix Graph Build Configuration");
- LOG.info("Kmer:" + kmers);
- LOG.info("Groupby type:" + type);
- LOG.info("Output format:" + output);
- LOG.info("Frame limit" + frameLimits);
- LOG.info("Frame size" + frameSize);
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
deleted file mode 100644
index a88fa79..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.job;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.util.ByteComparatorFactory;
-import edu.uci.ics.genomix.util.StatCountAggregateFactory;
-import edu.uci.ics.genomix.util.StatReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.util.StatSumAggregateFactory;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class JobGenStatistic extends JobGen {
- private int kmers;
- private JobConf hadoopjob;
- private RecordDescriptor readOutputRec;
- private String[] ncNodeNames;
- private Scheduler scheduler;
- private RecordDescriptor combineOutputRec;
-
- public JobGenStatistic(GenomixJob job) {
- super(job);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- protected void initJobConfiguration() {
- // TODO Auto-generated method stub
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
- hadoopjob = new JobConf(conf);
- hadoopjob.setInputFormat(SequenceFileInputFormat.class);
- }
-
- @Override
- public JobSpecification generateJob() throws HyracksException {
- int[] degreeFields = { 0, 1 }; // indegree, outdegree
- int[] countFields = { 2 };
- JobSpecification jobSpec = new JobSpecification();
- /** specify the record fields after read */
- readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
- ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
- combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
- ByteSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- /** the reader */
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
-
- /** the combiner aggregator */
- AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(jobSpec, degreeFields, readOperator);
- AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(jobSpec, countFields, readOperator);
-
- /** the final aggregator */
- AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(jobSpec, degreeFields, degreeLocal);
- AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(jobSpec, countFields, countLocal);
-
- /** writer */
- AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(jobSpec, degreeFields, degreeMerger);
- AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
- jobSpec.addRoot(writeDegree);
- jobSpec.addRoot(writeCount);
- return null;
- }
-
- private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
- try {
-
- InputSplit[] splits = hadoopjob.getInputFormat().getSplits(hadoopjob, ncNodeNames.length);
-
- String[] readSchedule = scheduler.getLocationConstraints(splits);
- return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
- new StatReadsKeyValueParserFactory());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggeragater) {
- return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, GenomixJob.DEFAULT_FRAME_LIMIT,
- new IBinaryComparatorFactory[] { new ByteComparatorFactory(), new ByteComparatorFactory() }, null,
- aggeragater, new StatSumAggregateFactory(), combineOutputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- new ByteComparatorFactory(), new ByteComparatorFactory() }),
- GenomixJob.DEFAULT_TABLE_SIZE), true);
- }
-
- private AbstractOperatorDescriptor connectLocalAggregateByField(JobSpecification jobSpec, int[] fields,
- HDFSReadOperatorDescriptor readOperator) {
- AbstractOperatorDescriptor localAggregator = newExternalGroupby(jobSpec, fields,
- new StatCountAggregateFactory());
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, localAggregator, ncNodeNames);
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
- jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
- return localAggregator;
- }
-
- private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec, int[] fields,
- AbstractOperatorDescriptor localAggregator) {
- AbstractOperatorDescriptor finalAggregator = newExternalGroupby(jobSpec, fields, new StatSumAggregateFactory());
- // only need one reducer
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, finalAggregator, ncNodeNames[fields[0]
- % ncNodeNames.length]);
- IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
- new ITuplePartitionComputerFactory() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public ITuplePartitionComputer createPartitioner() {
- return new ITuplePartitionComputer() {
- @Override
- public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
- throws HyracksDataException {
- return 0;
- }
- };
- }
- }, fields, new IBinaryComparatorFactory[] { new ByteComparatorFactory() });
- jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
- return finalAggregator;
- }
-
- private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int[] fields,
- AbstractOperatorDescriptor finalAggregator) {
- LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(jobSpec, null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames[fields[0]
- % ncNodeNames.length]);
-
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
- jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
- return writeOperator;
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
deleted file mode 100644
index 832966b..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.util;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-
-public class ByteComparatorFactory implements IBinaryComparatorFactory, IBinaryHashFunctionFactory {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- @Override
- public IBinaryComparator createBinaryComparator() {
- return new IBinaryComparator() {
-
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return b1[s1] - b2[s2];
- }
-
- };
- }
-
- @Override
- public IBinaryHashFunction createBinaryHashFunction() {
- return new IBinaryHashFunction() {
-
- @Override
- public int hash(byte[] bytes, int offset, int length) {
- return bytes[offset];
- }
-
- };
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
deleted file mode 100644
index 65303a8..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.util;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-public class StatCountAggregateFactory implements IAggregatorDescriptorFactory {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public class CountAggregator implements IAggregatorDescriptor {
- private final int[] keyFields;
-
- public CountAggregator(int[] keyFields) {
- this.keyFields = keyFields;
- }
-
- @Override
- public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- int count = 1;
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when initializing the aggregator.");
- }
- }
-
- @Override
- public void reset() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
- int stateTupleIndex, AggregateState state) throws HyracksDataException {
- int count = 1;
-
- int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
- int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, keyFields.length);
- int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
-
- byte[] data = stateAccessor.getBuffer().array();
- count += IntegerSerializerDeserializer.getInt(data, countoffset);
- IntegerSerializerDeserializer.putInt(count, data, countoffset);
- }
-
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- int count = getCount(accessor, tIndex);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
- }
-
- }
-
- protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, keyFields.length);
- int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
- byte[] data = accessor.getBuffer().array();
-
- return IntegerSerializerDeserializer.getInt(data, countoffset);
- }
-
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
-
- }
-
- }
-
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
- throws HyracksDataException {
- // TODO Auto-generated method stub
- return new CountAggregator(keyFields);
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
deleted file mode 100644
index 2fcca67..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.util;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-
-public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<KmerBytesWritable, KmerCountValue> {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- @Override
- public IKeyValueParser<KmerBytesWritable, KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
- throws HyracksDataException {
-
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
- final ByteBuffer outputBuffer = ctx.allocateFrame();
- final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputAppender.reset(outputBuffer, true);
-
- return new IKeyValueParser<KmerBytesWritable, KmerCountValue>() {
-
- @Override
- public void open(IFrameWriter writer) throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void parse(KmerBytesWritable key, KmerCountValue value, IFrameWriter writer)
- throws HyracksDataException {
- byte adjMap = value.getAdjBitMap();
- byte count = value.getCount();
- InsertToFrame((byte) (GeneCode.inDegree(adjMap)), (byte) (GeneCode.outDegree(adjMap)), count, writer);
- }
-
- @Override
- public void close(IFrameWriter writer) throws HyracksDataException {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
-
- private void InsertToFrame(byte indegree, byte outdegree, byte count, IFrameWriter writer) {
- try {
- tupleBuilder.reset();
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, indegree);
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, outdegree);
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, count);
-
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record size is too large.");
- }
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- };
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
deleted file mode 100644
index 39ac60a..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.util;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-public class StatSumAggregateFactory implements IAggregatorDescriptorFactory {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
-
- private final int[] keyFields;
-
- public DistributeAggregatorDescriptor(int[] keyFields) {
- this.keyFields = keyFields;
- }
-
- @Override
- public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return null;
- }
-
- protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
- int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
- byte[] data = accessor.getBuffer().array();
- return IntegerSerializerDeserializer.getInt(data, countoffset);
- }
-
- @Override
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- int count = getCount(accessor, tIndex);
-
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when initializing the aggregator.");
- }
- }
-
- @Override
- public void reset() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
- int stateTupleIndex, AggregateState state) throws HyracksDataException {
- int count = getCount(accessor, tIndex);
-
- int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
- int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
- int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
-
- byte[] data = stateAccessor.getBuffer().array();
- count += IntegerSerializerDeserializer.getInt(data, countoffset);
- IntegerSerializerDeserializer.putInt(count, data, countoffset);
- }
-
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- int count = getCount(accessor, tIndex);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
- }
-
- }
-
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
-
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
-
- }
-
- }
-
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
- throws HyracksDataException {
- // TODO Auto-generated method stub
- return new DistributeAggregatorDescriptor(keyFields);
- }
-
-}