Changes to support Feeds 2.0 (random partitioning of tuples)

commit 8b5c352d831aa0d7e006457f0b1430ac12b54731
Author: ramangrover29 <ramangrover29@gmail.com>
Date:   Sat Jun 20 17:16:11 2015 -0700

    Changes to support Feeds 2.0 (random partitioning of tuples)

Change-Id: I712c1f019cbc43f66d50620772c3df03c5944394
Reviewed-on: https://asterix-gerrit.ics.uci.edu/296
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index d0be2a1..0c9e89a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -45,6 +45,7 @@
     PARTITIONINGSPLIT,
     PRE_CLUSTERED_GROUP_BY,
     PRE_SORTED_DISTINCT_BY,
+    RANDOM_PARTITION_EXCHANGE,
     RANDOM_MERGE_EXCHANGE,
     RANGE_PARTITION_EXCHANGE,
     RANGE_PARTITION_MERGE_EXCHANGE,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 26313fb..0a67ced 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -18,6 +18,7 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -39,6 +40,7 @@
 public class AssignPOperator extends AbstractPhysicalOperator {
 
     private boolean flushFramesRapidly;
+    private int cardinalityConstraint = 0;
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -87,7 +89,13 @@
 
         // contribute one Asterix framewriter
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
-        builder.contributeMicroOperator(assign, runtime, recDesc);
+        if (cardinalityConstraint > 0) {
+            AlgebricksCountPartitionConstraint countConstraint = new AlgebricksCountPartitionConstraint(
+                    cardinalityConstraint);
+            builder.contributeMicroOperator(assign, runtime, recDesc, countConstraint);
+        } else {
+            builder.contributeMicroOperator(assign, runtime, recDesc);
+        }
         // and contribute one edge from its child
         ILogicalOperator src = assign.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, assign, 0);
@@ -103,6 +111,11 @@
         this.flushFramesRapidly = flushFramesRapidly;
     }
 
+    public void setCardinalityConstraint(int cardinality) {
+        this.cardinalityConstraint = cardinality;
+    }
+
+
     @Override
     public boolean expensiveThanMaterialization() {
         return false;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
new file mode 100644
index 0000000..aa4852a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+public class RandomPartitionPOperator extends AbstractExchangePOperator {
+
+    private final INodeDomain domain;
+
+    public RandomPartitionPOperator(INodeDomain domain) {
+        this.domain = domain;
+    }
+
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        Pair<IConnectorDescriptor, TargetConstraint> connPair = createConnectorDescriptor(builder.getJobSpec(), op,
+                opSchema, context);
+        builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+        ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(domain.cardinality());
+        MToNPartitioningConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.RANDOM_PARTITION_EXCHANGE;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain), op2
+                .getDeliveredPhysicalProperties().getLocalProperties());
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index ebbca8b..ebd7da1 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -56,6 +56,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
@@ -538,18 +539,7 @@
                 case RANDOM: {
                     RandomPartitioningProperty rpp = (RandomPartitioningProperty) pp;
                     INodeDomain nd = rpp.getNodeDomain();
-                    if (nd == null) {
-                        throw new AlgebricksException("Unknown node domain for " + rpp);
-                    }
-                    if (nd.cardinality() == null) {
-                        throw new AlgebricksException("Unknown cardinality for node domain " + nd);
-                    }
-                    if (nd.cardinality() != 1) {
-                        throw new NotImplementedException(
-                                "Random repartitioning is only implemented for target domains of"
-                                        + "cardinality equal to 1.");
-                    }
-                    pop = new BroadcastPOperator(nd);
+                    pop = new RandomPartitionPOperator(nd);
                     break;
                 }
                 default: {
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
new file mode 100644
index 0000000..4f43584
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hyracks.dataflow.common.data.partition;
+
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class RandomPartitionComputerFactory implements
+		ITuplePartitionComputerFactory {
+
+	private static final long serialVersionUID = 1L;
+
+	private final int domainCardinality;
+
+	public RandomPartitionComputerFactory(int domainCardinality) {
+		this.domainCardinality = domainCardinality;
+	}
+
+	@Override
+	public ITuplePartitionComputer createPartitioner() {
+		return new ITuplePartitionComputer() {
+
+			private final Random random = new Random();
+
+			@Override
+			public int partition(IFrameTupleAccessor accessor, int tIndex,
+					int nParts) throws HyracksDataException {
+				return random.nextInt(domainCardinality);
+			}
+		};
+	}
+
+}