Improved index NL joins on primary btree indexes by requiring an unordered partitioning property instead of a broadcast.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@839 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index dad8d0a..0486060 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -11,6 +11,7 @@
 import edu.uci.ics.asterix.optimizer.rules.am.BTreeJobGenParams;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -23,8 +24,18 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 
@@ -33,8 +44,16 @@
  */
 public class BTreeSearchPOperator extends IndexSearchPOperator {
 
-    public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
+    private final List<LogicalVariable> lowKeyVarList;
+    private final List<LogicalVariable> highKeyVarList;
+    private boolean isPrimaryIndex;
+
+    public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
+            boolean isPrimaryIndex, List<LogicalVariable> lowKeyVarList, List<LogicalVariable> highKeyVarList) {
         super(idx, requiresBroadcast);
+        this.isPrimaryIndex = isPrimaryIndex;
+        this.lowKeyVarList = lowKeyVarList;
+        this.highKeyVarList = highKeyVarList;
     }
 
     @Override
@@ -79,4 +98,31 @@
         ILogicalOperator srcExchange = unnestMap.getInputs().get(0).getValue();
         builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0);
     }
+    
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        if (requiresBroadcast) {
+            if (isPrimaryIndex) {
+                // For primary indexes, we require re-partitioning on the primary key, and not a broadcast.
+                // Also, add a local sorting property to enforce a sort before the primary-index operator.
+                StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+                ListSet<LogicalVariable> searchKeyVars = new ListSet<LogicalVariable>();
+                searchKeyVars.addAll(lowKeyVarList);
+                searchKeyVars.addAll(highKeyVarList);
+                List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+                for (LogicalVariable orderVar : searchKeyVars) {
+                    propsLocal.add(new LocalOrderProperty(new OrderColumn(orderVar, OrderKind.ASC)));
+                }
+                pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(searchKeyVars, null),
+                        propsLocal);
+                return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+            } else {
+                StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+                pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+                return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+            }
+        } else {
+            return super.getRequiredPropertiesForChildren(op, reqdByParent);
+        }
+    }
 }
\ No newline at end of file
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java
index 65225ca..f6c926b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java
@@ -23,8 +23,8 @@
  */
 public abstract class IndexSearchPOperator extends AbstractScanPOperator {
 
-    private final IDataSourceIndex<String, AqlSourceId> idx;
-    private final boolean requiresBroadcast;
+    protected final IDataSourceIndex<String, AqlSourceId> idx;
+    protected final boolean requiresBroadcast;
 
     public IndexSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
         this.idx = idx;
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 0f28239..5c61715 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -203,6 +203,8 @@
         physicalRewritesAllLevels.add(new PullPositionalVariableFromUnnestRule());
         physicalRewritesAllLevels.add(new PushProjectDownRule());
         physicalRewritesAllLevels.add(new InsertProjectBeforeUnionRule());
+        // After adding projects, we may need need to set physical operators again.
+        physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
         return physicalRewritesAllLevels;
     }
 
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 74f790f..f57cfb4 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -14,6 +14,7 @@
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import edu.uci.ics.asterix.optimizer.rules.am.BTreeJobGenParams;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
@@ -161,7 +162,11 @@
                         boolean requiresBroadcast = jobGenParams.getRequiresBroadcast();
                         switch (indexType) {
                             case BTREE: {
-                                op.setPhysicalOperator(new BTreeSearchPOperator(dsi, requiresBroadcast));
+                                BTreeJobGenParams btreeJobGenParams = new BTreeJobGenParams();
+                                btreeJobGenParams.readFromFuncArgs(f.getArguments());
+                                op.setPhysicalOperator(new BTreeSearchPOperator(dsi, requiresBroadcast,
+                                        btreeJobGenParams.isPrimaryIndex(), btreeJobGenParams.getLowKeyVarList(),
+                                        btreeJobGenParams.getHighKeyVarList()));
                                 break;
                             }
                             case RTREE: {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
index af30163..93099e9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
@@ -23,7 +23,8 @@
     protected String datasetName;
     protected boolean retainInput;
     protected boolean requiresBroadcast;
-
+    protected boolean isPrimaryIndex;
+    
     private final int NUM_PARAMS = 5;
 
     public AccessMethodJobGenParams() {
@@ -36,6 +37,7 @@
         this.datasetName = datasetName;
         this.retainInput = retainInput;
         this.requiresBroadcast = requiresBroadcast;
+        this.isPrimaryIndex = datasetName.equals(indexName);
     }
 
     public void writeToFuncArgs(List<Mutable<ILogicalExpression>> funcArgs) {
@@ -52,6 +54,7 @@
         datasetName = AccessMethodUtils.getStringConstant(funcArgs.get(2));
         retainInput = AccessMethodUtils.getBooleanConstant(funcArgs.get(3));
         requiresBroadcast = AccessMethodUtils.getBooleanConstant(funcArgs.get(4));
+        isPrimaryIndex = datasetName.equals(indexName);
     }
 
     public String getIndexName() {
@@ -100,4 +103,8 @@
     protected int getNumParams() {
         return NUM_PARAMS;
     }
+    
+    public boolean isPrimaryIndex() {
+        return isPrimaryIndex;
+    }
 }
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_01.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_01.plan
index 80eec0d..b9b830e 100644
--- a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_01.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_01.plan
@@ -4,9 +4,12 @@
       -- ASSIGN  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
           -- BTREE_SEARCH  |UNPARTITIONED|
-            -- BROADCAST_EXCHANGE  |PARTITIONED|
-              -- ASSIGN  |PARTITIONED|
-                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  -- DATASOURCE_SCAN  |PARTITIONED|
-                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+              -- STABLE_SORT [$$11(ASC)]  |LOCAL|
+                -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- DATASOURCE_SCAN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_02.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_02.plan
index 80eec0d..d11dc6a 100644
--- a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_02.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_02.plan
@@ -4,9 +4,12 @@
       -- ASSIGN  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
           -- BTREE_SEARCH  |UNPARTITIONED|
-            -- BROADCAST_EXCHANGE  |PARTITIONED|
-              -- ASSIGN  |PARTITIONED|
-                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  -- DATASOURCE_SCAN  |PARTITIONED|
-                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+              -- STABLE_SORT [$$10(ASC)]  |LOCAL|
+                -- HASH_PARTITION_EXCHANGE [$$10]  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- DATASOURCE_SCAN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_03.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_03.plan
index 02f36b5..a2e27c9 100644
--- a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_03.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_03.plan
@@ -4,7 +4,7 @@
       -- ASSIGN  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
           -- BTREE_SEARCH  |UNPARTITIONED|
-            -- BROADCAST_EXCHANGE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
               -- DATASOURCE_SCAN  |PARTITIONED|
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|