Update MergeGraph
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3002 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
new file mode 100644
index 0000000..6e832a6
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
@@ -0,0 +1,227 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import edu.uci.ics.pregelix.SequenceFile.GenerateSequenceFile;
+import edu.uci.ics.pregelix.bitwise.BitwiseOperation;
+
+public class GraphVertexOperation {
+ public static final int k = 3; //kmer, k: the length of kmer
+ static private final Path TMP_DIR = new Path(
+ GenerateSequenceFile.class.getSimpleName() + "_INTERIM");
+ /**
+ * Single Vertex: in-degree = out-degree = 1
+ * @param vertexValue
+ */
+ public static boolean isPathVertex(ByteWritable vertexValue){
+ byte value = vertexValue.get();
+ byte[] bit = new byte[8];
+ for(int i = 0; i < 8; i++)
+ bit[i] = (byte) ((value >> i) & 0x01);
+
+ //check out-degree
+ if(((bit[0]==1)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==0))
+ || ((bit[0]==0)&&(bit[1]==1)&&(bit[2]==0)&&(bit[3]==0))
+ || ((bit[0]==0)&&(bit[1]==0)&&(bit[2]==1)&&(bit[3]==0))
+ || ((bit[0]==0)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==1))
+ ){
+ //check in-degree
+ if(((bit[4]==1)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==0))
+ || ((bit[4]==0)&&(bit[5]==1)&&(bit[6]==0)&&(bit[7]==0))
+ || ((bit[4]==0)&&(bit[5]==0)&&(bit[6]==1)&&(bit[7]==0))
+ || ((bit[4]==0)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==1))
+ )
+ return true;
+ else
+ return false;
+ }
+ else
+ return false;
+ }
+ /**
+ * Head Vertex: out-degree = 1, in-degree != 1
+ * @param vertexValue
+ */
+ public static boolean isHead(ByteWritable vertexValue){
+ byte value = vertexValue.get();
+ byte[] bit = new byte[8];
+ for(int i = 0; i < 8; i++)
+ bit[i] = (byte) ((value >> i) & 0x01);
+
+ //check out-degree
+ if(((bit[0]==1)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==0))
+ || ((bit[0]==0)&&(bit[1]==1)&&(bit[2]==0)&&(bit[3]==0))
+ || ((bit[0]==0)&&(bit[1]==0)&&(bit[2]==1)&&(bit[3]==0))
+ || ((bit[0]==0)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==1))
+ ){
+ //check in-degree
+ if(!((bit[4]==1)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==0))
+ && !((bit[4]==0)&&(bit[5]==1)&&(bit[6]==0)&&(bit[7]==0))
+ && !((bit[4]==0)&&(bit[5]==0)&&(bit[6]==1)&&(bit[7]==0))
+ && !((bit[4]==0)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==1))
+ )
+ return true;
+ else
+ return false;
+ }
+ else
+ return false;
+ }
+ /**
+ * Rear Vertex: out-degree != 1, in-degree = 1
+ * @param vertexValue
+ */
+ public static boolean isRear(ByteWritable vertexValue){
+ byte value = vertexValue.get();
+ byte[] bit = new byte[8];
+ for(int i = 0; i < 8; i++)
+ bit[i] = (byte) ((value >> i) & 0x01);
+
+ //check out-degree
+ if(!((bit[0]==1)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==0))
+ && !((bit[0]==0)&&(bit[1]==1)&&(bit[2]==0)&&(bit[3]==0))
+ && !((bit[0]==0)&&(bit[1]==0)&&(bit[2]==1)&&(bit[3]==0))
+ && !((bit[0]==0)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==1))
+ ){
+ //check in-degree
+ if(((bit[4]==1)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==0))
+ || ((bit[4]==0)&&(bit[5]==1)&&(bit[6]==0)&&(bit[7]==0))
+ || ((bit[4]==0)&&(bit[5]==0)&&(bit[6]==1)&&(bit[7]==0))
+ || ((bit[4]==0)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==1))
+ )
+ return true;
+ else
+ return false;
+ }
+ else
+ return false;
+ }
+ /**
+ * write Kmer to Sequence File for test
+ * @param arrayOfKeys
+ * @param arrayOfValues
+ * @param step
+ * @throws IOException
+ */
+ public void writeKmerToSequenceFile(ArrayList<BytesWritable> arrayOfKeys, ArrayList<ByteWritable> arrayOfValues, long step) throws IOException{
+
+ Configuration conf = new Configuration();
+ Path outDir = new Path(TMP_DIR, "out");
+ Path outFile = new Path(outDir, "B" + Long.toString(step));
+ FileSystem fileSys = FileSystem.get(conf);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ outFile, BytesWritable.class, ByteWritable.class,
+ CompressionType.NONE);
+
+ //wirte to sequence file
+ for(int i = 0; i < arrayOfKeys.size(); i++)
+ writer.append(arrayOfKeys.get(i), arrayOfValues.get(i));
+ writer.close();
+ }
+ /**
+ * check what kind of succeed node
+ * return 0:A 1:C 2:G 3:T 4:nothing
+ */
+ public static int findSucceedNode(byte vertexValue){
+ String firstBit = "00000001"; //A
+ String secondBit = "00000010"; //C
+ String thirdBit = "00000100"; //G
+ String fourthBit = "00001000"; //T
+ int first = BitwiseOperation.convertBinaryStringToByte(firstBit) & 0xff;
+ int second = BitwiseOperation.convertBinaryStringToByte(secondBit) & 0xff;
+ int third = BitwiseOperation.convertBinaryStringToByte(thirdBit) & 0xff;
+ int fourth = BitwiseOperation.convertBinaryStringToByte(fourthBit) & 0xff;
+ int value = vertexValue & 0xff;
+ int tmp = value & first;
+ if(tmp != 0)
+ return 0;
+ else{
+ tmp = value & second;
+ if(tmp != 0)
+ return 1;
+ else{
+ tmp = value & third;
+ if(tmp != 0)
+ return 2;
+ else{
+ tmp = value & fourth;
+ if(tmp != 0)
+ return 3;
+ else
+ return 4;
+ }
+ }
+ }
+ }
+ /**
+ * replace last two bits based on n
+ * Ex. 01 10 00(nothing) -> 01 10 00(A)/01(C)/10(G)/11(T)
+ */
+ public static byte[] replaceLastTwoBits(byte[] vertexId, int n){
+ String binaryStringVertexId = BitwiseOperation.convertBytesToBinaryStringKmer(vertexId, 3);
+ String resultString = "";
+ for(int i = 0; i < binaryStringVertexId.length()-2; i++)
+ resultString += binaryStringVertexId.charAt(i);
+ switch(n){
+ case 0:
+ resultString += "00";
+ break;
+ case 1:
+ resultString += "01";
+ break;
+ case 2:
+ resultString += "10";
+ break;
+ case 3:
+ resultString += "11";
+ break;
+ default:
+ break;
+ }
+
+ return BitwiseOperation.convertBinaryStringToBytes(resultString);
+ }
+ /**
+ * find the vertexId of the destination node
+ */
+ public static byte[] getDestVertexId(byte[] sourceVertexId, byte vertexValue){
+ byte[] destVertexId = BitwiseOperation.shiftBitsLeft(sourceVertexId, 2);
+ return replaceLastTwoBits(destVertexId, findSucceedNode(vertexValue));
+ }
+ /**
+ * update the chain vertexId
+ */
+ public static byte[] updateChainVertexId(byte[] chainVertexId, int lengthOfChainVertex, byte[] newVertexId){
+ return BitwiseOperation.addLastTwoBits(chainVertexId,lengthOfChainVertex,BitwiseOperation.getLastTwoBits(newVertexId,k));
+ }
+ /**
+ * get the first kmer from chainVertexId
+ */
+ public static byte[] getFirstKmer(byte[] chainVertexId){
+ String originalVertexId = BitwiseOperation.convertBytesToBinaryString(chainVertexId);
+ return BitwiseOperation.convertBinaryStringToBytes(originalVertexId.substring(0,k-1));
+ }
+ /**
+ * get the last kmer from chainVertexId
+ */
+ public static byte[] getLastKmer(byte[] chainVertexId, int lengthOfChainVertex){
+ String originalVertexId = BitwiseOperation.convertBytesToBinaryString(chainVertexId);
+ return BitwiseOperation.convertBinaryStringToBytes(originalVertexId.substring(lengthOfChainVertex-1-k+1,lengthOfChainVertex-1));
+ }
+ /**
+ * read vertexId from RecordReader
+ */
+ public static BytesWritable readVertexIdFromRecordReader(BytesWritable currentKey){
+ String finalBinaryString = BitwiseOperation.convertBytesToBinaryStringKmer(currentKey.getBytes(),k);
+ return new BytesWritable(BitwiseOperation.convertBinaryStringToBytes(finalBinaryString));
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
new file mode 100644
index 0000000..2325ac3
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
@@ -0,0 +1,165 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.io.MessageWritable;
+import edu.uci.ics.pregelix.hdfs.HDFSOperation;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+public class MergeGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
+
+ private byte[] tmpSourceVertextId;
+ private byte[] tmpDestVertexId;
+ private byte[] tmpChainVertexId;
+ private MessageWritable tmpMsg = new MessageWritable();
+ public static final int k = 3; //kmer, k = 3
+ /**
+ * For test, in compute method, make each vertexValue shift 1 to left.
+ * It will be modified when going forward to next step.
+ */
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) {
+ if (getSuperstep() == 1) {
+ if(GraphVertexOperation.isHead(getVertexValue())){
+ tmpSourceVertextId = getVertexId().getBytes();
+ tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, getVertexValue().get());
+ tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
+ tmpChainVertexId = new byte[0];
+ tmpMsg.setChainVertexId(tmpChainVertexId);
+ sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+ }
+ }
+ //path node sends message back to head node
+ else if(getSuperstep()%2 == 0){
+ if(msgIterator.hasNext()){
+ if(GraphVertexOperation.isPathVertex(getVertexValue())){
+ tmpMsg = msgIterator.next();
+ tmpSourceVertextId = tmpMsg.getSourceVertexIdOrNeighberInfo();
+ byte[] tmpBytes = GraphVertexOperation.getDestVertexId(getVertexId().getBytes(), getVertexValue().get());
+ tmpMsg.setSourceVertexIdOrNeighberInfo(tmpBytes); //set neighber
+ tmpChainVertexId = tmpMsg.getChainVertexId();
+ if(tmpChainVertexId.length == 0){
+ tmpMsg.setChainVertexId(getVertexId().getBytes());
+ tmpMsg.setLengthOfChain(k);
+ }
+ else{
+ tmpMsg.setChainVertexId(GraphVertexOperation.updateChainVertexId(tmpChainVertexId,tmpMsg.getLengthOfChain(),getVertexId().getBytes()));
+ tmpMsg.incrementLength();
+ }
+ sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
+ }
+ else if(GraphVertexOperation.isRear(getVertexValue())){
+ tmpMsg = msgIterator.next();
+ tmpSourceVertextId = tmpMsg.getSourceVertexIdOrNeighberInfo();
+ tmpMsg.setSourceVertexIdOrNeighberInfo(getVertexId().getBytes());
+ tmpMsg.setRear(true);
+ sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
+ }
+ else voteToHalt();
+ }
+ }
+ //head node sends message to path node
+ else if(getSuperstep()%2 == 1){
+ while (msgIterator.hasNext()){
+ tmpMsg = msgIterator.next();
+ if(!tmpMsg.isRear()){
+ tmpSourceVertextId = getVertexId().getBytes();
+ tmpDestVertexId = tmpMsg.getSourceVertexIdOrNeighberInfo();
+ tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
+ sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+ }
+ else{
+
+ try {
+ HDFSOperation hdfsOperation = new HDFSOperation();
+ HDFSOperation.insertHDFSFile("testHDFS/chainVertex", tmpMsg.getLengthOfChain(), tmpMsg.getChainVertexId());
+ } catch (IOException e) { e.printStackTrace(); }
+ signalTerminate();
+ }
+ }
+ }
+ }
+
+ private void signalTerminate() {
+ Configuration conf = getContext().getConfiguration();
+ try {
+ IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+ writeMergeGraphResult(conf, true);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void writeMergeGraphResult(Configuration conf, boolean terminate) {
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "MergeGraph";
+ Path path = new Path(pathStr);
+ if (!dfs.exists(path)) {
+ FSDataOutputStream output = dfs.create(path, true);
+ output.writeBoolean(terminate);
+ output.flush();
+ output.close();
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(MergeGraphVertex.class.getSimpleName());
+ job.setVertexClass(MergeGraphVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
+ job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(ByteWritable.class);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java
index e7c022d..304d764 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java
@@ -33,7 +33,7 @@
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setInputFormatClass(TextInputFormat.class);
- TextInputFormat.addInputPath(job, new Path("folder/test.dat"));
+ TextInputFormat.addInputPath(job, new Path("data/webmap/part-00000"));
SequenceFileOutputFormat.setOutputPath(job, new Path("folder_seq"));
// submit and wait for completion
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
index 66676ba..e52f979 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
@@ -1,6 +1,6 @@
package edu.uci.ics.pregelix.SequenceFile;
-import java.math.BigInteger;
+import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
@@ -8,29 +8,35 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.bitwise.BitwiseOperation;
public class GenerateSequenceFile {
static private final Path TMP_DIR = new Path(
GenerateSequenceFile.class.getSimpleName() + "_TMP");
+ private static Path outDir = new Path(TMP_DIR, "out");
+ private final static int k = 3;
-
-
- public static void main(String[] argv) throws Exception {
-
- //write output to a file
+ /**
+ * create test.dat
+ * A - ACG - A 000110 00010001 06 11
+ * C - ACT - C 000111 00100010 07 22
+ * G - CGT - G 011011 01000100 1B 44
+ * T - GTC - T 101101 10001000 2D 88
+ */
+ public static void createTestDat() throws IOException{
+ //write output to a file
Configuration conf = new Configuration();
- Path outDir = new Path(TMP_DIR, "out");
- Path outFile = new Path(outDir, "reduce-out");
+ Path outFile = new Path(outDir, "test-out.dat");
FileSystem fileSys = FileSystem.get(conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
outFile, BytesWritable.class, ByteWritable.class,
CompressionType.NONE);
+
//Generate <key,value> <BytesWritable, ByteWritable>
byte[] key = hexStringToByteArray("06"); //000110
byte[] value = hexStringToByteArray("11"); //00010001
@@ -69,28 +75,177 @@
for(int i = 0; i < arrayOfKeys.size(); i++)
writer.append(arrayOfKeys.get(i), arrayOfValues.get(i));
writer.close();
-
+
//read outputs
- Path inFile = new Path(outDir, "reduce-out");
+ Path inFile = new Path(outDir, "test-out.dat");
BytesWritable outKey = new BytesWritable();
ByteWritable outValue = new ByteWritable();
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
try {
- reader.next(outKey, outValue);
+ reader.next(outKey, outValue);
System.out.println(outKey.getBytes());
System.out.println(outValue.get());
} finally {
reader.close();
}
+ }
+
+ /**
+ * create a mergeTest SequenceFile
+ * CAG - AGC - GCG - CGT - GTA - TAT - ATA
+ * GAG ATC
+ *
+ * CAG 010010 00000010
+ * AGC 001001 01100100
+ * GCG 100110 00011000
+ * CGT 011011 01000001
+ * GTA 101100 00101000
+ * TAT 110011 01000011
+ * ATA 001100 10000000
+ * GAG 100010 00000010
+ * ATC 001101 10000000
+ */
+ public static void createMergeTest() throws IOException{
+ //write output to a file
+ Configuration conf = new Configuration();
+ Path outFile = new Path(outDir, "mergeTest");
+ FileSystem fileSys = FileSystem.get(conf);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ outFile, BytesWritable.class, ByteWritable.class,
+ CompressionType.NONE);
+
+
+ //Generate <key,value> <BytesWritable, ByteWritable>
+ // 1
+ String tmpKey = "010010";
+ byte[] key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+ String tmpValue = "00000010";
+ byte value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+ BytesWritable keyWritable = new BytesWritable(key);
+ ByteWritable valueWritable = new ByteWritable(value);
+
+ ArrayList<BytesWritable> arrayOfKeys = new ArrayList<BytesWritable>();
+ arrayOfKeys.add(keyWritable);
+ ArrayList<ByteWritable> arrayOfValues = new ArrayList<ByteWritable>();
+ arrayOfValues.add(valueWritable);
+
+ // 2
+ tmpKey = "001001";
+ key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+ tmpValue = "01100100";
+ value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+ keyWritable = new BytesWritable(key);
+ valueWritable = new ByteWritable(value);
+ arrayOfKeys.add(keyWritable);
+ arrayOfValues.add(valueWritable);
+
+ // 3
+ tmpKey = "100110";
+ key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+ tmpValue = "00011000";
+ value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+ keyWritable = new BytesWritable(key);
+ valueWritable = new ByteWritable(value);
+ arrayOfKeys.add(keyWritable);
+ arrayOfValues.add(valueWritable);
+
+ // 4
+ tmpKey = "011011";
+ key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+ tmpValue = "01000001";
+ value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+ keyWritable = new BytesWritable(key);
+ valueWritable = new ByteWritable(value);
+ arrayOfKeys.add(keyWritable);
+ arrayOfValues.add(valueWritable);
+
+ // 5
+ tmpKey = "101100";
+ key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+ tmpValue = "00101000";
+ value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+ keyWritable = new BytesWritable(key);
+ valueWritable = new ByteWritable(value);
+ arrayOfKeys.add(keyWritable);
+ arrayOfValues.add(valueWritable);
+
+ // 6
+ tmpKey = "110011";
+ key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+ tmpValue = "01000011";
+ value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+ keyWritable = new BytesWritable(key);
+ valueWritable = new ByteWritable(value);
+ arrayOfKeys.add(keyWritable);
+ arrayOfValues.add(valueWritable);
+
+ // 7
+ tmpKey = "001100";
+ key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+ tmpValue = "10000000";
+ value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+ keyWritable = new BytesWritable(key);
+ valueWritable = new ByteWritable(value);
+ arrayOfKeys.add(keyWritable);
+ arrayOfValues.add(valueWritable);
+
+ // 8
+ tmpKey = "100010";
+ key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+ tmpValue = "00000010";
+ value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+ keyWritable = new BytesWritable(key);
+ valueWritable = new ByteWritable(value);
+ arrayOfKeys.add(keyWritable);
+ arrayOfValues.add(valueWritable);
+
+ // 9
+ tmpKey = "001101";
+ key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+ tmpValue = "10000000";
+ value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+ keyWritable = new BytesWritable(key);
+ valueWritable = new ByteWritable(value);
+ arrayOfKeys.add(keyWritable);
+ arrayOfValues.add(valueWritable);
+
+ //wirte to sequence file
+ for(int i = 0; i < arrayOfKeys.size(); i++)
+ writer.append(arrayOfKeys.get(i), arrayOfValues.get(i));
+ writer.close();
+
+ //read outputs
+ Path inFile = new Path(outDir, "mergeTest");
+ BytesWritable outKey = new BytesWritable();
+ ByteWritable outValue = new ByteWritable();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+ int iteration = 1;
+ try {
+ while(reader.next(outKey, outValue)){
+ System.out.println(iteration);
+ System.out.println("key: " + BitwiseOperation.convertBytesToBinaryStringKmer(outKey.getBytes(),k));
+ System.out.println("value: " + BitwiseOperation.convertByteToBinaryString(outValue.get()));
+ System.out.println();
+ iteration++;
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ public static void main(String[] argv) throws Exception {
+ //createTestDat();
+ createMergeTest();
+ createTestDat();
}
public static byte[] hexStringToByteArray(String s) {
- int len = s.length();
- byte[] data = new byte[len / 2];
- for (int i = 0; i < len; i += 2) {
- data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
- + Character.digit(s.charAt(i+1), 16));
- }
- return data;
- }
+ int len = s.length();
+ byte[] data = new byte[len / 2];
+ for (int i = 0; i < len; i += 2) {
+ data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
+ + Character.digit(s.charAt(i+1), 16));
+ }
+ return data;
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/bitwise/BitwiseOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/bitwise/BitwiseOperation.java
new file mode 100644
index 0000000..3e4db6e
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/bitwise/BitwiseOperation.java
@@ -0,0 +1,103 @@
+package edu.uci.ics.pregelix.bitwise;
+
+public class BitwiseOperation {
+
+ public static byte[] shiftBitsLeft(byte[] bytes, final int leftShifts) {
+ assert leftShifts >= 1 && leftShifts <= 7;
+
+ byte[] resultBytes = new byte[bytes.length];
+ for(int i = 0; i < bytes.length; i++)
+ resultBytes[i] = bytes[i];
+ final int rightShifts = 8 - leftShifts;
+
+ byte previousByte = resultBytes[bytes.length - 1]; // keep the byte before modification
+ resultBytes[resultBytes.length - 1] = (byte) (((resultBytes[resultBytes.length - 1] & 0xff) << leftShifts));
+ for (int i = resultBytes.length - 2; i >= 0; i--) {
+ byte tmp = resultBytes[i];
+ resultBytes[i] = (byte) (((resultBytes[i] & 0xff) << leftShifts) | ((previousByte & 0xff) >> rightShifts));
+ previousByte = tmp;
+ }
+ return resultBytes;
+ }
+
+ public static byte[] shiftBitsRight(byte[] bytes, final int rightShifts) {
+ assert rightShifts >= 1 && rightShifts <= 7;
+
+ byte[] resultBytes = new byte[bytes.length];
+ for(int i = 0; i < bytes.length; i++)
+ resultBytes[i] = bytes[i];
+ final int leftShifts = 8 - rightShifts;
+
+ byte previousByte = resultBytes[0]; // keep the byte before modification
+ resultBytes[0] = (byte) (((resultBytes[0] & 0xff) >> rightShifts));
+ for (int i = 1; i < resultBytes.length; i++) {
+ byte tmp = resultBytes[i];
+ resultBytes[i] = (byte) (((resultBytes[i] & 0xff) >> rightShifts) | ((previousByte & 0xff) << leftShifts));
+ previousByte = tmp;
+ }
+ return resultBytes;
+ }
+
+ public static byte convertBinaryStringToByte(String input){
+ int tmpInt = Integer.parseInt(input,2);
+ byte tmpByte = (byte) tmpInt;
+ return tmpByte;
+ }
+
+ public static byte[] convertBinaryStringToBytes(String input){
+ input = complementString(input);
+ int numOfBytes = input.length() / 8;
+ byte[] bytes = new byte[numOfBytes];
+ for(int i = 0; i < numOfBytes; ++i)
+ {
+ bytes[i] = convertBinaryStringToByte(input.substring(8 * i, 8 * i + 8));
+ }
+ return bytes;
+ }
+
+ public static String complementString(String input){
+ int remaining = input.length() % 8;
+ if(remaining == 0)
+ return input;
+ for(int i = 0; i < 8 - remaining; i++)
+ input += "0";
+ return input;
+ }
+
+ public static String convertByteToBinaryString(byte b){
+ StringBuilder sb = new StringBuilder(Byte.SIZE);
+ for( int i = 0; i < Byte.SIZE; i++ )
+ sb.append((b << i % Byte.SIZE & 0x80) == 0 ? '0' : '1');
+ return sb.toString();
+ }
+
+ public static String convertBytesToBinaryString(byte[] bytes){
+ StringBuilder sb = new StringBuilder(bytes.length * Byte.SIZE);
+ for( int i = 0; i < Byte.SIZE * bytes.length; i++ )
+ sb.append((bytes[i / Byte.SIZE] << i % Byte.SIZE & 0x80) == 0 ? '0' : '1');
+ return sb.toString();
+ }
+
+ public static String convertBytesToBinaryStringKmer(byte[] bytes, int k){
+ StringBuilder sb = new StringBuilder(bytes.length * Byte.SIZE);
+ for( int i = 0; i < Byte.SIZE * bytes.length; i++ )
+ sb.append((bytes[i / Byte.SIZE] << i % Byte.SIZE & 0x80) == 0 ? '0' : '1');
+ return sb.toString().substring(0,2*k);
+ }
+
+ public static byte[] addLastTwoBits(byte[] bytes, int lengthOfChainVertex, String newBits){
+ String originalBytes = convertBytesToBinaryString(bytes).substring(0,2*lengthOfChainVertex);
+ originalBytes += newBits;
+ return convertBinaryStringToBytes(originalBytes);
+ }
+
+ public static String getLastTwoBits(byte[] bytes, int k){
+ String sb = convertBytesToBinaryString(bytes);
+ int num = k / 4;
+ int extraBit = k % 4;
+ if(extraBit >= 1)
+ return sb.substring(8*num + 2*(extraBit-1),8*num + 2*extraBit);
+ else
+ return sb.substring(8*num - 2, 8*num);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
index 0dbd800..eed352d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
@@ -9,72 +9,126 @@
public class MessageWritable implements WritableComparable<MessageWritable>{
/**
- * bytes stores the chains of connected DNA
+ * sourceVertexId stores source vertexId when headVertex sends the message
+ * stores neighber vertexValue when pathVertex sends the message
+ * chainVertexId stores the chains of connected DNA
* file stores the point to the file that stores the chains of connected DNA
*/
- private byte[] bytes;
+ private byte[] sourceVertexIdOrNeighberInfo;
+ private byte[] chainVertexId;
private File file;
+ private boolean isRear;
+ private int lengthOfChain;
+ private static int k = 3;
public MessageWritable(){
}
- public MessageWritable(byte[] bytes, File file){
- set(bytes,file);
- }
-
- public void set(byte[] bytes, File file){
- this.bytes = bytes;
+ public void set(byte[] sourceVertexIdOrNeighberInfo, byte[] chainVertexId, File file){
+ this.sourceVertexIdOrNeighberInfo = sourceVertexIdOrNeighberInfo;
+ this.chainVertexId = chainVertexId;
this.file = file;
+ this.isRear = false;
+ this.lengthOfChain = 0;
}
-
- public byte[] getBytes() {
- return bytes;
+
+ public byte[] getSourceVertexIdOrNeighberInfo() {
+ return sourceVertexIdOrNeighberInfo;
}
-
- public File getFile(){
+
+ public void setSourceVertexIdOrNeighberInfo(byte[] sourceVertexIdOrNeighberInfo) {
+ this.sourceVertexIdOrNeighberInfo = sourceVertexIdOrNeighberInfo;
+ }
+
+ public byte[] getChainVertexId() {
+ return chainVertexId;
+ }
+
+ public void setChainVertexId(byte[] chainVertexId) {
+ this.chainVertexId = chainVertexId;
+ }
+
+ public File getFile() {
return file;
}
+ public void setFile(File file) {
+ this.file = file;
+ }
+
+ public boolean isRear() {
+ return isRear;
+ }
+
+ public void setRear(boolean isRear) {
+ this.isRear = isRear;
+ }
+
+ public int getLengthOfChain() {
+ return lengthOfChain;
+ }
+
+ public void setLengthOfChain(int lengthOfChain) {
+ this.lengthOfChain = lengthOfChain;
+ }
+
+ public void incrementLength(){
+ this.lengthOfChain++;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
- out.write(bytes);
- out.writeUTF(file.getAbsolutePath());
+ out.writeInt(lengthOfChain);
+ if(lengthOfChain != 0)
+ out.write(chainVertexId);
+ out.write(sourceVertexIdOrNeighberInfo);
+ out.writeBoolean(isRear);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
- in.readFully(bytes);
- String absolutePath = in.readUTF();
- file = new File(absolutePath);
+ lengthOfChain = in.readInt();
+ if(lengthOfChain != 0){
+ chainVertexId = new byte[(lengthOfChain-1)/4 + 1];
+ in.readFully(chainVertexId);
+ }
+ else
+ chainVertexId = new byte[0];
+ if(lengthOfChain % 2 == 0)
+ sourceVertexIdOrNeighberInfo = new byte[(k-1)/4 + 1];
+ else
+ sourceVertexIdOrNeighberInfo = new byte[1];
+ in.readFully(sourceVertexIdOrNeighberInfo);
+ isRear = in.readBoolean();
}
@Override
public int hashCode() {
int hashCode = 0;
- for(int i = 0; i < bytes.length; i++)
- hashCode = (int)bytes[i];
+ for(int i = 0; i < chainVertexId.length; i++)
+ hashCode = (int)chainVertexId[i];
return hashCode;
}
@Override
public boolean equals(Object o) {
if (o instanceof MessageWritable) {
MessageWritable tp = (MessageWritable) o;
- return bytes == tp.bytes && file == tp.file;
+ return chainVertexId == tp.chainVertexId && file == tp.file;
}
return false;
}
@Override
public String toString() {
- return bytes.toString() + "\t" + file.getAbsolutePath();
+ return chainVertexId.toString() + "\t" + file.getAbsolutePath();
}
@Override
public int compareTo(MessageWritable tp) {
// TODO Auto-generated method stub
int cmp;
- if (bytes == tp.bytes)
+ if (chainVertexId == tp.chainVertexId)
cmp = 0;
else
cmp = 1;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/hdfs/HDFSOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/hdfs/HDFSOperation.java
new file mode 100644
index 0000000..ea972ef
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/hdfs/HDFSOperation.java
@@ -0,0 +1,123 @@
+package edu.uci.ics.pregelix.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class HDFSOperation {
+
+ private static Configuration conf;
+ private static FileSystem hdfs;
+ private static Path path;
+
+ public HDFSOperation() throws IOException{
+ conf = new Configuration();
+ hdfs = FileSystem.get(conf);
+ path = null;
+ }
+
+ public static boolean insertHDFSFile(String fileName, int length, byte[] buffer) throws IOException{
+ path = new Path(fileName);
+ if (!hdfs.exists(path))
+ createHDFSFile(fileName,length,buffer);
+ else
+ appendHDFSFile(fileName,length,buffer);
+ return true;
+ }
+
+ public static boolean createHDFSFile(String fileName, int length, byte[] buffer) throws IOException{
+ path = new Path(fileName);
+ if (hdfs.exists(path)){
+ System.out.println("Output already exists");
+ return false;
+ }
+ /*if (!hdfs.isFile(path)){
+ System.out.println("Output should be a file");
+ return false;
+ }*/
+ FSDataOutputStream outputStream = hdfs.create(path);
+ outputStream.writeInt(length);
+ outputStream.write(buffer);
+ outputStream.close();
+ return true;
+ }
+
+ public static boolean appendHDFSFile(String fileName, int length, byte[] buffer) throws IOException{
+ path = new Path(fileName);
+ if (!hdfs.exists(path)){
+ System.out.println("Output not found");
+ return false;
+ }
+ if (!hdfs.isFile(path)){
+ System.out.println("Output should be a file");
+ return false;
+ }
+ FSDataOutputStream outputStream = hdfs.append(path);
+ outputStream.writeInt(length);
+ outputStream.write(buffer);
+ outputStream.close();
+ return true;
+ }
+
+ public static boolean deleteHDFSFile(String fileName) throws IOException{
+ path = new Path(fileName);
+ if (!hdfs.exists(path)){
+ System.out.println("Input file not found");
+ return false;
+ }
+ if (!hdfs.isFile(path)){
+ System.out.println("Input should be a file");
+ return false;
+ }
+ return hdfs.delete(path,true);
+ }
+
+ public static boolean copyFromLocalFile(String srcFile, String dstFile) throws IOException{
+ Path srcPath = new Path(srcFile);
+ path = new Path(dstFile);
+ if (!hdfs.exists(path)){
+ System.out.println("Input file not found");
+ return false;
+ }
+ if (!hdfs.isFile(path)){
+ System.out.println("Input should be a file");
+ return false;
+ }
+ hdfs.copyFromLocalFile(srcPath, path);
+ return true;
+ }
+
+ public static void testReadAndWriteHDFS() throws Exception{
+ String inFileName = "testHDFS/testInput";
+ String outFileName = "testHDFS/testOutput";
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+ Path inFile = new Path(inFileName);
+ Path outFile = new Path(outFileName);
+ if (!fs.exists(inFile)){
+ System.out.println("Input file not found");
+ return;
+ }
+ if (!fs.isFile(inFile)){
+ System.out.println("Input should be a file");
+ return;
+ }
+ if (fs.exists(outFile)){
+ System.out.println("Output already exists");
+ return;
+ }
+ FSDataInputStream in = fs.open(inFile);
+ FSDataOutputStream out = fs.create(outFile);
+ byte[] buffer = new byte[1024];
+ int bytesRead = 0;
+ while ((bytesRead = in.read(buffer)) > 0) {
+ out.write(buffer, 0, bytesRead);
+ }
+ in.close();
+ out.close();
+ }
+}