rename a few classes
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2075 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
similarity index 62%
rename from pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java
rename to pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
index 99f3f56..fd540fd 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -20,6 +20,8 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
/**
* interface to implement for combining of messages sent to the same vertex.
*
@@ -27,26 +29,40 @@
* @param <M extends Writable> message data
*/
@SuppressWarnings("rawtypes")
-public interface VertexCombiner<I extends WritableComparable, M extends Writable> {
+public abstract class MessageCombiner<I extends WritableComparable, M extends Writable, P extends Writable> {
/**
* initialize combiner
*/
- public void init();
+ public abstract void init(MsgList providedMsgList);
/**
- * step call
+ * step call for local combiner
*
* @param vertexIndex
* @param msg
* @throws IOException
*/
- public void step(I vertexIndex, M msg) throws IOException;
+ public abstract void step(I vertexIndex, M msg) throws HyracksDataException;
/**
- * finish aggregate
+ * step call for global combiner
+ *
+ * @param partialAggregate
+ *            a partially combined value, as returned by finishPartial
+ * @throws HyracksDataException
+ */
+ public abstract void step(P partialAggregate) throws HyracksDataException;
+
+ /**
+ * finish partial combiner
+ */
+ public abstract P finishPartial();
+
+ /**
+ * finish final combiner
*
* @return Message
*/
- public M finish();
+ public abstract MsgList<M> finishFinal();
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index fcff837..734b1af 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -32,7 +32,7 @@
private static final long serialVersionUID = 100L;
/**
- * Default constructor.
+ * Default constructor.
*/
public MsgList() {
super();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 73b434a..e24439f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -21,11 +21,10 @@
import org.apache.hadoop.mapreduce.Job;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
-import edu.uci.ics.pregelix.api.util.GlobalCountAggregator;
/**
* This class represents a Pregelix job.
@@ -38,7 +37,7 @@
/** VertexOutputFormat class - optional */
public static final String VERTEX_OUTPUT_FORMAT_CLASS = "pregelix.vertexOutputFormatClass";
/** Vertex combiner class - optional */
- public static final String VERTEX_COMBINER_CLASS = "pregelix.combinerClass";
+ public static final String Message_COMBINER_CLASS = "pregelix.combinerClass";
/** Global aggregator class - optional */
public static final String GLOBAL_AGGREGATOR_CLASS = "pregelix.aggregatorClass";
/** Vertex resolver class - optional */
@@ -51,6 +50,8 @@
public static final String EDGE_VALUE_CLASS = "pregelix.edgeValueClass";
/** Message value class */
public static final String MESSAGE_VALUE_CLASS = "pregelix.messageValueClass";
+ /** Partial combiner value class */
+ public static final String PARTIAL_COMBINE_VALUE_CLASS = "pregelix.partialCombinedValueClass";
/** Partial aggregate value class */
public static final String PARTIAL_AGGREGATE_VALUE_CLASS = "pregelix.partialAggregateValueClass";
/** Final aggregate value class */
@@ -69,7 +70,6 @@
*/
public PregelixJob(String jobName) throws IOException {
super(new Configuration(), jobName);
- getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, GlobalCountAggregator.class, GlobalAggregator.class);
}
/**
@@ -122,7 +122,7 @@
* Determines how vertex messages are combined
*/
final public void setVertexCombinerClass(Class<?> vertexCombinerClass) {
- getConfiguration().setClass(VERTEX_COMBINER_CLASS, vertexCombinerClass, VertexCombiner.class);
+ getConfiguration().setClass(Message_COMBINER_CLASS, vertexCombinerClass, MessageCombiner.class);
}
/**
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
index 0e59c8d9..265fd3e 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -16,6 +16,7 @@
package edu.uci.ics.pregelix.api.util;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
@@ -27,6 +28,8 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
/**
* A Writable for ListArray containing instances of a class.
*/
@@ -41,6 +44,14 @@
private List<M> pool = new ArrayList<M>();
/** how many instance in the pool has been used */
private int used = 0;
+ /** intermediate buffer for copy data element */
+ private final ArrayBackedValueStorage intermediateBuffer = new ArrayBackedValueStorage();
+ /** intermediate data output */
+ private final DataOutput intermediateOutput = intermediateBuffer.getDataOutput();
+ /** input stream */
+ private final ResetableByteArrayInputStream inputStream = new ResetableByteArrayInputStream();
+ /** data input */
+ private final DataInput dataInput = new DataInputStream(inputStream);
/**
* Using the default constructor requires that the user implement
@@ -51,6 +62,49 @@
}
/**
+ * clear all the elements
+ */
+    public void clearElements() {
+        // mark every pooled instance as unused again, then drop the list contents
+        used = 0;
+        clear();
+    }
+
+ /**
+ * Add all elements from another list
+ *
+ * @param list
+ * the list of M
+ * @return always true (a failure to copy an element is thrown as IllegalStateException)
+ */
+    public boolean addAllElements(List<M> list) {
+        // indexed walk, re-reading the size on every pass; each element is copied in via addElement
+        int index = 0;
+        while (index < list.size()) {
+            addElement(list.get(index));
+            index++;
+        }
+        return true;
+    }
+
+ /**
+ * Add an element
+ *
+ * @param element
+ * M
+ * @return always true (a failure to copy the element is thrown as IllegalStateException)
+ */
+    public boolean addElement(M element) {
+        try {
+            // serialize the incoming element into the reusable intermediate buffer
+            intermediateBuffer.reset();
+            element.write(intermediateOutput);
+
+            // read those bytes back into an instance from allocateValue(), so the list
+            // stores its own copy rather than the caller's object
+            inputStream.setByteArray(intermediateBuffer.getByteArray(), 0);
+            M copy = allocateValue();
+            copy.readFields(dataInput);
+            add(copy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+        return true;
+    }
+
+ /**
* This constructor allows setting the refClass during construction.
*
* @param refClass
@@ -109,6 +163,9 @@
public final void setConf(Configuration conf) {
this.conf = conf;
+ if (this.refClass == null) {
+ setClass();
+ }
}
private M allocateValue() {
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 9d1084f..d6bf24f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -21,8 +21,9 @@
import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -96,17 +97,17 @@
}
/**
- * Get the user's subclassed {@link VertexCombiner}.
+ * Get the user's subclassed {@link MessageCombiner}.
*
* @param conf
* Configuration to check
* @return User's vertex combiner class
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, M extends Writable> Class<? extends VertexCombiner<I, M>> getVertexCombinerClass(
+ public static <I extends WritableComparable, M extends Writable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
Configuration conf) {
- return (Class<? extends VertexCombiner<I, M>>) conf.getClass(PregelixJob.VERTEX_COMBINER_CLASS, null,
- VertexCombiner.class);
+ return (Class<? extends MessageCombiner<I, M, P>>) conf.getClass(PregelixJob.Message_COMBINER_CLASS,
+ DefaultMessageCombiner.class, MessageCombiner.class);
}
/**
@@ -120,7 +121,7 @@
public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
Configuration conf) {
return (Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
- GlobalAggregator.class);
+ GlobalCountAggregator.class, GlobalAggregator.class);
}
/**
@@ -131,9 +132,9 @@
* @return Instantiated user vertex combiner class
*/
@SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, M extends Writable> VertexCombiner<I, M> createVertexCombiner(
+ public static <I extends WritableComparable, M extends Writable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
Configuration conf) {
- Class<? extends VertexCombiner<I, M>> vertexCombinerClass = getVertexCombinerClass(conf);
+ Class<? extends MessageCombiner<I, M, P>> vertexCombinerClass = getMessageCombinerClass(conf);
return ReflectionUtils.newInstance(vertexCombinerClass, conf);
}
@@ -301,6 +302,20 @@
}
/**
+ * Get the user's subclassed combiner's partial combine value class.
+ *
+ * @param conf
+ * Configuration to check
+ * @return User's partial combine value class
+ */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getPartialCombineValueClass(Configuration conf) {
+        // a null argument means "use the default configuration"
+        Configuration effectiveConf = (conf == null) ? defaultConf : conf;
+        return (Class<M>) effectiveConf.getClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, Writable.class);
+    }
+
+ /**
* Get the user's subclassed global aggregator's global value class.
*
* @param conf
@@ -333,7 +348,7 @@
}
/**
- * Create a user aggregate value
+ * Create a user partial aggregate value
*
* @param conf
* Configuration to check
@@ -351,6 +366,30 @@
}
/**
+ * Create a user partial combine value
+ *
+ * @param conf
+ * Configuration to check
+ * @return Instantiated user partial combine value
+ */
+    @SuppressWarnings("rawtypes")
+    public static <M extends Writable> M createPartialCombineValue(Configuration conf) {
+        Class<M> partialCombineValueClass = getPartialCombineValueClass(conf);
+        try {
+            M instance = partialCombineValueClass.newInstance();
+            if (instance instanceof MsgList) {
+                // set conf for msg list, if the value type is msglist
+                ((MsgList) instance).setConf(conf);
+            }
+            return instance;
+        } catch (InstantiationException e) {
+            // name this method in the message so the failure is not attributed to createMessageValue
+            throw new IllegalArgumentException("createPartialCombineValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createPartialCombineValue: Illegally accessed", e);
+        }
+    }
+
+ /**
* Create a user aggregate value
*
* @param conf
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
new file mode 100755
index 0000000..5702642
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An {@link InputStream} that reads from a caller-supplied byte array. The array and the read
+ * position can be replaced at any time through {@link #setByteArray(byte[], int)}, so a single
+ * instance can be reused for many buffers. Reads run up to the end of the array
+ * ({@code data.length}).
+ */
+public class ResetableByteArrayInputStream extends InputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
+
+    /** array currently being read; null until setByteArray is called */
+    private byte[] data;
+    /** index of the next byte to return */
+    private int position;
+
+    public ResetableByteArrayInputStream() {
+    }
+
+    /**
+     * Point this stream at a new array.
+     *
+     * @param data
+     *            the bytes to read from
+     * @param position
+     *            index of the first byte to read
+     */
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    /**
+     * Read a single byte.
+     *
+     * @return the next byte as a value in 0..255, or -1 if no bytes remain
+     */
+    @Override
+    public int read() {
+        int remaining = data.length - position;
+        int value = remaining > 0 ? (data[position++] & 0xff) : -1;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
+        }
+        return value;
+    }
+
+    /**
+     * Copy up to {@code length} bytes into {@code bytes}, starting at {@code offset}.
+     *
+     * @param bytes
+     *            destination array
+     * @param offset
+     *            index in {@code bytes} of the first byte written
+     * @param length
+     *            maximum number of bytes to copy
+     * @return the number of bytes copied, or -1 if no bytes remain
+     */
+    @Override
+    public int read(byte[] bytes, int offset, int length) {
+        int remaining = data.length - position;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
+                    + length + " position: " + position);
+        }
+        // "<= 0" rather than "== 0": a start position beyond the end of the array is treated as
+        // end-of-stream, exactly as read() does, instead of handing arraycopy a negative length
+        if (remaining <= 0) {
+            return -1;
+        }
+        int copied = Math.min(length, remaining);
+        System.arraycopy(data, position, bytes, offset, copied);
+        position += copied;
+        return copied;
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java
new file mode 100755
index 0000000..307a3ae
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An {@link OutputStream} that writes into a caller-supplied byte array. The array and the write
+ * position can be replaced at any time through {@link #setByteArray(byte[], int)}. The array is
+ * never grown: a write that does not fit throws {@link IndexOutOfBoundsException}.
+ */
+public class ResetableByteArrayOutputStream extends OutputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayOutputStream.class.getName());
+
+    /** array currently being written; null until setByteArray is called */
+    private byte[] data;
+    /** index at which the next byte will be stored */
+    private int position;
+
+    public ResetableByteArrayOutputStream() {
+    }
+
+    /**
+     * Point this stream at a new array.
+     *
+     * @param data
+     *            the array to write into
+     * @param position
+     *            index at which the first byte will be stored
+     */
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    /**
+     * Store the low eight bits of {@code b} at the current position.
+     *
+     * @param b
+     *            the byte to write
+     * @throws IndexOutOfBoundsException
+     *             if the array has no free slot left
+     */
+    @Override
+    public void write(int b) {
+        int remaining = data.length - position;
+        // the last slot (index data.length - 1) is writable; only a full array is an error
+        if (remaining < 1) {
+            throw new IndexOutOfBoundsException();
+        }
+        data[position++] = (byte) b;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(): value: " + b + " remaining: " + remaining + " position: " + position);
+        }
+    }
+
+    /**
+     * Copy {@code length} bytes from {@code bytes}, starting at {@code offset}, to the current position.
+     *
+     * @param bytes
+     *            source array
+     * @param offset
+     *            index in {@code bytes} of the first byte to copy
+     * @param length
+     *            number of bytes to copy
+     * @throws IndexOutOfBoundsException
+     *             if fewer than {@code length} slots remain in the target array
+     */
+    @Override
+    public void write(byte[] bytes, int offset, int length) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(bytes[], int, int) offset: " + offset + " length: " + length + " position: "
+                    + position);
+        }
+        // a write that exactly fills the array is allowed; compare against the free space
+        if (length > data.length - position) {
+            throw new IndexOutOfBoundsException();
+        }
+        System.arraycopy(bytes, offset, data, position, length);
+        position += length;
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 2dddccc..e94331d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -60,6 +60,7 @@
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -129,6 +130,13 @@
Type finalAggregateValueType = argTypes.get(5);
conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
}
+
+ Class combinerClass = conf.getClass(PregelixJob.Message_COMBINER_CLASS, MessageCombiner.class);
+ if (!combinerClass.equals(MessageCombiner.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
+ Type partialCombineValueType = argTypes.get(2);
+ conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, (Class<?>) partialCombineValueType, Writable.class);
+ }
}
public String getJobId() {
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index c166903..c0e87d3 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -23,9 +23,11 @@
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
import edu.uci.ics.pregelix.api.io.VertexWriter;
import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
@@ -41,27 +43,44 @@
/**
* Test whether combiner is called to get the minimum ID in the cluster
*/
- public static class SimpleMinCombiner implements VertexCombiner<VLongWritable, VLongWritable> {
+ public static class SimpleMinCombiner extends MessageCombiner<VLongWritable, VLongWritable, VLongWritable> {
private long min = Long.MAX_VALUE;
private VLongWritable agg = new VLongWritable();
+ private MsgList<VLongWritable> msgList;
@Override
- public void step(VLongWritable vertexIndex, VLongWritable msg) throws IOException {
+ public void step(VLongWritable vertexIndex, VLongWritable msg) throws HyracksDataException {
long value = msg.get();
if (min > value)
min = value;
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- public void init() {
+ public void init(MsgList msgList) {
min = Long.MAX_VALUE;
+ this.msgList = msgList;
}
@Override
- public VLongWritable finish() {
+ public void step(VLongWritable partialAggregate) throws HyracksDataException {
+ if (min > partialAggregate.get())
+ min = partialAggregate.get();
+ }
+
+ @Override
+ public VLongWritable finishPartial() {
agg.set(min);
return agg;
}
+
+ @Override
+ public MsgList<VLongWritable> finishFinal() {
+ agg.set(min);
+ msgList.clear();
+ msgList.add(agg);
+ return msgList;
+ }
}
private VLongWritable outputValue = new VLongWritable();
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 2dd639d..0a6402c 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -30,8 +30,10 @@
import com.google.common.collect.Maps;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.io.VertexWriter;
import edu.uci.ics.pregelix.api.io.generated.GeneratedVertexInputFormat;
@@ -56,24 +58,40 @@
/**
* Test whether combiner is called by summing up the messages.
*/
- public static class SimpleSumCombiner implements VertexCombiner<VLongWritable, DoubleWritable> {
+ public static class SimpleSumCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
private double sum = 0.0;
private DoubleWritable agg = new DoubleWritable();
+ private MsgList<DoubleWritable> msgList;
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void init(MsgList msgList) {
+ sum = 0.0;
+ this.msgList = msgList;
+ }
@Override
- public void step(VLongWritable vertexIndex, DoubleWritable msg) throws IOException {
+ public void step(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
sum += msg.get();
}
@Override
- public void init() {
- sum = 0.0;
+ public DoubleWritable finishPartial() {
+ agg.set(sum);
+ return agg;
}
@Override
- public DoubleWritable finish() {
+ public void step(DoubleWritable partialAggregate) throws HyracksDataException {
+ sum += partialAggregate.get();
+ }
+
+ @Override
+ public MsgList<DoubleWritable> finishFinal() {
agg.set(sum);
- return agg;
+ msgList.clear();
+ msgList.add(agg);
+ return msgList;
}
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index a5eee58..b039c08 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -15,7 +15,6 @@
package edu.uci.ics.pregelix.example;
-import java.io.IOException;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -23,9 +22,11 @@
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
import edu.uci.ics.pregelix.example.client.Client;
@@ -39,27 +40,45 @@
/**
* Test whether combiner is called by summing up the messages.
*/
- public static class SimpleMinCombiner implements VertexCombiner<VLongWritable, DoubleWritable> {
+ public static class SimpleMinCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
private double min = Double.MAX_VALUE;
private DoubleWritable agg = new DoubleWritable();
+ private MsgList<DoubleWritable> msgList;
@Override
- public void step(VLongWritable vertexIndex, DoubleWritable msg) throws IOException {
+ public void step(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
double value = msg.get();
if (min > value)
min = value;
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public void init() {
+ public void init(MsgList msgList) {
min = Double.MAX_VALUE;
+ this.msgList = msgList;
}
@Override
- public DoubleWritable finish() {
+ public DoubleWritable finishPartial() {
agg.set(min);
return agg;
}
+
+ @Override
+ public void step(DoubleWritable partialAggregate) throws HyracksDataException {
+ double value = partialAggregate.get();
+ if (min > value)
+ min = value;
+ }
+
+ @Override
+ public MsgList<DoubleWritable> finishFinal() {
+ agg.set(min);
+ msgList.clear();
+ msgList.add(agg);
+ return msgList;
+ }
}
private DoubleWritable outputValue = new DoubleWritable();
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index 22ae6cf..e953261 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -124,6 +124,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 50662f9..ffff302 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -124,6 +124,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index e425b38..5eba09a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -124,6 +124,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index b51bd98..76905cd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -124,6 +124,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index a9e43bd..3f8a04f 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -124,6 +124,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index 3719247..f9c4bcd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -125,6 +125,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index 90caf6b..c415cba 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -125,6 +125,7 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index e4697ab..071bc17 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -30,8 +30,8 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.MsgList;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
@@ -41,7 +41,7 @@
private final Configuration conf;
private final boolean isFinalStage;
private final DataOutput output;
- private VertexCombiner combiner;
+ private MessageCombiner combiner;
private ByteBufferInputStream keyInputStream = new ByteBufferInputStream();
private ByteBufferInputStream valueInputStream = new ByteBufferInputStream();
private DataInput keyInput = new DataInputStream(keyInputStream);
@@ -59,17 +59,15 @@
this.isFinalStage = isFinalStage;
msgList.setConf(this.conf);
- combiner = BspUtils.createVertexCombiner(conf);
+ combiner = BspUtils.createMessageCombiner(conf);
key = BspUtils.createVertexIndex(conf);
- value = BspUtils.createMessageValue(conf);
- combinedResult = BspUtils.createMessageValue(conf);
+ value = !isFinalStage ? BspUtils.createMessageValue(conf) : BspUtils.createPartialCombineValue(conf);
}
@Override
public void init() throws HyracksDataException {
- msgList.clear();
keyRead = false;
- combiner.init();
+ combiner.init(msgList);
}
@Override
@@ -89,10 +87,14 @@
try {
if (!keyRead) {
key.readFields(keyInput);
- keyRead = false;
+ keyRead = true;
}
value.readFields(valueInput);
- combiner.step(key, value);
+ if (!isFinalStage) {
+ combiner.step(key, value);
+ } else {
+ combiner.step(value);
+ }
} catch (IOException e) {
throw new HyracksDataException(e);
}
@@ -102,13 +104,12 @@
@Override
public void finish() throws HyracksDataException {
try {
- combinedResult = combiner.finish();
if (!isFinalStage) {
- combinedResult.write(output);
+ combinedResult = combiner.finishPartial();
} else {
- msgList.add(combinedResult);
- msgList.write(output);
+ combinedResult = combiner.finishFinal();
}
+ combinedResult.write(output);
} catch (IOException e) {
throw new HyracksDataException(e);
}