add missing files of the maximal clique example
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2692 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
new file mode 100644
index 0000000..b07017a
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/AdjacencyListWritable.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.maximalclique;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * The adjacency list is the value part of the mapper output
+ */
+public class AdjacencyListWritable implements Writable {
+
+ private VLongWritable sourceVertex = new VLongWritable();
+ private Set<VLongWritable> destinationVertexes = new TreeSet<VLongWritable>();
+
+ public AdjacencyListWritable() {
+ }
+
+ public void reset() {
+ this.destinationVertexes.clear();
+ }
+
+ public void setSource(VLongWritable source) {
+ this.sourceVertex = source;
+ }
+
+ public void addNeighbor(VLongWritable neighbor) {
+ destinationVertexes.add(neighbor);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ sourceVertex = new VLongWritable();
+ destinationVertexes.clear();
+ sourceVertex.readFields(input);
+ int numberOfNeighbors = input.readInt();
+ for (int i = 0; i < numberOfNeighbors; i++) {
+ VLongWritable neighbor = new VLongWritable();
+ neighbor.readFields(input);
+ destinationVertexes.add(neighbor);
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ sourceVertex.write(output);
+ output.writeInt(destinationVertexes.size());
+ for (VLongWritable dest : destinationVertexes) {
+ dest.write(output);
+ }
+ }
+
+ public AdjacencyListWritable clone() {
+ AdjacencyListWritable clone = new AdjacencyListWritable();
+ clone.setSource(sourceVertex);
+ for (VLongWritable dest : destinationVertexes) {
+ clone.addNeighbor(dest);
+ }
+ return clone;
+ }
+
+ public int numberOfNeighbors() {
+ return destinationVertexes.size();
+ }
+
+ public void removeNeighbor(VLongWritable v) {
+ destinationVertexes.remove(v);
+ }
+
+ public VLongWritable getSource() {
+ return sourceVertex;
+ }
+
+ public Iterator<VLongWritable> getNeighbors() {
+ return destinationVertexes.iterator();
+ }
+
+ public void cleanNonMatch(Collection<VLongWritable> matches) {
+ destinationVertexes.retainAll(matches);
+ }
+
+ public boolean isNeighbor(VLongWritable v) {
+ return destinationVertexes.contains(v);
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java
new file mode 100644
index 0000000..940fe2b
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.maximalclique;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class CliquesWritable implements Writable {
+
+ private List<VLongWritable> cliques = new ArrayList<VLongWritable>();
+ private int sizeOfClique = 0;
+
+ public CliquesWritable(List<VLongWritable> cliques, int sizeOfClique) {
+ this.cliques = cliques;
+ this.sizeOfClique = sizeOfClique;
+ }
+
+ public CliquesWritable() {
+
+ }
+
+ public void setClusterSize(int sizeOfClique) {
+ this.sizeOfClique = sizeOfClique;
+ }
+
+ public void setCliques(List<VLongWritable> cliques) {
+ this.cliques = cliques;
+ }
+
+ public int getSizeOfClique() {
+ return sizeOfClique;
+ }
+
+ public void reset() {
+ this.cliques.clear();
+ this.sizeOfClique = 0;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ cliques.clear();
+ int numCliques = input.readInt();
+ if (numCliques < 0) {
+ sizeOfClique = 0;
+ return;
+ }
+ sizeOfClique = input.readInt();
+ for (int i = 0; i < numCliques; i++) {
+ for (int j = 0; j < sizeOfClique; j++) {
+ VLongWritable vid = new VLongWritable();
+ vid.readFields(input);
+ cliques.add(vid);
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ if (sizeOfClique <= 0) {
+ output.writeInt(-1);
+ return;
+ }
+ output.writeInt(cliques.size() / sizeOfClique);
+ output.writeInt(sizeOfClique);
+
+ for (int i = 0; i < cliques.size(); i++) {
+ cliques.get(i).write(output);
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (sizeOfClique == 0)
+ return "";
+ StringBuffer sb = new StringBuffer();
+ int numCliques = cliques.size() / sizeOfClique;
+ for (int i = 0; i < numCliques; i++) {
+ for (int j = 0; j < sizeOfClique - 1; j++) {
+ sb.append(cliques.get(j));
+ sb.append(",");
+ }
+ sb.append(cliques.get(sizeOfClique - 1));
+ sb.append(";");
+ }
+ return sb.toString();
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
new file mode 100644
index 0000000..c128a92
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.maximalclique;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class MaximalCliqueVertex extends Vertex<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> {
+
+ private Map<VLongWritable, AdjacencyListWritable> map = new TreeMap<VLongWritable, AdjacencyListWritable>();
+ private List<VLongWritable> vertexList = new ArrayList<VLongWritable>();
+ private Map<VLongWritable, Integer> invertedMap = new TreeMap<VLongWritable, Integer>();
+ private int largestCliqueSizeSoFar = 0;
+ private List<BitSet> currentMaximalCliques = new ArrayList<BitSet>();
+ private CliquesWritable tmpValue = new CliquesWritable();
+ private List<VLongWritable> cliques = new ArrayList<VLongWritable>();
+
+ private void updateCurrentMaximalCliques(Iterator<AdjacencyListWritable> values) {
+ map.clear();
+ vertexList.clear();
+ invertedMap.clear();
+ currentMaximalCliques.clear();
+ cliques.clear();
+
+ // build the initial sub graph
+ while (values.hasNext()) {
+ AdjacencyListWritable adj = values.next();
+ map.put(adj.getSource(), adj);
+ }
+ VLongWritable srcId = getVertexId();
+ map.put(srcId, new AdjacencyListWritable());
+
+ // build the vertex list (vertex id in ascending order) and the inverted list of vertexes
+ int i = 0;
+ for (VLongWritable v : map.keySet()) {
+ vertexList.add(v);
+ invertedMap.put(v, i++);
+ }
+
+ //clean up adjacency list --- remove vertexes who are not neighbors of key
+ for (AdjacencyListWritable adj : map.values()) {
+ adj.cleanNonMatch(vertexList);
+ }
+
+ // get the h-index of the subgraph --- which is the maximum depth to explore
+ int[] neighborCounts = new int[map.size()];
+ i = 0;
+ for (AdjacencyListWritable adj : map.values()) {
+ neighborCounts[i++] = adj.numberOfNeighbors();
+ }
+ Arrays.sort(neighborCounts);
+ int h = 0;
+ for (i = neighborCounts.length - 1; i >= 0; i--) {
+ if (h >= neighborCounts[i]) {
+ break;
+ }
+ h++;
+ }
+ if (h < largestCliqueSizeSoFar) {
+ return;
+ }
+
+ //start depth-first search
+ BitSet cliqueSoFar = new BitSet(h);
+ for (VLongWritable v : vertexList) {
+ cliqueSoFar.set(invertedMap.get(v));
+ searchClique(h, cliqueSoFar, 1, v);
+ cliqueSoFar.clear();
+ }
+
+ //output local maximal cliques
+ for (BitSet clique : currentMaximalCliques) {
+ int keyIndex = invertedMap.get(srcId);
+ clique.set(keyIndex);
+ generateClique(clique);
+ tmpValue.setCliques(cliques);
+ tmpValue.setClusterSize(clique.cardinality());
+ }
+
+ //update the vertex state
+ setVertexValue(tmpValue);
+ }
+
+ // output a clique with the bitmap representation
+ private void generateClique(BitSet clique) {
+ for (int j = 0; j < clique.length();) {
+ j = clique.nextSetBit(j);
+ VLongWritable v = vertexList.get(j);
+ cliques.add(v);
+ j++;
+ }
+ }
+
+ private void searchClique(int maxDepth, BitSet cliqueSoFar, int depthSoFar, VLongWritable currentSource) {
+ if (depthSoFar > maxDepth) {
+ // update maximal clique info
+ updateMaximalClique(cliqueSoFar);
+ return;
+ }
+
+ AdjacencyListWritable adj = map.get(currentSource);
+ Iterator<VLongWritable> neighbors = adj.getNeighbors();
+ ++depthSoFar;
+ while (neighbors.hasNext()) {
+ VLongWritable neighbor = neighbors.next();
+ if (!isTested(neighbor, cliqueSoFar) && isClique(neighbor, cliqueSoFar)) {
+ //snapshot the clique
+ int cliqueLength = cliqueSoFar.length();
+ // expand the clique
+ cliqueSoFar.set(invertedMap.get(neighbor));
+ searchClique(maxDepth, cliqueSoFar, depthSoFar, neighbor);
+ // back to the snapshot clique
+ cliqueSoFar.set(cliqueLength, cliqueSoFar.length(), false);
+ }
+ }
+
+ // update maximal clique info
+ updateMaximalClique(cliqueSoFar);
+ }
+
+ // update the maximal clique to a larger one if it exists
+ private void updateMaximalClique(BitSet cliqueSoFar) {
+ int cliqueSize = cliqueSoFar.cardinality();
+ if (cliqueSize > largestCliqueSizeSoFar) {
+ currentMaximalCliques.clear();
+ currentMaximalCliques.add((BitSet) cliqueSoFar.clone());
+ largestCliqueSizeSoFar = cliqueSize;
+ } else if (cliqueSize == largestCliqueSizeSoFar) {
+ currentMaximalCliques.add((BitSet) cliqueSoFar.clone());
+ } else {
+ return;
+ }
+ }
+
+ //should we test the vertex newVertex?
+ private boolean isTested(VLongWritable newVertex, BitSet cliqueSoFar) {
+ int index = invertedMap.get(newVertex);
+ int largestSetIndex = cliqueSoFar.length() - 1;
+ if (index > largestSetIndex) {
+ // we only return cliques with vertexes in the ascending order
+ // hence, the new vertex must be larger than the largesetSetIndex in the clique
+ return false;
+ } else {
+ // otherwise, we think the vertex is "tested"
+ return true;
+ }
+ }
+
+ // will adding the newVertex yield a bigger clique?
+ private boolean isClique(VLongWritable newVertex, BitSet cliqueSoFar) {
+ AdjacencyListWritable adj = map.get(newVertex);
+ // check whether each existing vertex is in the neighbor set of newVertex
+ for (int i = 0; i < cliqueSoFar.length();) {
+ i = cliqueSoFar.nextSetBit(i);
+ VLongWritable v = vertexList.get(i);
+ if (!adj.isNeighbor(v)) {
+ return false;
+ }
+ i++;
+ }
+ return true;
+ }
+
+ @Override
+ public void compute(Iterator<AdjacencyListWritable> msgIterator) {
+ if (getSuperstep() == 1) {
+ sortEdges();
+ sendMsg(getEdges());
+ } else if (getSuperstep() == 2) {
+ updateCurrentMaximalCliques(msgIterator);
+ } else {
+ voteToHalt();
+ }
+ }
+
+ private void sendMsg(List<Edge<VLongWritable, NullWritable>> edges) {
+ for (int i = 0; i < edges.size(); i++) {
+ if (edges.get(i).getDestVertexId().get() < getVertexId().get()) {
+ // only add emit for the vertexes whose id is smaller than adjListLong[0]
+ // to avoid the duplicate removal step,
+ // because all the resulting cliques will have vertexes in the ascending order.
+ getVertexValue().reset();
+ AdjacencyListWritable msg = new AdjacencyListWritable();
+ msg.setSource(getVertexId());
+ for (int j = i + 1; j < edges.size(); j++) {
+ msg.addNeighbor(edges.get(j).getDestVertexId());
+ }
+ sendMsg(edges.get(i).getDestVertexId(), msg);
+ }
+ }
+ }
+
+ /**
+ * Maximal Clique VertexWriter
+ */
+ public static class MaximalCliqueVertexWriter extends
+ TextVertexWriter<VLongWritable, CliquesWritable, NullWritable> {
+ public MaximalCliqueVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<VLongWritable, CliquesWritable, NullWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+ new Text(vertex.getVertexValue().toString()));
+ }
+ }
+
+ /**
+ * output format for maximal clique
+ */
+ public static class MaximalCliqueVertexOutputFormat extends
+ TextVertexOutputFormat<VLongWritable, CliquesWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<VLongWritable, CliquesWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+ return new MaximalCliqueVertexWriter(recordWriter);
+ }
+
+ }
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/TextMaximalCliqueInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/TextMaximalCliqueInputFormat.java
new file mode 100644
index 0000000..ec7b32c
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/TextMaximalCliqueInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.maximalclique;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextMaximalCliqueInputFormat extends
+ TextVertexInputFormat<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> {
+
+ @Override
+ public VertexReader<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new TextMaximalCliqueGraphReader(textInputFormat.createRecordReader(split, context));
+ }
+}
+
+@SuppressWarnings("rawtypes")
+class TextMaximalCliqueGraphReader extends
+ TextVertexReader<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> {
+
+ private final static String separator = " ";
+ private Vertex vertex;
+ private VLongWritable vertexId = new VLongWritable();
+ private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+ private int used = 0;
+
+ public TextMaximalCliqueGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+ super(lineRecordReader);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Vertex<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> getCurrentVertex()
+ throws IOException, InterruptedException {
+ used = 0;
+ if (vertex == null)
+ vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+
+ vertex.reset();
+ Text line = getRecordReader().getCurrentValue();
+ String[] fields = line.toString().split(separator);
+
+ if (fields.length > 0) {
+ /**
+ * set the src vertex id
+ */
+ long src = Long.parseLong(fields[0]);
+ vertexId.set(src);
+ vertex.setVertexId(vertexId);
+ long dest = -1L;
+
+ /**
+ * set up edges
+ */
+ for (int i = 1; i < fields.length; i++) {
+ dest = Long.parseLong(fields[i]);
+ VLongWritable destId = allocate();
+ destId.set(dest);
+ vertex.addEdge(destId, null);
+ }
+ }
+ return vertex;
+ }
+
+ private VLongWritable allocate() {
+ if (used >= pool.size()) {
+ VLongWritable value = new VLongWritable();
+ pool.add(value);
+ used++;
+ return value;
+ } else {
+ VLongWritable value = pool.get(used);
+ used++;
+ return value;
+ }
+ }
+}