Modify to SequenceFile
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2952 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/data/webmap/reduce-out b/genomix/genomix-pregelix/data/webmap/reduce-out
new file mode 100755
index 0000000..7266be5
--- /dev/null
+++ b/genomix/genomix-pregelix/data/webmap/reduce-out
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
new file mode 100644
index 0000000..bcf7214
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.binary.BinaryVertexInputFormat;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.MessageWritable;
+
+public class BinaryLoadGraphInputFormat extends
+ BinaryVertexInputFormat<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
+
+ /**
+ * Creates a vertex reader backed by the SequenceFile record reader for the given split.
+ */
+ @Override
+ public VertexReader<BytesWritable, ByteWritable, NullWritable, MessageWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
+ }
+
+ @SuppressWarnings("rawtypes")
+ class BinaryLoadGraphReader extends
+ BinaryVertexReader<BytesWritable, ByteWritable, NullWritable, MessageWritable> {
+ private final static String separator = " ";
+ private Vertex vertex;
+ private BytesWritable vertexId = new BytesWritable();
+ private ByteWritable vertexValue = new ByteWritable();
+
+ public BinaryLoadGraphReader(RecordReader<BytesWritable,ByteWritable> recordReader) {
+ super(recordReader);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable> getCurrentVertex() throws IOException,
+ InterruptedException {
+ if (vertex == null)
+ vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+
+
+ if(getRecordReader() != null){
+ /**
+ * set the src vertex id
+ */
+ vertexId.set(getRecordReader().getCurrentKey());
+ vertex.setVertexId(vertexId);
+
+ /**
+ * set the vertex value
+ */
+ vertexValue.set(getRecordReader().getCurrentValue().get());
+ vertex.setVertexValue(vertexValue);
+ }
+
+ return vertex;
+ }
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphOutputFormat.java
new file mode 100644
index 0000000..9b1a579
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphOutputFormat.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.binary.BinaryVertexOutputFormat;
+
+public class BinaryLoadGraphOutputFormat extends
+ BinaryVertexOutputFormat<BytesWritable, ByteWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<BytesWritable, ByteWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<BytesWritable, ByteWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ return new BinaryLoadGraphVertexWriter(recordWriter);
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link LoadGraphVertex}
+ */
+ public static class BinaryLoadGraphVertexWriter extends
+ BinaryVertexWriter<BytesWritable, ByteWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ByteWritable> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<BytesWritable, ByteWritable, NullWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
+ }
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java
index 4af68fb..9f757c5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java
@@ -102,8 +102,19 @@
public static void main(String[] args) throws Exception {
PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
job.setVertexClass(LoadGraphVertex.class);
- job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
+ /**
+ * TextInput and TextOutput
+ * job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
+ * job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
+ */
+
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
+ job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(ByteWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java
new file mode 100644
index 0000000..e7c022d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.pregelix.SequenceFile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+public class ConvertToSequenceFile {
+ public static void main(String[] args) throws IOException,
+ InterruptedException, ClassNotFoundException {
+
+ Configuration conf = new Configuration();
+ Job job = new Job(conf);
+ job.setJobName("Convert Text");
+ job.setJarByClass(Mapper.class);
+
+ job.setMapperClass(Mapper.class);
+ job.setReducerClass(Reducer.class);
+
+ // increase if you need sorting or a special number of files
+ job.setNumReduceTasks(0);
+
+ job.setOutputKeyClass(LongWritable.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setInputFormatClass(TextInputFormat.class);
+
+ TextInputFormat.addInputPath(job, new Path("folder/test.dat"));
+ SequenceFileOutputFormat.setOutputPath(job, new Path("folder_seq"));
+
+ // submit and wait for completion
+ job.waitForCompletion(true);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
new file mode 100644
index 0000000..66676ba
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
@@ -0,0 +1,96 @@
+package edu.uci.ics.pregelix.SequenceFile;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.JobConf;
+
+public class GenerateSequenceFile {
+
+ static private final Path TMP_DIR = new Path(
+ GenerateSequenceFile.class.getSimpleName() + "_TMP");
+
+
+
+ public static void main(String[] argv) throws Exception {
+
+ //write output to a file
+ Configuration conf = new Configuration();
+ Path outDir = new Path(TMP_DIR, "out");
+ Path outFile = new Path(outDir, "reduce-out");
+ FileSystem fileSys = FileSystem.get(conf);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ outFile, BytesWritable.class, ByteWritable.class,
+ CompressionType.NONE);
+
+ //Generate <key,value> <BytesWritable, ByteWritable>
+ byte[] key = hexStringToByteArray("06"); //000110
+ byte[] value = hexStringToByteArray("11"); //00010001
+ System.out.println(Integer.toHexString(key[0]));
+ System.out.println(Integer.toHexString(value[0]));
+ BytesWritable keyWritable = new BytesWritable(key);
+ ByteWritable valueWritable = new ByteWritable(value[0]);
+
+ ArrayList<BytesWritable> arrayOfKeys = new ArrayList<BytesWritable>();
+ arrayOfKeys.add(keyWritable);
+ ArrayList<ByteWritable> arrayOfValues = new ArrayList<ByteWritable>();
+ arrayOfValues.add(valueWritable);
+
+ key = hexStringToByteArray("07"); //000111
+ value = hexStringToByteArray("22"); //00100010
+ keyWritable = new BytesWritable(key);
+ valueWritable = new ByteWritable(value[0]);
+ arrayOfKeys.add(keyWritable);
+ arrayOfValues.add(valueWritable);
+
+ key = hexStringToByteArray("1B"); //011011
+ value = hexStringToByteArray("44"); //01000100
+ keyWritable = new BytesWritable(key);
+ valueWritable = new ByteWritable(value[0]);
+ arrayOfKeys.add(keyWritable);
+ arrayOfValues.add(valueWritable);
+
+ key = hexStringToByteArray("2D"); //101101
+ value = hexStringToByteArray("88"); //10001000
+ keyWritable = new BytesWritable(key);
+ valueWritable = new ByteWritable(value[0]);
+ arrayOfKeys.add(keyWritable);
+ arrayOfValues.add(valueWritable);
+
+ //write to sequence file
+ for(int i = 0; i < arrayOfKeys.size(); i++)
+ writer.append(arrayOfKeys.get(i), arrayOfValues.get(i));
+ writer.close();
+
+ //read outputs
+ Path inFile = new Path(outDir, "reduce-out");
+ BytesWritable outKey = new BytesWritable();
+ ByteWritable outValue = new ByteWritable();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+ try {
+ reader.next(outKey, outValue);
+ System.out.println(outKey.getBytes());
+ System.out.println(outValue.get());
+ } finally {
+ reader.close();
+ }
+ }
+
+ public static byte[] hexStringToByteArray(String s) {
+ int len = s.length();
+ byte[] data = new byte[len / 2];
+ for (int i = 0; i < len; i += 2) {
+ data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
+ + Character.digit(s.charAt(i+1), 16));
+ }
+ return data;
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexInputFormat.java
new file mode 100644
index 0000000..d3354bd
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexInputFormat.java
@@ -0,0 +1,107 @@
+package edu.uci.ics.pregelix.api.io.binary;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+
+public class BinaryVertexInputFormat <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ extends VertexInputFormat<I, V, E, M>{
+
+ /** Uses the SequenceFileInputFormat to do everything */
+ protected SequenceFileInputFormat binaryInputFormat = new SequenceFileInputFormat();
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * vertex input. Wraps a record reader that yields (BytesWritable,
+ * ByteWritable) key/value pairs.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+ public static abstract class BinaryVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ implements VertexReader<I, V, E, M> {
+ /** Internal record reader */
+ private final RecordReader<BytesWritable,ByteWritable> lineRecordReader;
+ /** Context passed to initialize */
+ private TaskAttemptContext context;
+
+ /**
+ * Initialize with the underlying record reader.
+ *
+ * @param recordReader
+ * Record reader from SequenceFileInputFormat
+ */
+ public BinaryVertexReader(RecordReader<BytesWritable, ByteWritable> recordReader) {
+ this.lineRecordReader = recordReader;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ lineRecordReader.initialize(inputSplit, context);
+ this.context = context;
+ }
+
+ @Override
+ public void close() throws IOException {
+ lineRecordReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return lineRecordReader.getProgress();
+ }
+
+ /**
+ * Get the underlying record reader.
+ *
+ * @return Record reader to be used for reading.
+ */
+ protected RecordReader<BytesWritable,ByteWritable> getRecordReader() {
+ return lineRecordReader;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ protected TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+ // Ignore the hint of numWorkers here since we are using SequenceFileInputFormat
+ // to do this for us
+ return binaryInputFormat.getSplits(context);
+ }
+
+ @Override
+ public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
+ TaskAttemptContext context) throws IOException {
+ // Placeholder: returns null; concrete formats such as BinaryLoadGraphInputFormat override this.
+ return null;
+ }
+
+
+
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexOutputFormat.java
new file mode 100644
index 0000000..399cbca
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexOutputFormat.java
@@ -0,0 +1,102 @@
+package edu.uci.ics.pregelix.api.io.binary;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+/**
+ * Abstract class that users should subclass to use their own binary
+ * (SequenceFile based) vertex output format.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class BinaryVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable>
+ extends VertexOutputFormat<I, V, E> {
+ /** Uses the SequenceFileOutputFormat to do everything */
+ protected SequenceFileOutputFormat binaryOutputFormat = new SequenceFileOutputFormat();
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * vertex output. Wraps a record writer that accepts (BytesWritable,
+ * ByteWritable) key/value pairs.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+ public static abstract class BinaryVertexWriter<I extends WritableComparable, V extends Writable, E extends Writable>
+ implements VertexWriter<I, V, E> {
+ /** Context passed to initialize */
+ private TaskAttemptContext context;
+ /** Internal record writer */
+ private final RecordWriter<BytesWritable, ByteWritable> lineRecordWriter;
+
+ /**
+ * Initialize with the underlying record writer.
+ *
+ * @param lineRecordWriter
+ * Record writer from SequenceFileOutputFormat
+ */
+ public BinaryVertexWriter(RecordWriter<BytesWritable, ByteWritable> lineRecordWriter) {
+ this.lineRecordWriter = lineRecordWriter;
+ }
+
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException {
+ this.context = context;
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ lineRecordWriter.close(context);
+ }
+
+ /**
+ * Get the underlying record writer.
+ *
+ * @return Record writer to be used for writing.
+ */
+ public RecordWriter<BytesWritable, ByteWritable> getRecordWriter() {
+ return lineRecordWriter;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ public TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+ binaryOutputFormat.checkOutputSpecs(context);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return binaryOutputFormat.getOutputCommitter(context);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
index 2af1688..0dff613 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
@@ -41,7 +41,6 @@
*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
-
genLoadGraph();
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
index 53c2d48..156c910 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
@@ -40,7 +40,7 @@
private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
private static final String FILE_EXTENSION_OF_RESULTS = "result";
- private static final String DATA_PATH = "data/webmap/test.dat";
+ private static final String DATA_PATH = "data/webmap/test.dat";//test.dat
private static final String HDFS_PATH = "/webmap/";
private static final String HYRACKS_APP_NAME = "pregelix";