Disable join rewriting for OR predicates for Feeds.
Change-Id: Ief2fafdc9fcc6c905e44ede13439796894213e7a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1467
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/DisjunctivePredicateToJoinRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/DisjunctivePredicateToJoinRule.java
index 133c833..7b84e98 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/DisjunctivePredicateToJoinRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/DisjunctivePredicateToJoinRule.java
@@ -21,14 +21,14 @@
import java.util.HashSet;
import java.util.List;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
+import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.IAType;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -37,6 +37,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IndexedNLJoinExpressionAnnotation;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
@@ -56,7 +57,10 @@
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
-
+ MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+ if (metadataProvider.isBlockingOperatorDisabled()) {
+ return false;
+ }
SelectOperator select;
if ((select = asSelectOperator(opRef)) == null) {
return false;
@@ -122,24 +126,27 @@
context.computeAndSetTypeEnvironmentForOperator(ets);
ILogicalExpression cExp = new ConstantExpression(new AsterixConstantValue(list));
- Mutable<ILogicalExpression> mutCExp = new MutableObject<ILogicalExpression>(cExp);
+ Mutable<ILogicalExpression> mutCExp = new MutableObject<>(cExp);
IFunctionInfo scanFctInfo = BuiltinFunctions
.getAsterixFunctionInfo(BuiltinFunctions.SCAN_COLLECTION);
UnnestingFunctionCallExpression scanExp = new UnnestingFunctionCallExpression(scanFctInfo, mutCExp);
LogicalVariable scanVar = context.newVar();
- UnnestOperator unn = new UnnestOperator(scanVar, new MutableObject<ILogicalExpression>(scanExp));
- unn.getInputs().add(new MutableObject<ILogicalOperator>(ets));
+ UnnestOperator unn = new UnnestOperator(scanVar, new MutableObject<>(scanExp));
+ unn.getInputs().add(new MutableObject<>(ets));
context.computeAndSetTypeEnvironmentForOperator(unn);
IFunctionInfo eqFctInfo = BuiltinFunctions.getAsterixFunctionInfo(AlgebricksBuiltinFunctions.EQ);
AbstractFunctionCallExpression eqExp = new ScalarFunctionCallExpression(eqFctInfo);
- eqExp.getArguments().add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(scanVar)));
- eqExp.getArguments().add(new MutableObject<ILogicalExpression>(varEx.cloneExpression()));
+ eqExp.getArguments().add(new MutableObject<>(new VariableReferenceExpression(scanVar)));
+ eqExp.getArguments().add(new MutableObject<>(varEx.cloneExpression()));
eqExp.getAnnotations().put(IndexedNLJoinExpressionAnnotation.INSTANCE,
IndexedNLJoinExpressionAnnotation.INSTANCE);
+ BroadcastExpressionAnnotation bcast = new BroadcastExpressionAnnotation();
+ bcast.setObject(BroadcastExpressionAnnotation.BroadcastSide.LEFT); // Broadcast the OR predicates branch.
+ eqExp.getAnnotations().put(BroadcastExpressionAnnotation.BROADCAST_ANNOTATION_KEY, bcast);
- InnerJoinOperator jOp = new InnerJoinOperator(new MutableObject<ILogicalExpression>(eqExp));
- jOp.getInputs().add(new MutableObject<ILogicalOperator>(unn));
+ InnerJoinOperator jOp = new InnerJoinOperator(new MutableObject<>(eqExp));
+ jOp.getInputs().add(new MutableObject<>(unn));
jOp.getInputs().add(select.getInputs().get(0));
opRef.setValue(jOp);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
index e95fc3e..fc8f77a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
@@ -18,14 +18,13 @@
*/
package org.apache.asterix.optimizer.rules;
-import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public class RemoveSortInFeedIngestionRule implements IAlgebraicRewriteRule {
@@ -44,31 +43,16 @@
return false;
}
+ MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+ if (!metadataProvider.isBlockingOperatorDisabled()) {
+ return false;
+ }
AbstractLogicalOperator insertOp = op;
- AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- boolean isSourceAFeed = false;
- while (descendantOp != null) {
- if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
- DataSource dataSource = (DataSource) ((DataSourceScanOperator) descendantOp).getDataSource();
- if (dataSource.getDatasourceType() == DataSource.Type.FEED) {
- isSourceAFeed = true;
- }
- break;
- }
- if (descendantOp.getInputs().isEmpty()) {
- break;
- }
- descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
+ AbstractLogicalOperator prevOp = (AbstractLogicalOperator) insertOp.getInputs().get(0).getValue();
+ if (prevOp.getOperatorTag() == LogicalOperatorTag.ORDER) {
+ insertOp.getInputs().set(0, prevOp.getInputs().get(0));
+ return true;
}
-
- if (isSourceAFeed) {
- AbstractLogicalOperator prevOp = (AbstractLogicalOperator) insertOp.getInputs().get(0).getValue();
- if (prevOp.getOperatorTag() == LogicalOperatorTag.ORDER) {
- insertOp.getInputs().set(0, prevOp.getInputs().get(0));
- return true;
- }
- }
-
return false;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d60ed37..bb65b74 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2171,6 +2171,7 @@
boolean bActiveTxn = true;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ metadataProvider.disableBlockingOperator();
boolean subscriberRegistered = false;
IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
FeedConnectionId feedConnId = null;
@@ -2410,6 +2411,7 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
metadataProvider.setWriteTransaction(true);
+ metadataProvider.disableBlockingOperator();
SubscribeFeedStatement bfs = (SubscribeFeedStatement) stmt;
bfs.initialize(metadataProvider.getMetadataTxnContext());
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
index 1f686bd2..f4a21e2 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-3.plan
@@ -8,18 +8,18 @@
-- INSERT_DELETE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- MATERIALIZE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$8] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$11][$$9] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$11] |PARTITIONED|
- -- UNNEST |UNPARTITIONED|
- -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$9] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$9][$$11] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- UNNEST |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1005.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1005.plan
index de339c0..5675545 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1005.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1005.plan
@@ -17,11 +17,11 @@
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$12(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$12] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- HYBRID_HASH_JOIN [$$1][$$13] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$1] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- UNNEST |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
@@ -29,7 +29,7 @@
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$13] |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- UNNEST |UNPARTITIONED|
-- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
index 173aae7..b7e11d3 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
@@ -37,21 +37,21 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- HYBRID_HASH_JOIN [$$62][$$70] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$62] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$65][$$11] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$65] |PARTITIONED|
- -- UNNEST |UNPARTITIONED|
- -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$11] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$11][$$65] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- UNNEST |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$70] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan
index ad011cb..83fb064 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan
@@ -17,11 +17,8 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$93][$$86] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$93] |PARTITIONED|
- -- UNNEST |UNPARTITIONED|
- -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$86] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$86][$$93] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- HYBRID_HASH_JOIN [$$95][$$90] |PARTITIONED|
@@ -41,3 +38,6 @@
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- UNNEST |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan
index 27108be..87bf5bf 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan
@@ -17,11 +17,8 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$93][$$86] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$93] |PARTITIONED|
- -- UNNEST |UNPARTITIONED|
- -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$86] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$86][$$93] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- HYBRID_HASH_JOIN [$$94][$$90] |PARTITIONED|
@@ -41,3 +38,6 @@
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- UNNEST |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index bd991fc..7238db9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -186,6 +186,7 @@
private JobId jobId;
private Map<String, Integer> locks;
private boolean isTemporaryDatasetWriteJob = true;
+ private boolean blockingOperatorDisabled = false;
public MetadataProvider(Dataverse defaultDataverse) {
this.defaultDataverse = defaultDataverse;
@@ -201,6 +202,14 @@
this.config = config;
}
+ public void disableBlockingOperator() {
+ blockingOperatorDisabled = true;
+ }
+
+ public boolean isBlockingOperatorDisabled() {
+ return blockingOperatorDisabled;
+ }
+
@Override
public Map<String, String> getConfig() {
return config;