[NO ISSUE][ING] Allow external UDF to use runtime parallelism
- user model changes: no
- storage format changes: no
- interface changes:
Details:
1. Enable a UDF applied to a feed to use the runtime parallelism.
2. Fix DefaultNodeGroupDomain so that its nodes are kept in a MultiSet
rather than a List, for comparison purposes.
Change-Id: Ic3b54617be115f51b6a48b9a61581c26b5be8d9d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2398
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index dfb73ee..c41601b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -66,19 +67,9 @@
}
ExchangeOperator exchangeOp = new ExchangeOperator();
- INodeDomain domain = new INodeDomain() {
- @Override
- public boolean sameAs(INodeDomain domain) {
- return domain == this;
- }
+ INodeDomain runtimeDomain = feedDataSource.getComputationNodeDomain();
- @Override
- public Integer cardinality() {
- return feedDataSource.getComputeCardinality();
- }
- };
-
- exchangeOp.setPhysicalOperator(new RandomPartitionExchangePOperator(domain));
+ exchangeOp.setPhysicalOperator(new RandomPartitionExchangePOperator(runtimeDomain));
op.getInputs().get(0).setValue(exchangeOp);
exchangeOp.getInputs().add(new MutableObject<ILogicalOperator>(scanOp));
ExecutionMode em = ((AbstractLogicalOperator) scanOp).getExecutionMode();
@@ -88,8 +79,9 @@
AssignOperator assignOp = (AssignOperator) opRef.getValue();
AssignPOperator assignPhyOp = (AssignPOperator) assignOp.getPhysicalOperator();
- assignPhyOp.setCardinalityConstraint(domain.cardinality());
-
+ DefaultNodeGroupDomain computationNode = (DefaultNodeGroupDomain) runtimeDomain;
+ String[] nodes = computationNode.getNodes();
+ assignPhyOp.setLocationConstraint(nodes);
return true;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
index 05dd53e..51aca5d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -163,10 +163,10 @@
} else {
keyAccessScalarFunctionCallExpression = null;
}
- FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(), sourceFeed,
- aqlId, targetDataset, feedOutputType, metaType, pkTypes, keyAccessScalarFunctionCallExpression,
- sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
- context.getComputationNodeDomain(), feedConnection);
+ FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, aqlId, targetDataset, feedOutputType, metaType,
+ pkTypes, keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(),
+ FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), context.getComputationNodeDomain(),
+ feedConnection);
feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
return feedDataSource;
}
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
index caa317d..5141e09 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
@@ -18,7 +18,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- HYBRID_HASH_JOIN [$$30][$$31] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$30] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- STREAM_SELECT |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
index 883cd7a..0d46387 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
@@ -23,6 +23,8 @@
*/
use externallibtest;
+SET `compiler.parallelism` "5";
+
connect feed TweetFeed to dataset TweetsFeedIngest apply function `testlib#parseTweet`;
start feed TweetFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 325d23b..5c3ed56 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -56,17 +56,16 @@
private final FeedRuntimeType location;
private final String targetDataset;
private final String[] locations;
- private final int computeCardinality;
+ private final INodeDomain computationNodeDomain;
private final List<IAType> pkTypes;
private final List<ScalarFunctionCallExpression> keyAccessExpression;
private final FeedConnection feedConnection;
- public FeedDataSource(MetadataProvider metadataProvider, Feed feed, DataSourceId id, String targetDataset,
- IAType itemType, IAType metaType, List<IAType> pkTypes,
- List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId, FeedRuntimeType location,
- String[] locations, INodeDomain domain, FeedConnection feedConnection) throws AlgebricksException {
+ public FeedDataSource(Feed feed, DataSourceId id, String targetDataset, IAType itemType, IAType metaType,
+ List<IAType> pkTypes, List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId,
+ FeedRuntimeType location, String[] locations, INodeDomain domain, FeedConnection feedConnection)
+ throws AlgebricksException {
super(id, itemType, metaType, Type.FEED, domain);
- ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
this.feed = feed;
this.targetDataset = targetDataset;
this.sourceFeedId = sourceFeedId;
@@ -74,7 +73,7 @@
this.locations = locations;
this.pkTypes = pkTypes;
this.keyAccessExpression = keyAccessExpression;
- this.computeCardinality = appCtx.getClusterStateManager().getParticipantNodes().size();
+ this.computationNodeDomain = domain;
this.feedConnection = feedConnection;
initFeedDataSource();
}
@@ -119,10 +118,6 @@
}
}
- public int getComputeCardinality() {
- return computeCardinality;
- }
-
public List<IAType> getPkTypes() {
return pkTypes;
}
@@ -208,4 +203,8 @@
public FeedConnection getFeedConnection() {
return feedConnection;
}
+
+ public INodeDomain getComputationNodeDomain() {
+ return computationNodeDomain;
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index 9f7d2bd..669988c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -85,5 +85,9 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 995f6e0..ccd27f4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -22,7 +22,7 @@
import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -44,7 +44,7 @@
public class AssignPOperator extends AbstractPhysicalOperator {
private boolean flushFramesRapidly;
- private int cardinalityConstraint = 0;
+ private String[] locations;
@Override
public PhysicalOperatorTag getOperatorTag() {
@@ -93,10 +93,10 @@
// contribute one Asterix framewriter
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
- if (cardinalityConstraint > 0) {
- AlgebricksCountPartitionConstraint countConstraint =
- new AlgebricksCountPartitionConstraint(cardinalityConstraint);
- builder.contributeMicroOperator(assign, runtime, recDesc, countConstraint);
+ if (locations != null && locations.length > 0) {
+ AlgebricksAbsolutePartitionConstraint locationConstraint =
+ new AlgebricksAbsolutePartitionConstraint(locations);
+ builder.contributeMicroOperator(assign, runtime, recDesc, locationConstraint);
} else {
builder.contributeMicroOperator(assign, runtime, recDesc);
}
@@ -115,8 +115,8 @@
this.flushFramesRapidly = flushFramesRapidly;
}
- public void setCardinalityConstraint(int cardinality) {
- this.cardinalityConstraint = cardinality;
+ public void setLocationConstraint(String[] locations) {
+ this.locations = locations;
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
index 719c70e..a0ef64e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
@@ -18,16 +18,17 @@
*/
package org.apache.hyracks.algebricks.core.algebra.properties;
-import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.collections4.MultiSet;
+import org.apache.commons.collections4.multiset.HashMultiSet;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
public class DefaultNodeGroupDomain implements INodeDomain {
- private List<String> nodes = new ArrayList<>();
+ private MultiSet<String> nodes = new HashMultiSet<>();
public DefaultNodeGroupDomain(List<String> nodes) {
this.nodes.addAll(nodes);
@@ -67,4 +68,8 @@
public Integer cardinality() {
return nodes.size();
}
+
+ public String[] getNodes() {
+ return nodes.toArray(new String[0]);
+ }
}