Disable join rewriting for OR predicates for Feeds.

Change-Id: Ief2fafdc9fcc6c905e44ede13439796894213e7a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1467
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/DisjunctivePredicateToJoinRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/DisjunctivePredicateToJoinRule.java
index 133c833..7b84e98 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/DisjunctivePredicateToJoinRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/DisjunctivePredicateToJoinRule.java
@@ -21,14 +21,14 @@
 import java.util.HashSet;
 import java.util.List;
 
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
+import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.om.base.AOrderedList;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -37,6 +37,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IndexedNLJoinExpressionAnnotation;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
@@ -56,7 +57,10 @@
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
-
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        if (metadataProvider.isBlockingOperatorDisabled()) {
+            return false;
+        }
         SelectOperator select;
         if ((select = asSelectOperator(opRef)) == null) {
             return false;
@@ -122,24 +126,27 @@
         context.computeAndSetTypeEnvironmentForOperator(ets);
 
         ILogicalExpression cExp = new ConstantExpression(new AsterixConstantValue(list));
-        Mutable<ILogicalExpression> mutCExp = new MutableObject<ILogicalExpression>(cExp);
+        Mutable<ILogicalExpression> mutCExp = new MutableObject<>(cExp);
         IFunctionInfo scanFctInfo = BuiltinFunctions
                 .getAsterixFunctionInfo(BuiltinFunctions.SCAN_COLLECTION);
         UnnestingFunctionCallExpression scanExp = new UnnestingFunctionCallExpression(scanFctInfo, mutCExp);
         LogicalVariable scanVar = context.newVar();
-        UnnestOperator unn = new UnnestOperator(scanVar, new MutableObject<ILogicalExpression>(scanExp));
-        unn.getInputs().add(new MutableObject<ILogicalOperator>(ets));
+        UnnestOperator unn = new UnnestOperator(scanVar, new MutableObject<>(scanExp));
+        unn.getInputs().add(new MutableObject<>(ets));
         context.computeAndSetTypeEnvironmentForOperator(unn);
 
         IFunctionInfo eqFctInfo = BuiltinFunctions.getAsterixFunctionInfo(AlgebricksBuiltinFunctions.EQ);
         AbstractFunctionCallExpression eqExp = new ScalarFunctionCallExpression(eqFctInfo);
-        eqExp.getArguments().add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(scanVar)));
-        eqExp.getArguments().add(new MutableObject<ILogicalExpression>(varEx.cloneExpression()));
+        eqExp.getArguments().add(new MutableObject<>(new VariableReferenceExpression(scanVar)));
+        eqExp.getArguments().add(new MutableObject<>(varEx.cloneExpression()));
         eqExp.getAnnotations().put(IndexedNLJoinExpressionAnnotation.INSTANCE,
                 IndexedNLJoinExpressionAnnotation.INSTANCE);
+        BroadcastExpressionAnnotation bcast = new BroadcastExpressionAnnotation();
+        bcast.setObject(BroadcastExpressionAnnotation.BroadcastSide.LEFT); // Broadcast the OR predicates branch.
+        eqExp.getAnnotations().put(BroadcastExpressionAnnotation.BROADCAST_ANNOTATION_KEY, bcast);
 
-        InnerJoinOperator jOp = new InnerJoinOperator(new MutableObject<ILogicalExpression>(eqExp));
-        jOp.getInputs().add(new MutableObject<ILogicalOperator>(unn));
+        InnerJoinOperator jOp = new InnerJoinOperator(new MutableObject<>(eqExp));
+        jOp.getInputs().add(new MutableObject<>(unn));
         jOp.getInputs().add(select.getInputs().get(0));
 
         opRef.setValue(jOp);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
index e95fc3e..fc8f77a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
@@ -18,14 +18,13 @@
  */
 package org.apache.asterix.optimizer.rules;
 
-import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class RemoveSortInFeedIngestionRule implements IAlgebraicRewriteRule {
@@ -44,31 +43,16 @@
             return false;
         }
 
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        if (!metadataProvider.isBlockingOperatorDisabled()) {
+            return false;
+        }
         AbstractLogicalOperator insertOp = op;
-        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        boolean isSourceAFeed = false;
-        while (descendantOp != null) {
-            if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
-                DataSource dataSource = (DataSource) ((DataSourceScanOperator) descendantOp).getDataSource();
-                if (dataSource.getDatasourceType() == DataSource.Type.FEED) {
-                    isSourceAFeed = true;
-                }
-                break;
-            }
-            if (descendantOp.getInputs().isEmpty()) {
-                break;
-            }
-            descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
+        AbstractLogicalOperator prevOp = (AbstractLogicalOperator) insertOp.getInputs().get(0).getValue();
+        if (prevOp.getOperatorTag() == LogicalOperatorTag.ORDER) {
+            insertOp.getInputs().set(0, prevOp.getInputs().get(0));
+            return true;
         }
-
-        if (isSourceAFeed) {
-            AbstractLogicalOperator prevOp = (AbstractLogicalOperator) insertOp.getInputs().get(0).getValue();
-            if (prevOp.getOperatorTag() == LogicalOperatorTag.ORDER) {
-                insertOp.getInputs().set(0, prevOp.getInputs().get(0));
-                return true;
-            }
-        }
-
         return false;
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d60ed37..bb65b74 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2171,6 +2171,7 @@
         boolean bActiveTxn = true;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        metadataProvider.disableBlockingOperator();
         boolean subscriberRegistered = false;
         IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
         FeedConnectionId feedConnId = null;
@@ -2410,6 +2411,7 @@
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         metadataProvider.setWriteTransaction(true);
+        metadataProvider.disableBlockingOperator();
         SubscribeFeedStatement bfs = (SubscribeFeedStatement) stmt;
         bfs.initialize(metadataProvider.getMetadataTxnContext());
 
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
index 1f686bd2..f4a21e2 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
@@ -8,18 +8,18 @@
               -- INSERT_DELETE  |PARTITIONED|
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- MATERIALIZE  |PARTITIONED|
-                    -- HASH_PARTITION_EXCHANGE [$$8]  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                       -- ASSIGN  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- HYBRID_HASH_JOIN [$$11][$$9]  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
-                                -- UNNEST  |UNPARTITIONED|
-                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$9]  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$9][$$11]  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                 -- ASSIGN  |PARTITIONED|
                                   -- STREAM_PROJECT  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                       -- DATASOURCE_SCAN  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- UNNEST  |UNPARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1005.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1005.plan
index de339c0..5675545 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1005.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1005.plan
@@ -17,11 +17,11 @@
                               }
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- STABLE_SORT [$$12(ASC)]  |PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$12]  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                   -- HYBRID_HASH_JOIN [$$1][$$13]  |PARTITIONED|
-                                    -- HASH_PARTITION_EXCHANGE [$$1]  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                       -- UNNEST  |PARTITIONED|
                                         -- STREAM_PROJECT  |PARTITIONED|
                                           -- ASSIGN  |PARTITIONED|
@@ -29,7 +29,7 @@
                                               -- DATASOURCE_SCAN  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                    -- HASH_PARTITION_EXCHANGE [$$13]  |PARTITIONED|
+                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
                                       -- ASSIGN  |UNPARTITIONED|
                                         -- UNNEST  |UNPARTITIONED|
                                           -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
index 173aae7..b7e11d3 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
@@ -37,21 +37,21 @@
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           -- HYBRID_HASH_JOIN [$$62][$$70]  |PARTITIONED|
-                                            -- HASH_PARTITION_EXCHANGE [$$62]  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                               -- ASSIGN  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- HYBRID_HASH_JOIN [$$65][$$11]  |PARTITIONED|
-                                                      -- HASH_PARTITION_EXCHANGE [$$65]  |PARTITIONED|
-                                                        -- UNNEST  |UNPARTITIONED|
-                                                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                      -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
+                                                    -- HYBRID_HASH_JOIN [$$11][$$65]  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                         -- STREAM_PROJECT  |PARTITIONED|
                                                           -- ASSIGN  |PARTITIONED|
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                               -- DATASOURCE_SCAN  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                        -- UNNEST  |UNPARTITIONED|
+                                                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
                                             -- HASH_PARTITION_EXCHANGE [$$70]  |PARTITIONED|
                                               -- ASSIGN  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan
index ad011cb..83fb064 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan
@@ -17,11 +17,8 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- STREAM_PROJECT  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- HYBRID_HASH_JOIN [$$93][$$86]  |PARTITIONED|
-                        -- HASH_PARTITION_EXCHANGE [$$93]  |PARTITIONED|
-                          -- UNNEST  |UNPARTITIONED|
-                            -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                        -- HASH_PARTITION_EXCHANGE [$$86]  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$86][$$93]  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                               -- HYBRID_HASH_JOIN [$$95][$$90]  |PARTITIONED|
@@ -41,3 +38,6 @@
                                         -- DATASOURCE_SCAN  |PARTITIONED|
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                          -- UNNEST  |UNPARTITIONED|
+                            -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan
index 27108be..87bf5bf 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan
@@ -17,11 +17,8 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- STREAM_PROJECT  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- HYBRID_HASH_JOIN [$$93][$$86]  |PARTITIONED|
-                        -- HASH_PARTITION_EXCHANGE [$$93]  |PARTITIONED|
-                          -- UNNEST  |UNPARTITIONED|
-                            -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                        -- HASH_PARTITION_EXCHANGE [$$86]  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$86][$$93]  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                               -- HYBRID_HASH_JOIN [$$94][$$90]  |PARTITIONED|
@@ -41,3 +38,6 @@
                                         -- DATASOURCE_SCAN  |PARTITIONED|
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                          -- UNNEST  |UNPARTITIONED|
+                            -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index bd991fc..7238db9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -186,6 +186,7 @@
     private JobId jobId;
     private Map<String, Integer> locks;
     private boolean isTemporaryDatasetWriteJob = true;
+    private boolean blockingOperatorDisabled = false;
 
     public MetadataProvider(Dataverse defaultDataverse) {
         this.defaultDataverse = defaultDataverse;
@@ -201,6 +202,14 @@
         this.config = config;
     }
 
+    public void disableBlockingOperator() {
+        blockingOperatorDisabled = true;
+    }
+
+    public boolean isBlockingOperatorDisabled() {
+        return blockingOperatorDisabled;
+    }
+
     @Override
     public Map<String, String> getConfig() {
         return config;