Changes to support Feeds 2.0 (random partitioning of tuples)
commit 8b5c352d831aa0d7e006457f0b1430ac12b54731
Author: ramangrover29 <ramangrover29@gmail.com>
Date: Sat Jun 20 17:16:11 2015 -0700
Changes to support Feeds 2.0 (random partitioning of tuples)
Change-Id: I712c1f019cbc43f66d50620772c3df03c5944394
Reviewed-on: https://asterix-gerrit.ics.uci.edu/296
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index d0be2a1..0c9e89a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -45,6 +45,7 @@
PARTITIONINGSPLIT,
PRE_CLUSTERED_GROUP_BY,
PRE_SORTED_DISTINCT_BY,
+ RANDOM_PARTITION_EXCHANGE,
RANDOM_MERGE_EXCHANGE,
RANGE_PARTITION_EXCHANGE,
RANGE_PARTITION_MERGE_EXCHANGE,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 26313fb..0a67ced 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -18,6 +18,7 @@
import org.apache.commons.lang3.mutable.Mutable;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -39,6 +40,7 @@
public class AssignPOperator extends AbstractPhysicalOperator {
private boolean flushFramesRapidly;
+ private int cardinalityConstraint = 0;
@Override
public PhysicalOperatorTag getOperatorTag() {
@@ -87,7 +89,13 @@
// contribute one Asterix framewriter
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
- builder.contributeMicroOperator(assign, runtime, recDesc);
+ if (cardinalityConstraint > 0) {
+ AlgebricksCountPartitionConstraint countConstraint = new AlgebricksCountPartitionConstraint(
+ cardinalityConstraint);
+ builder.contributeMicroOperator(assign, runtime, recDesc, countConstraint);
+ } else {
+ builder.contributeMicroOperator(assign, runtime, recDesc);
+ }
// and contribute one edge from its child
ILogicalOperator src = assign.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, assign, 0);
@@ -103,6 +111,11 @@
this.flushFramesRapidly = flushFramesRapidly;
}
+ public void setCardinalityConstraint(int cardinality) {
+ this.cardinalityConstraint = cardinality;
+ }
+
+
@Override
public boolean expensiveThanMaterialization() {
return false;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
new file mode 100644
index 0000000..aa4852a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+public class RandomPartitionPOperator extends AbstractExchangePOperator {
+
+ private final INodeDomain domain;
+
+ public RandomPartitionPOperator(INodeDomain domain) {
+ this.domain = domain;
+ }
+
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ Pair<IConnectorDescriptor, TargetConstraint> connPair = createConnectorDescriptor(builder.getJobSpec(), op,
+ opSchema, context);
+ builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second);
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+ ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(domain.cardinality());
+ MToNPartitioningConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.RANDOM_PARTITION_EXCHANGE;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain), op2
+ .getDeliveredPhysicalProperties().getLocalProperties());
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements();
+ }
+
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index ebbca8b..ebd7da1 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -56,6 +56,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
@@ -538,18 +539,7 @@
case RANDOM: {
RandomPartitioningProperty rpp = (RandomPartitioningProperty) pp;
INodeDomain nd = rpp.getNodeDomain();
- if (nd == null) {
- throw new AlgebricksException("Unknown node domain for " + rpp);
- }
- if (nd.cardinality() == null) {
- throw new AlgebricksException("Unknown cardinality for node domain " + nd);
- }
- if (nd.cardinality() != 1) {
- throw new NotImplementedException(
- "Random repartitioning is only implemented for target domains of"
- + "cardinality equal to 1.");
- }
- pop = new BroadcastPOperator(nd);
+ pop = new RandomPartitionPOperator(nd);
break;
}
default: {
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
new file mode 100644
index 0000000..4f43584
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hyracks.dataflow.common.data.partition;
+
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class RandomPartitionComputerFactory implements
+ ITuplePartitionComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int domainCardinality;
+
+ public RandomPartitionComputerFactory(int domainCardinality) {
+ this.domainCardinality = domainCardinality;
+ }
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
+
+ private final Random random = new Random();
+
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex,
+ int nParts) throws HyracksDataException {
+ return random.nextInt(domainCardinality);
+ }
+ };
+ }
+
+}