add global aggregator support
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2066 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 9920b4c..329e48a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -5,26 +5,43 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
@SuppressWarnings("rawtypes")
-public interface GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, T extends Writable> {
- /**
- * initialize combiner
- */
- public void init();
+public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> {
+ /**
+ * initialize combiner
+ */
+ public abstract void init();
- /**
- * step call
- *
- * @param vertexIndex
- * @param msg
- * @throws IOException
- */
- public void step(Vertex<I, V, E, M> v) throws IOException;
+ /**
+ * step through all vertex at each slave partition
+ *
+ * @param vertexIndex
+ * @param msg
+ * @throws IOException
+ */
+ public abstract void step(Vertex<I, V, E, M> v) throws HyracksDataException;
- /**
- * finish aggregate
- *
- * @return the final aggregate value
- */
- public T finish();
+ /**
+ * step through all intermediate aggregate result
+ *
+ * @param partialResult partial aggregate value
+ */
+ public abstract void step(P partialResult);
+
+
+ /**
+ * finish partial aggregate
+ *
+ * @return the final aggregate value
+ */
+ public abstract P finishPartial();
+
+ /**
+ * finish final aggregate
+ *
+ * @return the final aggregate value
+ */
+ public abstract F finishFinal();
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 61c2cae..73b434a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -25,6 +25,7 @@
import edu.uci.ics.pregelix.api.graph.VertexCombiner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.util.GlobalCountAggregator;
/**
* This class represents a Pregelix job.
@@ -50,8 +51,10 @@
public static final String EDGE_VALUE_CLASS = "pregelix.edgeValueClass";
/** Message value class */
public static final String MESSAGE_VALUE_CLASS = "pregelix.messageValueClass";
- /** Aggregate value class */
- public static final String Aggregate_VALUE_CLASS = "pregelix.aggregateValueClass";
+ /** Partial aggregate value class */
+ public static final String PARTIAL_AGGREGATE_VALUE_CLASS = "pregelix.partialAggregateValueClass";
+ /** Final aggregate value class */
+ public static final String FINAL_AGGREGATE_VALUE_CLASS = "pregelix.finalAggregateValueClass";
/** num of vertices */
public static final String NUM_VERTICE = "pregelix.numVertices";
/** num of edges */
@@ -66,6 +69,7 @@
*/
public PregelixJob(String jobName) throws IOException {
super(new Configuration(), jobName);
+ getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, GlobalCountAggregator.class, GlobalAggregator.class);
}
/**
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 2a25bdb..9d1084f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -117,10 +117,10 @@
* @return User's vertex combiner class
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, T extends Writable> Class<? extends GlobalAggregator<I, V, E, M, T>> getGlobalAggregatorClass(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
Configuration conf) {
- return (Class<? extends GlobalAggregator<I, V, E, M, T>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
- null, GlobalAggregator.class);
+ return (Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
+ GlobalAggregator.class);
}
/**
@@ -145,9 +145,9 @@
* @return Instantiated user vertex combiner class
*/
@SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, T extends Writable> GlobalAggregator<I, V, E, M, T> createGlobalAggregator(
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
Configuration conf) {
- Class<? extends GlobalAggregator<I, V, E, M, T>> globalAggregatorClass = getGlobalAggregatorClass(conf);
+ Class<? extends GlobalAggregator<I, V, E, M, P, F>> globalAggregatorClass = getGlobalAggregatorClass(conf);
return ReflectionUtils.newInstance(globalAggregatorClass, conf);
}
@@ -287,17 +287,31 @@
}
/**
- * Get the user's subclassed global aggregator value class.
+ * Get the user's subclassed global aggregator's partial aggregate value class.
*
* @param conf
* Configuration to check
* @return User's global aggregate value class
*/
@SuppressWarnings("unchecked")
- public static <M extends Writable> Class<M> getAggregateValueClass(Configuration conf) {
+ public static <M extends Writable> Class<M> getPartialAggregateValueClass(Configuration conf) {
if (conf == null)
conf = defaultConf;
- return (Class<M>) conf.getClass(PregelixJob.Aggregate_VALUE_CLASS, Writable.class);
+ return (Class<M>) conf.getClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, Writable.class);
+ }
+
+ /**
+ * Get the user's subclassed global aggregator's global value class.
+ *
+ * @param conf
+ * Configuration to check
+ * @return User's global aggregate value class
+ */
+ @SuppressWarnings("unchecked")
+ public static <M extends Writable> Class<M> getFinalAggregateValueClass(Configuration conf) {
+ if (conf == null)
+ conf = defaultConf;
+ return (Class<M>) conf.getClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, Writable.class);
}
/**
@@ -325,8 +339,26 @@
* Configuration to check
* @return Instantiated user aggregate value
*/
- public static <M extends Writable> M createAggregateValue(Configuration conf) {
- Class<M> aggregateValueClass = getAggregateValueClass(conf);
+ public static <M extends Writable> M createPartialAggregateValue(Configuration conf) {
+ Class<M> aggregateValueClass = getPartialAggregateValueClass(conf);
+ try {
+ return aggregateValueClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+ }
+ }
+
+ /**
+ * Create a user aggregate value
+ *
+ * @param conf
+ * Configuration to check
+ * @return Instantiated user aggregate value
+ */
+ public static <M extends Writable> M createFinalAggregateValue(Configuration conf) {
+ Class<M> aggregateValueClass = getFinalAggregateValueClass(conf);
try {
return aggregateValueClass.newInstance();
} catch (InstantiationException e) {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 7add10f..63c1d8c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -196,6 +196,7 @@
}
private void execute(JobSpecification job) throws Exception {
+ job.setUseConnectorPolicyForScheduling(false);
JobId jobId = hcc.startJob(applicationName, job,
profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index e6007f9..2dddccc 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -123,8 +123,11 @@
Class aggregatorClass = conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS, GlobalAggregator.class);
if (!aggregatorClass.equals(GlobalAggregator.class)) {
List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
- Type aggregateValueType = argTypes.get(4);
- conf.setClass(PregelixJob.Aggregate_VALUE_CLASS, (Class<?>) aggregateValueType, Writable.class);
+ Type partialAggregateValueType = argTypes.get(4);
+ conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, (Class<?>) partialAggregateValueType,
+ Writable.class);
+ Type finalAggregateValueType = argTypes.get(5);
+ conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
}
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 6ea6358..5852b72 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -52,6 +52,7 @@
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -82,6 +83,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
JobSpecification spec = new JobSpecification();
@@ -118,6 +120,8 @@
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MessageList.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils
+ .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -128,8 +132,9 @@
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 3, StartComputeUpdateFunctionFactory.INSTANCE,
- preHookFactory, null, rdMessage, rdDummy, rdFinal);
+ new BTreeDataflowHelperFactory(), inputRdFactory, 4,
+ new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ rdPartialAggregate, rdFinal);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -140,6 +145,15 @@
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
/**
+ * final aggregate write operator
+ */
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+ configurationFactory, aggRdFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
+ /**
* construct bulk-load index operator
*/
int[] fieldPermutation = new int[] { 0, 1 };
@@ -212,7 +226,9 @@
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 2, btreeBulkLoad, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ finalAggregator, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 3, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -223,6 +239,7 @@
spec.addRoot(emptySink);
spec.addRoot(btreeBulkLoad);
spec.addRoot(terminateWriter);
+ spec.addRoot(finalAggregator);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
spec.setFrameSize(frameSize);
@@ -235,6 +252,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
JobSpecification spec = new JobSpecification();
/**
@@ -295,6 +313,8 @@
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MessageList.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils
+ .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -303,8 +323,8 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 3, ComputeUpdateFunctionFactory.INSTANCE,
- preHookFactory, null, rdMessage, rdDummy, rdFinal);
+ new BTreeDataflowHelperFactory(), inputRdFactory, 4, new ComputeUpdateFunctionFactory(confFactory),
+ preHookFactory, null, rdMessage, rdDummy, rdPartialAggregate, rdFinal);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -371,8 +391,17 @@
TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
configurationFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ /**
+ * final aggregate write operator
+ */
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+ configurationFactory, aggRdFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
@@ -383,7 +412,10 @@
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
terminateWriter, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), join, 2, btreeBulkLoad, 0);
+
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ finalAggregator, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), join, 3, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 1dda68e..f1951f2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -52,6 +52,7 @@
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -83,6 +84,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
JobSpecification spec = new JobSpecification();
@@ -110,19 +112,22 @@
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
-
+
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MessageList.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils
+ .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
vertexIdClass.getName(), vertexClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
- typeTraits, comparatorFactories,
- JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
- inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+ recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
+ new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+ new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, scanner);
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
@@ -181,6 +186,15 @@
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ /**
+ * final aggregate write operator
+ */
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+ configurationFactory, aggRdFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
@@ -189,6 +203,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ finalAggregator, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -197,6 +213,7 @@
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
spec.addRoot(terminateWriter);
+ spec.addRoot(finalAggregator);
spec.addRoot(emptySink2);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
@@ -210,6 +227,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
JobSpecification spec = new JobSpecification();
/**
@@ -262,6 +280,8 @@
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MessageList.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils
+ .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -270,8 +290,9 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
- ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+ new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -325,6 +346,15 @@
configurationFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+ /**
+ * final aggregate write operator
+ */
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+ configurationFactory, aggRdFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -336,6 +366,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
terminateWriter, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ finalAggregator, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -344,6 +376,7 @@
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
spec.addRoot(terminateWriter);
+ spec.addRoot(finalAggregator);
spec.addRoot(emptySink);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 7453cde..d075929 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -50,6 +50,7 @@
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.NonCombinerConnectorPolicyAssignmentPolicy;
@@ -82,6 +83,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
JobSpecification spec = new JobSpecification();
@@ -116,6 +118,8 @@
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MessageList.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils
+ .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -123,8 +127,9 @@
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE,
- preHookFactory, null, rdMessage, rdDummy);
+ new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+ new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -173,8 +178,17 @@
TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
configurationFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ /**
+ * final aggregate write operator
+ */
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+ configurationFactory, aggRdFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
@@ -183,12 +197,15 @@
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 0, globalSort, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ finalAggregator, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
spec.addRoot(terminateWriter);
+ spec.addRoot(finalAggregator);
spec.addRoot(emptySink2);
spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
@@ -202,6 +219,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
JobSpecification spec = new JobSpecification();
/**
@@ -254,6 +272,8 @@
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MessageList.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils
+ .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -262,8 +282,9 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
- ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+ new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -309,6 +330,15 @@
configurationFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+ /**
+ * final aggregate write operator
+ */
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+ configurationFactory, aggRdFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -320,12 +350,15 @@
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 0, globalSort, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
terminateWriter, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ finalAggregator, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
spec.addRoot(terminateWriter);
+ spec.addRoot(finalAggregator);
spec.addRoot(emptySink);
spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 070f114..7c0b259 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -50,6 +50,7 @@
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -81,6 +82,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
JobSpecification spec = new JobSpecification();
@@ -112,6 +114,8 @@
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MessageList.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils
+ .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -119,8 +123,9 @@
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE,
- preHookFactory, null, rdMessage, rdDummy);
+ new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+ new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, scanner);
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
@@ -186,6 +191,15 @@
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ /**
+ * final aggregate write operator
+ */
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+ configurationFactory, aggRdFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
@@ -194,6 +208,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ finalAggregator, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -202,6 +218,7 @@
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
spec.addRoot(terminateWriter);
+ spec.addRoot(finalAggregator);
spec.addRoot(emptySink2);
spec.setFrameSize(frameSize);
@@ -214,6 +231,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
JobSpecification spec = new JobSpecification();
/**
@@ -266,6 +284,8 @@
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MessageList.class.getName());
+ RecordDescriptor rdPartialAggregate = DataflowUtils
+ .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -274,8 +294,9 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
- ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+ new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -336,6 +357,15 @@
configurationFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+ /**
+ * final aggregate write operator
+ */
+ IRecordDescriptorFactory aggRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+ FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+ configurationFactory, aggRdFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -347,6 +377,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
terminateWriter, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ finalAggregator, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -355,6 +387,7 @@
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
spec.addRoot(terminateWriter);
+ spec.addRoot(finalAggregator);
spec.addRoot(emptySink);
spec.setFrameSize(frameSize);
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index f57b3fa..f1dff72 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -29,6 +30,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
import edu.uci.ics.pregelix.dataflow.context.StateKey;
@@ -89,6 +91,21 @@
}
}
+ public static void writeGlobalAggregateValue(Configuration conf, String jobId, Writable agg)
+ throws HyracksDataException {
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
+ Path path = new Path(pathStr);
+ FSDataOutputStream output = dfs.create(path, true);
+ agg.write(output);
+ output.flush();
+ output.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
public static boolean readTerminationState(Configuration conf, String jobId) throws HyracksDataException {
try {
FileSystem dfs = FileSystem.get(conf);
@@ -103,4 +120,19 @@
}
}
+ public static Writable readGlobalAggregateValue(Configuration conf, String jobId) throws HyracksDataException {
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
+ Path path = new Path(pathStr);
+ FSDataInputStream input = dfs.open(path);
+ Writable agg = BspUtils.createFinalAggregateValue(conf);
+ agg.readFields(input);
+ input.close();
+ return agg;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index ed0ecb6..105d3e2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -21,6 +21,8 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.io.Writable;
+
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -28,10 +30,13 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.ArrayListWritable;
import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
+import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
@@ -39,7 +44,11 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public class ComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
private static final long serialVersionUID = 1L;
- public static IUpdateFunctionFactory INSTANCE = new ComputeUpdateFunctionFactory();
+ private final IConfigurationFactory confFactory;
+
+ public ComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+ this.confFactory = confFactory;
+ }
@Override
public IUpdateFunction createFunction() {
@@ -48,6 +57,7 @@
private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+ private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
// for writing out to message channel
private IFrameWriter writerMsg;
@@ -66,6 +76,12 @@
private ByteBuffer bufferTerminate;
private boolean terminate = true;
+ // for writing out termination detection control channel
+ private IFrameWriter writerGlobalAggregate;
+ private FrameTupleAppender appenderGlobalAggregate;
+ private ByteBuffer bufferGlobalAggregate;
+ private GlobalAggregator aggregator;
+
private Vertex vertex;
private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
private DataOutput output = new DataOutputStream(bbos);
@@ -78,6 +94,9 @@
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
+ this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+ this.aggregator.init();
+
this.writerMsg = writers[0];
this.bufferMsg = ctx.allocateFrame();
this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
@@ -90,8 +109,13 @@
this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
this.appenderTerminate.reset(bufferTerminate, true);
- if (writers.length > 2) {
- this.writerAlive = writers[2];
+ this.writerGlobalAggregate = writers[2];
+ this.bufferGlobalAggregate = ctx.allocateFrame();
+ this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+
+ if (writers.length > 3) {
+ this.writerAlive = writers[3];
this.bufferAlive = ctx.allocateFrame();
this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
this.appenderAlive.reset(bufferAlive, true);
@@ -133,6 +157,8 @@
*/
if (terminate && (!vertex.isHalted() || vertex.hasMessage()))
terminate = false;
+
+ aggregator.step(vertex);
}
@Override
@@ -143,6 +169,25 @@
if (!terminate) {
writeOutTerminationState();
}
+
+ /** write out global aggregate value */
+ writeOutGlobalAggregate();
+ }
+
+ private void writeOutGlobalAggregate() throws HyracksDataException {
+ try {
+ /**
+ * get partial aggregate result and flush to the final aggregator
+ */
+ Writable agg = aggregator.finishPartial();
+ agg.write(tbGlobalAggregate.getDataOutput());
+ tbGlobalAggregate.addFieldEndOffset();
+ appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+ tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+ FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
}
private void writeOutTerminationState() throws HyracksDataException {
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 9ea8215..f72b059 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -21,6 +21,8 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.io.Writable;
+
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -28,10 +30,13 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
+import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
@@ -39,7 +44,11 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public class StartComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
private static final long serialVersionUID = 1L;
- public static IUpdateFunctionFactory INSTANCE = new StartComputeUpdateFunctionFactory();
+ private final IConfigurationFactory confFactory;
+
+ public StartComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+ this.confFactory = confFactory;
+ }
@Override
public IUpdateFunction createFunction() {
@@ -48,6 +57,7 @@
private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+ private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
// for writing out to message channel
private IFrameWriter writerMsg;
@@ -61,6 +71,12 @@
private boolean pushAlive;
// for writing out termination detection control channel
+ private IFrameWriter writerGlobalAggregate;
+ private FrameTupleAppender appenderGlobalAggregate;
+ private ByteBuffer bufferGlobalAggregate;
+ private GlobalAggregator aggregator;
+
+ //for writing out the global aggregate
private IFrameWriter writerTerminate;
private FrameTupleAppender appenderTerminate;
private ByteBuffer bufferTerminate;
@@ -81,6 +97,9 @@
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
+ this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+ this.aggregator.init();
+
this.writerMsg = writers[0];
this.bufferMsg = ctx.allocateFrame();
this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
@@ -93,8 +112,13 @@
this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
this.appenderTerminate.reset(bufferTerminate, true);
- if (writers.length > 2) {
- this.writerAlive = writers[2];
+ this.writerGlobalAggregate = writers[2];
+ this.bufferGlobalAggregate = ctx.allocateFrame();
+ this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+
+ if (writers.length > 3) {
+ this.writerAlive = writers[3];
this.bufferAlive = ctx.allocateFrame();
this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
this.appenderAlive.reset(bufferAlive, true);
@@ -134,6 +158,11 @@
*/
if (terminate && (!vertex.isHalted() || vertex.hasMessage()))
terminate = false;
+
+ /**
+ * call the global aggregator
+ */
+ aggregator.step(vertex);
}
@Override
@@ -144,6 +173,25 @@
if (!terminate) {
writeOutTerminationState();
}
+
+ /**write out global aggregate value*/
+ writeOutGlobalAggregate();
+ }
+
+ private void writeOutGlobalAggregate() throws HyracksDataException {
+ try {
+ /**
+ * get partial aggregate result and flush to the final aggregator
+ */
+ Writable agg = aggregator.finishPartial();
+ agg.write(tbGlobalAggregate.getDataOutput());
+ tbGlobalAggregate.addFieldEndOffset();
+ appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+ tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+ FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
}
private void writeOutTerminationState() throws HyracksDataException {
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunction.java
deleted file mode 100755
index a8bea45..0000000
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunction.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.runtime.simpleagg;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
-
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class GlobalAggregationFunction implements IAggregateFunction {
- private final Configuration conf;
- private final DataOutput output;
- private ByteBufferInputStream valueInputStream = new ByteBufferInputStream();
- private DataInput valueInput = new DataInputStream(valueInputStream);
- private GlobalAggregator globalAggregator;
- private Vertex vertex;
- private Writable aggregateResult;
-
- public GlobalAggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage)
- throws HyracksDataException {
- this.conf = confFactory.createConfiguration();
- this.output = output;
-
- vertex = BspUtils.createVertex(conf);
- aggregateResult = BspUtils.createAggregateValue(conf);
- globalAggregator = BspUtils.createGlobalAggregator(conf);
- }
-
- @Override
- public void init() throws HyracksDataException {
-
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws HyracksDataException {
- FrameTupleReference ftr = (FrameTupleReference) tuple;
- IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
- ByteBuffer buffer = fta.getBuffer();
- int tIndex = ftr.getTupleIndex();
-
- int valueStart = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex)
- + fta.getFieldStartOffset(tIndex, 1);
-
- valueInputStream.setByteBuffer(buffer, valueStart);
- try {
- vertex.readFields(valueInput);
- globalAggregator.step(vertex);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
-
- }
-
- @Override
- public void finish() throws HyracksDataException {
- try {
- aggregateResult = globalAggregator.finish();
- aggregateResult.write(output);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
-}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunctionFactory.java
deleted file mode 100755
index 519634d..0000000
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunctionFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.runtime.simpleagg;
-
-import java.io.DataOutput;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
-import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
-
-public class GlobalAggregationFunctionFactory implements IAggregateFunctionFactory {
- private static final long serialVersionUID = 1L;
- private final IConfigurationFactory confFactory;
- private final boolean isFinalStage;
-
- public GlobalAggregationFunctionFactory(IConfigurationFactory confFactory, boolean isFinalStage) {
- this.confFactory = confFactory;
- this.isFinalStage = isFinalStage;
- }
-
- @Override
- public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException {
- DataOutput output = provider.getDataOutput();
- return new GlobalAggregationFunction(confFactory, output, isFinalStage);
- }
-}