Merged asterix_stabilization r981:r992.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_issue_224@995 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index 09a4c6b..f0880ec 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -45,12 +45,14 @@
 
     private final List<LogicalVariable> lowKeyVarList;
     private final List<LogicalVariable> highKeyVarList;
-    private boolean isPrimaryIndex;
+    private final boolean isPrimaryIndex;
+    private final boolean isEqCondition;
 
     public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
-            boolean isPrimaryIndex, List<LogicalVariable> lowKeyVarList, List<LogicalVariable> highKeyVarList) {
+            boolean isPrimaryIndex, boolean isEqCondition, List<LogicalVariable> lowKeyVarList, List<LogicalVariable> highKeyVarList) {
         super(idx, requiresBroadcast);
         this.isPrimaryIndex = isPrimaryIndex;
+        this.isEqCondition = isEqCondition;
         this.lowKeyVarList = lowKeyVarList;
         this.highKeyVarList = highKeyVarList;
     }
@@ -100,13 +102,13 @@
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
         if (requiresBroadcast) {
-            if (isPrimaryIndex) {
-                // For primary indexes, we require re-partitioning on the primary key, and not a broadcast.
-                // Also, add a local sorting property to enforce a sort before the primary-index operator.
+            // For primary indexes optimizing an equality condition we can reduce the broadcast requirement to hash partitioning.
+            if (isPrimaryIndex && isEqCondition) {
                 StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
                 ListSet<LogicalVariable> searchKeyVars = new ListSet<LogicalVariable>();
                 searchKeyVars.addAll(lowKeyVarList);
                 searchKeyVars.addAll(highKeyVarList);
+                // Also, add a local sorting property to enforce a sort before the primary-index operator.
                 List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
                 for (LogicalVariable orderVar : searchKeyVars) {
                     propsLocal.add(new LocalOrderProperty(new OrderColumn(orderVar, OrderKind.ASC)));
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 19791e3..e19466e 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -165,8 +165,8 @@
                                 BTreeJobGenParams btreeJobGenParams = new BTreeJobGenParams();
                                 btreeJobGenParams.readFromFuncArgs(f.getArguments());
                                 op.setPhysicalOperator(new BTreeSearchPOperator(dsi, requiresBroadcast,
-                                        btreeJobGenParams.isPrimaryIndex(), btreeJobGenParams.getLowKeyVarList(),
-                                        btreeJobGenParams.getHighKeyVarList()));
+                                        btreeJobGenParams.isPrimaryIndex(), btreeJobGenParams.isEqCondition(),
+                                        btreeJobGenParams.getLowKeyVarList(), btreeJobGenParams.getHighKeyVarList()));
                                 break;
                             }
                             case RTREE: {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
index 1379bf4..5c65299 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
@@ -191,6 +191,7 @@
         // If we can't figure out how to integrate a certain funcExpr into the current predicate, we just bail by setting this flag.
         boolean couldntFigureOut = false;
         boolean doneWithExprs = false;
+        boolean isEqCondition = false;
         // TODO: For now don't consider prefix searches.
         BitSet setLowKeys = new BitSet(numSecondaryKeys);
         BitSet setHighKeys = new BitSet(numSecondaryKeys);
@@ -212,7 +213,7 @@
             }
             ILogicalExpression searchKeyExpr = AccessMethodUtils.createSearchKeyExpr(optFuncExpr, indexSubTree,
                     probeSubTree);
-            LimitType limit = getLimitType(optFuncExpr);
+            LimitType limit = getLimitType(optFuncExpr, probeSubTree);
             switch (limit) {
                 case EQUAL: {
                     if (lowKeyLimits[keyPos] == null && highKeyLimits[keyPos] == null) {
@@ -221,14 +222,23 @@
                         lowKeyExprs[keyPos] = highKeyExprs[keyPos] = searchKeyExpr;
                         setLowKeys.set(keyPos);
                         setHighKeys.set(keyPos);
+                        isEqCondition = true;
                     } else {
+                        // Has already been set to the identical values. When optimizing join we may encounter the same optimizable expression twice
+                        // (once from analyzing each side of the join)
+                        if (lowKeyLimits[keyPos] == limit && lowKeyInclusive[keyPos] == true
+                                && lowKeyExprs[keyPos].equals(searchKeyExpr) && highKeyLimits[keyPos] == limit
+                                && highKeyInclusive[keyPos] == true && highKeyExprs[keyPos].equals(searchKeyExpr)) {
+                            isEqCondition = true;
+                            break;
+                        }
                         couldntFigureOut = true;
                     }
                     // TODO: For now don't consider prefix searches.
                     // If high and low keys are set, we exit for now.
                     if (setLowKeys.cardinality() == numSecondaryKeys && setHighKeys.cardinality() == numSecondaryKeys) {
                         doneWithExprs = true;
-                    }
+                    }                    
                     break;
                 }
                 case HIGH_EXCLUSIVE: {
@@ -237,6 +247,12 @@
                         highKeyExprs[keyPos] = searchKeyExpr;
                         highKeyInclusive[keyPos] = false;
                     } else {
+                        // Has already been set to the identical values. When optimizing join we may encounter the same optimizable expression twice
+                        // (once from analyzing each side of the join)
+                        if (highKeyLimits[keyPos] == limit && highKeyInclusive[keyPos] == false
+                                && highKeyExprs[keyPos].equals(searchKeyExpr)) {
+                            break;
+                        }
                         couldntFigureOut = true;
                         doneWithExprs = true;
                     }
@@ -248,6 +264,12 @@
                         highKeyExprs[keyPos] = searchKeyExpr;
                         highKeyInclusive[keyPos] = true;
                     } else {
+                        // Has already been set to the identical values. When optimizing join we may encounter the same optimizable expression twice
+                        // (once from analyzing each side of the join)
+                        if (highKeyLimits[keyPos] == limit && highKeyInclusive[keyPos] == true
+                                && highKeyExprs[keyPos].equals(searchKeyExpr)) {
+                            break;
+                        }
                         couldntFigureOut = true;
                         doneWithExprs = true;
                     }
@@ -259,6 +281,12 @@
                         lowKeyExprs[keyPos] = searchKeyExpr;
                         lowKeyInclusive[keyPos] = false;
                     } else {
+                        // Has already been set to the identical values. When optimizing join we may encounter the same optimizable expression twice
+                        // (once from analyzing each side of the join)
+                        if (lowKeyLimits[keyPos] == limit && lowKeyInclusive[keyPos] == false
+                                && lowKeyExprs[keyPos].equals(searchKeyExpr)) {
+                            break;
+                        }
                         couldntFigureOut = true;
                         doneWithExprs = true;
                     }
@@ -270,6 +298,12 @@
                         lowKeyExprs[keyPos] = searchKeyExpr;
                         lowKeyInclusive[keyPos] = true;
                     } else {
+                        // Has already been set to the identical values. When optimizing join we may encounter the same optimizable expression twice
+                        // (once from analyzing each side of the join)
+                        if (lowKeyLimits[keyPos] == limit && lowKeyInclusive[keyPos] == true
+                                && lowKeyExprs[keyPos].equals(searchKeyExpr)) {
+                            break;
+                        }
                         couldntFigureOut = true;
                         doneWithExprs = true;
                     }
@@ -328,6 +362,7 @@
                 dataset.getDataverseName(), dataset.getDatasetName(), retainInput, requiresBroadcast);
         jobGenParams.setLowKeyInclusive(lowKeyInclusive[0]);
         jobGenParams.setHighKeyInclusive(highKeyInclusive[0]);
+        jobGenParams.setIsEqCondition(isEqCondition);
         jobGenParams.setLowKeyVarList(keyVarList, 0, numLowKeys);
         jobGenParams.setHighKeyVarList(keyVarList, numLowKeys, numHighKeys);
 
@@ -441,7 +476,7 @@
         return -1;
     }
 
-    private LimitType getLimitType(IOptimizableFuncExpr optFuncExpr) {
+    private LimitType getLimitType(IOptimizableFuncExpr optFuncExpr, OptimizableOperatorSubTree probeSubTree) {
         ComparisonKind ck = AlgebricksBuiltinFunctions.getComparisonType(optFuncExpr.getFuncExpr()
                 .getFunctionIdentifier());
         LimitType limit = null;
@@ -451,19 +486,19 @@
                 break;
             }
             case GE: {
-                limit = constantIsOnLhs(optFuncExpr) ? LimitType.HIGH_INCLUSIVE : LimitType.LOW_INCLUSIVE;
+                limit = probeIsOnLhs(optFuncExpr, probeSubTree) ? LimitType.HIGH_INCLUSIVE : LimitType.LOW_INCLUSIVE;
                 break;
             }
             case GT: {
-                limit = constantIsOnLhs(optFuncExpr) ? LimitType.HIGH_EXCLUSIVE : LimitType.LOW_EXCLUSIVE;
+                limit = probeIsOnLhs(optFuncExpr, probeSubTree) ? LimitType.HIGH_EXCLUSIVE : LimitType.LOW_EXCLUSIVE;
                 break;
             }
             case LE: {
-                limit = constantIsOnLhs(optFuncExpr) ? LimitType.LOW_INCLUSIVE : LimitType.HIGH_INCLUSIVE;
+                limit = probeIsOnLhs(optFuncExpr, probeSubTree) ? LimitType.LOW_INCLUSIVE : LimitType.HIGH_INCLUSIVE;
                 break;
             }
             case LT: {
-                limit = constantIsOnLhs(optFuncExpr) ? LimitType.LOW_EXCLUSIVE : LimitType.HIGH_EXCLUSIVE;
+                limit = probeIsOnLhs(optFuncExpr, probeSubTree) ? LimitType.LOW_EXCLUSIVE : LimitType.HIGH_EXCLUSIVE;
                 break;
             }
             case NEQ: {
@@ -477,11 +512,16 @@
         return limit;
     }
 
-    // Returns true if there is a constant value on the left-hand side  if the given optimizable function (assuming a binary function).
-    public boolean constantIsOnLhs(IOptimizableFuncExpr optFuncExpr) {
-        return optFuncExpr.getFuncExpr().getArguments().get(0) == optFuncExpr.getConstantVal(0);
+    private boolean probeIsOnLhs(IOptimizableFuncExpr optFuncExpr, OptimizableOperatorSubTree probeSubTree) {
+        if (probeSubTree == null) {
+            // We are optimizing a selection query. Search key is a constant. Return true if constant is on lhs.
+            return optFuncExpr.getFuncExpr().getArguments().get(0) == optFuncExpr.getConstantVal(0);
+        } else {
+            // We are optimizing a join query. Determine whether the feeding variable is on the lhs. 
+            return (optFuncExpr.getOperatorSubTree(0) == null || optFuncExpr.getOperatorSubTree(0) == probeSubTree);
+        }
     }
-
+    
     private ILogicalExpression createSelectCondition(List<Mutable<ILogicalExpression>> predList) {
         if (predList.size() > 1) {
             IFunctionInfo finfo = AsterixBuiltinFunctions.getAsterixFunctionInfo(AlgebricksBuiltinFunctions.AND);
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeJobGenParams.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeJobGenParams.java
index 9a735c9..8b7636b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeJobGenParams.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeJobGenParams.java
@@ -22,13 +22,14 @@
 
     protected boolean lowKeyInclusive;
     protected boolean highKeyInclusive;
+    protected boolean isEqCondition;
 
     public BTreeJobGenParams() {
         super();
     }
 
-    public BTreeJobGenParams(String indexName, IndexType indexType, String dataverseName, String datasetName, boolean retainInput,
-            boolean requiresBroadcast) {
+    public BTreeJobGenParams(String indexName, IndexType indexType, String dataverseName, String datasetName,
+            boolean retainInput, boolean requiresBroadcast) {
         super(indexName, indexType, dataverseName, datasetName, retainInput, requiresBroadcast);
     }
 
@@ -56,12 +57,17 @@
         this.highKeyInclusive = highKeyInclusive;
     }
 
+    public void setIsEqCondition(boolean isEqConsition) {
+        this.isEqCondition = isEqConsition;
+    }
+
     public void writeToFuncArgs(List<Mutable<ILogicalExpression>> funcArgs) {
         super.writeToFuncArgs(funcArgs);
         writeVarList(lowKeyVarList, funcArgs);
         writeVarList(highKeyVarList, funcArgs);
-        writeKeyInclusive(lowKeyInclusive, funcArgs);
-        writeKeyInclusive(highKeyInclusive, funcArgs);
+        writeBoolean(lowKeyInclusive, funcArgs);
+        writeBoolean(highKeyInclusive, funcArgs);
+        writeBoolean(isEqCondition, funcArgs);
     }
 
     public void readFromFuncArgs(List<Mutable<ILogicalExpression>> funcArgs) {
@@ -71,16 +77,24 @@
         highKeyVarList = new ArrayList<LogicalVariable>();
         int nextIndex = readVarList(funcArgs, index, lowKeyVarList);
         nextIndex = readVarList(funcArgs, nextIndex, highKeyVarList);
-        readKeyInclusives(funcArgs, nextIndex);
+        nextIndex = readKeyInclusives(funcArgs, nextIndex);
+        readIsEqCondition(funcArgs, nextIndex);
     }
 
-    private void readKeyInclusives(List<Mutable<ILogicalExpression>> funcArgs, int index) {
+    private int readKeyInclusives(List<Mutable<ILogicalExpression>> funcArgs, int index) {
         lowKeyInclusive = ((ConstantExpression) funcArgs.get(index).getValue()).getValue().isTrue();
+        // Read the next function argument at index + 1.
         highKeyInclusive = ((ConstantExpression) funcArgs.get(index + 1).getValue()).getValue().isTrue();
+        // We have read two of the function arguments, so the next index is at index + 2.
+        return index + 2;
     }
 
-    private void writeKeyInclusive(boolean keyInclusive, List<Mutable<ILogicalExpression>> funcArgs) {
-        ILogicalExpression keyExpr = keyInclusive ? ConstantExpression.TRUE : ConstantExpression.FALSE;
+    private void readIsEqCondition(List<Mutable<ILogicalExpression>> funcArgs, int index) {
+        isEqCondition = ((ConstantExpression) funcArgs.get(index).getValue()).getValue().isTrue();
+    }
+
+    private void writeBoolean(boolean val, List<Mutable<ILogicalExpression>> funcArgs) {
+        ILogicalExpression keyExpr = val ? ConstantExpression.TRUE : ConstantExpression.FALSE;
         funcArgs.add(new MutableObject<ILogicalExpression>(keyExpr));
     }
 
@@ -92,6 +106,10 @@
         return highKeyVarList;
     }
 
+    public boolean isEqCondition() {
+        return isEqCondition;
+    }
+
     public boolean isLowKeyInclusive() {
         return lowKeyInclusive;
     }