add the interface for global aggregator
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2061 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
new file mode 100644
index 0000000..9920b4c
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -0,0 +1,30 @@
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+@SuppressWarnings("rawtypes")
+public interface GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, T extends Writable> {
+ /**
+ * initialize combiner
+ */
+ public void init();
+
+ /**
+ * step call
+ *
+ * @param vertexIndex
+ * @param msg
+ * @throws IOException
+ */
+ public void step(Vertex<I, V, E, M> v) throws IOException;
+
+ /**
+ * finish aggregate
+ *
+ * @return the final aggregate value
+ */
+ public T finish();
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index d6606c6..aa389e2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -23,8 +23,8 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
-import edu.uci.ics.pregelix.runtime.base.IAggregateFunctionFactory;
import edu.uci.ics.pregelix.runtime.simpleagg.AccumulatingAggregatorFactory;
import edu.uci.ics.pregelix.runtime.simpleagg.AggregationFunctionFactory;
@@ -75,7 +75,7 @@
IAggregateFunctionFactory aggFuncFactory = new AggregationFunctionFactory(new ConfigurationFactory(conf),
isFinal);
IAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
- new IAggregateFunctionFactory[] { aggFuncFactory }, new int[] { 0 }, new int[] {});
+ new IAggregateFunctionFactory[] { aggFuncFactory });
return aggregatorFactory;
}
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunction.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
similarity index 95%
rename from pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunction.java
rename to pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
index 4cd8f52..c05e9b9 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunction.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunction.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.runtime.base;
+package edu.uci.ics.pregelix.dataflow.std.base;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunctionFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
similarity index 94%
rename from pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunctionFactory.java
rename to pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
index b8f70b6..4be0bed 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/base/IAggregateFunctionFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.runtime.base;
+package edu.uci.ics.pregelix.dataflow.std.base;
import java.io.Serializable;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 659100a..8f63b6e 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -26,15 +26,15 @@
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.pregelix.runtime.base.IAggregateFunction;
-import edu.uci.ics.pregelix.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
public class AccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private IAggregateFunctionFactory[] aggFactories;
- public AccumulatingAggregatorFactory(IAggregateFunctionFactory[] aggFactories, int[] keys, int[] fdColumns) {
+ public AccumulatingAggregatorFactory(IAggregateFunctionFactory[] aggFactories) {
this.aggFactories = aggFactories;
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index d6352ed..e4697ab 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -34,7 +34,7 @@
import edu.uci.ics.pregelix.api.graph.VertexCombiner;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.runtime.base.IAggregateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class AggregationFunction implements IAggregateFunction {
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
index e45ece0..b6402ef 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -20,8 +20,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.runtime.base.IAggregateFunction;
-import edu.uci.ics.pregelix.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
public class AggregationFunctionFactory implements IAggregateFunctionFactory {
private static final long serialVersionUID = 1L;