Modify to SequenceFile

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2952 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/data/webmap/reduce-out b/genomix/genomix-pregelix/data/webmap/reduce-out
new file mode 100755
index 0000000..7266be5
--- /dev/null
+++ b/genomix/genomix-pregelix/data/webmap/reduce-out
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
new file mode 100644
index 0000000..bcf7214
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.binary.BinaryVertexInputFormat;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.MessageWritable;
+
+public class BinaryLoadGraphInputFormat extends
+	BinaryVertexInputFormat<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
+
+	/**
+	 * Format INPUT
+	 */
+    @Override
+    public VertexReader<BytesWritable, ByteWritable, NullWritable, MessageWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
+    }
+    
+    @SuppressWarnings("rawtypes")
+    class BinaryLoadGraphReader extends
+            BinaryVertexReader<BytesWritable, ByteWritable, NullWritable, MessageWritable> {
+        private final static String separator = " ";
+        private Vertex vertex;
+        private BytesWritable vertexId = new BytesWritable();
+        private ByteWritable vertexValue = new ByteWritable();
+
+        public BinaryLoadGraphReader(RecordReader<BytesWritable,ByteWritable> recordReader) {
+            super(recordReader);
+        }
+
+        @Override
+        public boolean nextVertex() throws IOException, InterruptedException {
+            return getRecordReader().nextKeyValue();
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable> getCurrentVertex() throws IOException,
+                InterruptedException {
+            if (vertex == null)
+                vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+
+            vertex.getMsgList().clear();
+            vertex.getEdges().clear();
+            
+            
+            if(getRecordReader() != null){
+	            /**
+	             * set the src vertex id
+	             */
+	            vertexId.set(getRecordReader().getCurrentKey());
+	            vertex.setVertexId(vertexId);
+	            
+	            /**
+	             * set the vertex value
+	             */
+	            vertexValue.set(getRecordReader().getCurrentValue().get()); 
+	            vertex.setVertexValue(vertexValue);
+            }
+            
+            return vertex;
+        }
+    }
+	
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphOutputFormat.java
new file mode 100644
index 0000000..9b1a579
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphOutputFormat.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.binary.BinaryVertexOutputFormat;
+
+public class BinaryLoadGraphOutputFormat extends 
+	BinaryVertexOutputFormat<BytesWritable, ByteWritable, NullWritable> {
+
+        @Override
+        public VertexWriter<BytesWritable, ByteWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<BytesWritable, ByteWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+            return new BinaryLoadGraphVertexWriter(recordWriter);
+        }
+        
+        /**
+         * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+         */
+        public static class BinaryLoadGraphVertexWriter extends
+                BinaryVertexWriter<BytesWritable, ByteWritable, NullWritable> {
+            public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ByteWritable> lineRecordWriter) {
+                super(lineRecordWriter);
+            }
+
+            @Override
+            public void writeVertex(Vertex<BytesWritable, ByteWritable, NullWritable, ?> vertex) throws IOException,
+                    InterruptedException {
+                getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
+            }
+        }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java
index 4af68fb..9f757c5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java
@@ -102,8 +102,19 @@
 	public static void main(String[] args) throws Exception {
         PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
         job.setVertexClass(LoadGraphVertex.class);
-        job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
-        job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
+        /**
+         * TextInput and TextOutput
+         * job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
+         * job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class); 
+         */
+        
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class); 
+        job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class); 
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(ByteWritable.class);
         Client.run(args, job);
 	}
 }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java
new file mode 100644
index 0000000..e7c022d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.pregelix.SequenceFile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+public class ConvertToSequenceFile {
+	public static void main(String[] args) throws IOException,
+    InterruptedException, ClassNotFoundException {
+
+		Configuration conf = new Configuration();
+		Job job = new Job(conf);
+		job.setJobName("Convert Text");
+		job.setJarByClass(Mapper.class);
+		
+		job.setMapperClass(Mapper.class);
+		job.setReducerClass(Reducer.class);
+		
+		// increase if you need sorting or a special number of files
+		job.setNumReduceTasks(0);
+		
+		job.setOutputKeyClass(LongWritable.class);
+		job.setOutputValueClass(Text.class);
+		
+		job.setOutputFormatClass(SequenceFileOutputFormat.class);
+		job.setInputFormatClass(TextInputFormat.class);
+		
+		TextInputFormat.addInputPath(job, new Path("folder/test.dat"));
+		SequenceFileOutputFormat.setOutputPath(job, new Path("folder_seq"));
+		
+		// submit and wait for completion
+		job.waitForCompletion(true);
+	}
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
new file mode 100644
index 0000000..66676ba
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
@@ -0,0 +1,96 @@
+package edu.uci.ics.pregelix.SequenceFile;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.JobConf;
+
+public class GenerateSequenceFile {
+	
+	static private final Path TMP_DIR = new Path(
+			GenerateSequenceFile.class.getSimpleName() + "_TMP");
+	
+	
+	
+	 public static void main(String[] argv) throws Exception {
+		 
+	     //write output to a file
+		 Configuration conf = new Configuration();
+	     Path outDir = new Path(TMP_DIR, "out");
+	     Path outFile = new Path(outDir, "reduce-out");
+	     FileSystem fileSys = FileSystem.get(conf);
+	     SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+	         outFile, BytesWritable.class, ByteWritable.class, 
+	         CompressionType.NONE);
+	     
+		 //Generate <key,value>  <BytesWritable, ByteWritable>
+		 byte[] key = hexStringToByteArray("06"); //000110
+		 byte[] value = hexStringToByteArray("11"); //00010001
+		 System.out.println(Integer.toHexString(key[0]));
+		 System.out.println(Integer.toHexString(value[0]));
+		 BytesWritable keyWritable = new BytesWritable(key);
+		 ByteWritable valueWritable = new ByteWritable(value[0]);
+	     
+	     ArrayList<BytesWritable> arrayOfKeys = new ArrayList<BytesWritable>();
+	     arrayOfKeys.add(keyWritable);
+	     ArrayList<ByteWritable> arrayOfValues = new ArrayList<ByteWritable>();
+	     arrayOfValues.add(valueWritable);
+	     
+	     key = hexStringToByteArray("07"); //000111
+	     value = hexStringToByteArray("22"); //00100010
+	     keyWritable = new BytesWritable(key);
+	     valueWritable = new ByteWritable(value[0]);
+	     arrayOfKeys.add(keyWritable);
+	     arrayOfValues.add(valueWritable);
+	     
+	     key = hexStringToByteArray("1B"); //011010
+	     value = hexStringToByteArray("44"); //01000100
+	     keyWritable = new BytesWritable(key);
+	     valueWritable = new ByteWritable(value[0]);
+	     arrayOfKeys.add(keyWritable);
+	     arrayOfValues.add(valueWritable);
+	     
+	     key = hexStringToByteArray("2D"); //100011
+	     value = hexStringToByteArray("88"); //10001000
+	     keyWritable = new BytesWritable(key);
+	     valueWritable = new ByteWritable(value[0]);
+	     arrayOfKeys.add(keyWritable);
+	     arrayOfValues.add(valueWritable);
+	     
+	     //wirte to sequence file
+	     for(int i = 0; i < arrayOfKeys.size(); i++)
+	    	 writer.append(arrayOfKeys.get(i), arrayOfValues.get(i));
+	     writer.close();
+	      
+	     //read outputs
+	     Path inFile = new Path(outDir, "reduce-out");
+	     BytesWritable outKey = new BytesWritable();
+	     ByteWritable outValue = new ByteWritable();
+	     SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+	     try {
+	       reader.next(outKey, outValue);
+		     System.out.println(outKey.getBytes());
+		     System.out.println(outValue.get());
+	     } finally {
+	       reader.close();
+	     }
+	 }
+	 
+	 public static byte[] hexStringToByteArray(String s) {
+		    int len = s.length();
+		    byte[] data = new byte[len / 2];
+		    for (int i = 0; i < len; i += 2) {
+		        data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
+		                             + Character.digit(s.charAt(i+1), 16));
+		    }
+		    return data;
+		}
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexInputFormat.java
new file mode 100644
index 0000000..d3354bd
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexInputFormat.java
@@ -0,0 +1,107 @@
+package edu.uci.ics.pregelix.api.io.binary;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+
+public class BinaryVertexInputFormat <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+	extends VertexInputFormat<I, V, E, M>{
+	
+    /** Uses the SequenceFileInputFormat to do everything */
+	protected SequenceFileInputFormat binaryInputFormat = new SequenceFileInputFormat();
+    
+    /**
+     * Abstract class to be implemented by the user based on their specific
+     * vertex input. Easiest to ignore the key value separator and only use key
+     * instead.
+     * 
+     * @param <I>
+     *            Vertex index value
+     * @param <V>
+     *            Vertex value
+     * @param <E>
+     *            Edge value
+     */
+    public static abstract class BinaryVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+            implements VertexReader<I, V, E, M> {
+        /** Internal line record reader */
+        private final RecordReader<BytesWritable,ByteWritable> lineRecordReader;
+        /** Context passed to initialize */
+        private TaskAttemptContext context;
+
+        /**
+         * Initialize with the LineRecordReader.
+         * 
+         * @param recordReader
+         *            Line record reader from SequenceFileInputFormat
+         */
+        public BinaryVertexReader(RecordReader<BytesWritable, ByteWritable> recordReader) {
+            this.lineRecordReader = recordReader;
+        }
+
+        @Override
+        public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+                InterruptedException {
+            lineRecordReader.initialize(inputSplit, context);
+            this.context = context;
+        }
+
+        @Override
+        public void close() throws IOException {
+            lineRecordReader.close();
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return lineRecordReader.getProgress();
+        }
+
+        /**
+         * Get the line record reader.
+         * 
+         * @return Record reader to be used for reading.
+         */
+        protected RecordReader<BytesWritable,ByteWritable> getRecordReader() {
+            return lineRecordReader;
+        }
+
+        /**
+         * Get the context.
+         * 
+         * @return Context passed to initialize.
+         */
+        protected TaskAttemptContext getContext() {
+            return context;
+        }
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+        // Ignore the hint of numWorkers here since we are using SequenceFileInputFormat
+        // to do this for us
+        return binaryInputFormat.getSplits(context);
+    }
+
+	@Override
+	public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
+			TaskAttemptContext context) throws IOException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+
+
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexOutputFormat.java
new file mode 100644
index 0000000..399cbca
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexOutputFormat.java
@@ -0,0 +1,102 @@
+package edu.uci.ics.pregelix.api.io.binary;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+/**
+ * Abstract class that users should subclass to use their own text based vertex
+ * output format.
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class BinaryVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable>
+        extends VertexOutputFormat<I, V, E> {
+    /** Uses the SequenceFileOutputFormat to do everything */
+	protected SequenceFileOutputFormat binaryOutputFormat = new SequenceFileOutputFormat();
+
+    /**
+     * Abstract class to be implemented by the user based on their specific
+     * vertex output. Easiest to ignore the key value separator and only use key
+     * instead.
+     * 
+     * @param <I>
+     *            Vertex index value
+     * @param <V>
+     *            Vertex value
+     * @param <E>
+     *            Edge value
+     */
+    public static abstract class BinaryVertexWriter<I extends WritableComparable, V extends Writable, E extends Writable>
+            implements VertexWriter<I, V, E> {
+        /** Context passed to initialize */
+        private TaskAttemptContext context;
+        /** Internal line record writer */
+        private final RecordWriter<BytesWritable, ByteWritable> lineRecordWriter;
+
+        /**
+         * Initialize with the LineRecordWriter.
+         * 
+         * @param lineRecordWriter
+         *            Line record writer from SequenceFileOutputFormat
+         */
+        public BinaryVertexWriter(RecordWriter<BytesWritable, ByteWritable> lineRecordWriter) {
+            this.lineRecordWriter = lineRecordWriter;
+        }
+
+        @Override
+        public void initialize(TaskAttemptContext context) throws IOException {
+            this.context = context;
+        }
+
+        @Override
+        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+            lineRecordWriter.close(context);
+        }
+
+        /**
+         * Get the line record writer.
+         * 
+         * @return Record writer to be used for writing.
+         */
+        public RecordWriter<BytesWritable, ByteWritable> getRecordWriter() {
+            return lineRecordWriter;
+        }
+
+        /**
+         * Get the context.
+         * 
+         * @return Context passed to initialize.
+         */
+        public TaskAttemptContext getContext() {
+            return context;
+        }
+    }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+    	binaryOutputFormat.checkOutputSpecs(context);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return binaryOutputFormat.getOutputCommitter(context);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
index 2af1688..0dff613 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
@@ -41,7 +41,6 @@
 	 */
 	public static void main(String[] args) throws IOException {
 		// TODO Auto-generated method stub
-
 		genLoadGraph();
 	}
 
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
index 53c2d48..156c910 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
@@ -40,7 +40,7 @@
 	private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
 	private static final String FILE_EXTENSION_OF_RESULTS = "result";
 
-	private static final String DATA_PATH = "data/webmap/test.dat";
+	private static final String DATA_PATH = "data/webmap/test.dat";//test.dat
 	private static final String HDFS_PATH = "/webmap/";
 	
 	private static final String HYRACKS_APP_NAME = "pregelix";