[NO ISSUE][COMP] Refactor physical window operator
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Create a new physical operator (WindowStreamPOperator)
for window operators that do not require partition materialization
- Create AbstractWindowPOperator which is now a base
class for both physical window operators
- Rename WindowSimpleRuntime* to WindowStreamRuntime*
Change-Id: I3863fa3d298aef53d4098be9fc17b0451eb2c23e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3369
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index b26eaca..69eecfd 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -56,8 +56,10 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowStreamPOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
import org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
@@ -241,19 +243,24 @@
}
@Override
- public WindowPOperator createWindowPOperator(WindowOperator winOp) throws AlgebricksException {
- boolean partitionMaterialization = winOp.hasNestedPlans() || AnalysisUtil.hasFunctionWithProperty(winOp,
- BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
- boolean frameStartIsMonotonic = AnalysisUtil
- .isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(), winOp.getFrameValueExpressions());
- boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
- winOp.getFrameValueExpressions());
- boolean nestedTrivialAggregates = winOp.hasNestedPlans()
- && winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
-
- return new WindowPOperator(winOp.getPartitionVarList(), partitionMaterialization,
- winOp.getOrderColumnList(), frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates,
- context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ public AbstractWindowPOperator createWindowPOperator(WindowOperator winOp) throws AlgebricksException {
+ if (winOp.hasNestedPlans()) {
+ boolean frameStartIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(
+ winOp.getFrameStartExpressions(), winOp.getFrameValueExpressions());
+ boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(
+ winOp.getFrameEndExpressions(), winOp.getFrameValueExpressions());
+ boolean nestedTrivialAggregates =
+ winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
+ return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(),
+ frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates,
+ context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ } else if (AnalysisUtil.hasFunctionWithProperty(winOp,
+ BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION)) {
+ return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(), false, false, false,
+ context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ } else {
+ return new WindowStreamPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList());
+ }
}
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index 6cba1b1..a2c1c33 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -25,7 +25,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
public class OperatorResourcesComputer {
@@ -146,10 +145,10 @@
}
private long getWindowRequiredMemory(WindowOperator op) {
- WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
// memory budget configuration only applies to window operators that materialize partitions (non-streaming)
// streaming window operators only need 2 frames: output + (conservative estimate) last frame partition columns
- long memorySize = physOp.isPartitionMaterialization() ? windowMemorySize : 2 * frameSize;
+ long memorySize = op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.WINDOW_STREAM ? 2 * frameSize
+ : windowMemorySize;
return getOperatorRequiredMemory(op, memorySize);
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
index c0fca94..024a13e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
@@ -318,7 +318,7 @@
WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
visitInternal(op, true);
addOutputBuffer(op); // + previous frame
- if (physOp.isPartitionMaterialization()) {
+ if (physOp.getOperatorTag() == PhysicalOperatorTag.WINDOW) {
addOutputBuffer(op); // + run frame
}
return null;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
index e452d03..c12faf5 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
@@ -11,7 +11,7 @@
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- WINDOW |PARTITIONED|
+ -- WINDOW_STREAM |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$34(ASC), $$48(ASC)] |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$34] |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
index ab78ecc..a1e04ad 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |LOCAL|
-- ASSIGN |LOCAL|
-- WINDOW |LOCAL|
- -- WINDOW |LOCAL|
+ -- WINDOW_STREAM |LOCAL|
-- ONE_TO_ONE_EXCHANGE |LOCAL|
-- STABLE_SORT [$$m(ASC), $$t(ASC)] |LOCAL|
-- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
index 5b3d480..b111336 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
@@ -8,7 +8,7 @@
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- WINDOW |LOCAL|
+ -- WINDOW_STREAM |LOCAL|
-- ONE_TO_ONE_EXCHANGE |LOCAL|
-- STABLE_SORT [$$m(ASC), $$t(ASC)] |LOCAL|
-- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 8e1f77f..84d19c1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -78,5 +78,6 @@
UPDATE,
WRITE_RESULT,
INTERSECT,
- WINDOW
+ WINDOW,
+ WINDOW_STREAM
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
new file mode 100644
index 0000000..7065b70
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
+public abstract class AbstractWindowPOperator extends AbstractPhysicalOperator {
+
+ private final List<LogicalVariable> partitionColumns;
+
+ private final List<OrderColumn> orderColumns;
+
+ AbstractWindowPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) {
+ this.partitionColumns = partitionColumns;
+ this.orderColumns = orderColumns;
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
+ IPartitioningProperty pp;
+ switch (op.getExecutionMode()) {
+ case PARTITIONED:
+ pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns),
+ context.getComputationNodeDomain());
+ break;
+ case UNPARTITIONED:
+ pp = IPartitioningProperty.UNPARTITIONED;
+ break;
+ case LOCAL:
+ pp = null;
+ break;
+ default:
+ throw new IllegalStateException(op.getExecutionMode().name());
+ }
+
+ // require local order property [pc1, ... pcN, oc1, ... ocN]
+ // accounting for cases where there's an overlap between order and partition columns
+ // TODO replace with required local grouping on partition columns + local order on order columns
+ List<OrderColumn> lopColumns = new ArrayList<>();
+ ListSet<LogicalVariable> pcVars = new ListSet<>();
+ pcVars.addAll(partitionColumns);
+ for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
+ OrderColumn oc = orderColumns.get(oIdx);
+ LogicalVariable ocVar = oc.getColumn();
+ if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) {
+ throw new AlgebricksException(ErrorCode.HYRACKS, ErrorCode.UNSUPPORTED_WINDOW_SPEC,
+ op.getSourceLocation(), String.valueOf(partitionColumns), String.valueOf(orderColumns));
+ }
+ lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
+ }
+ int pIdx = 0;
+ for (LogicalVariable pColumn : pcVars) {
+ lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC));
+ }
+ List<ILocalStructuralProperty> localProps =
+ lopColumns.isEmpty() ? null : Collections.singletonList(new LocalOrderProperty(lopColumns));
+
+ return new PhysicalRequirements(
+ new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) },
+ IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ WindowOperator winOp = (WindowOperator) op;
+
+ int[] partitionColumnsList = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns);
+
+ IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op);
+ IBinaryComparatorFactory[] partitionComparatorFactories =
+ JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns, opTypeEnv, context);
+
+ //TODO not all functions need order comparators
+ IBinaryComparatorFactory[] orderComparatorFactories =
+ JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv, context);
+
+ IVariableTypeEnvironment inputTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue());
+ IExpressionRuntimeProvider exprRuntimeProvider = context.getExpressionRuntimeProvider();
+ IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider = context.getBinaryComparatorFactoryProvider();
+
+ List<Mutable<ILogicalExpression>> frameStartExprList = winOp.getFrameStartExpressions();
+ IScalarEvaluatorFactory[] frameStartExprEvals =
+ createEvaluatorFactories(frameStartExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+ List<Mutable<ILogicalExpression>> frameStartValidationExprList = winOp.getFrameStartValidationExpressions();
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals = createEvaluatorFactories(frameStartValidationExprList,
+ inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+ List<Mutable<ILogicalExpression>> frameEndExprList = winOp.getFrameEndExpressions();
+ IScalarEvaluatorFactory[] frameEndExprEvals =
+ createEvaluatorFactories(frameEndExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+ List<Mutable<ILogicalExpression>> frameEndValidationExprList = winOp.getFrameEndValidationExpressions();
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals = createEvaluatorFactories(frameEndValidationExprList,
+ inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList =
+ winOp.getFrameValueExpressions();
+ Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameValueExprEvalsAndComparators =
+ createEvaluatorAndComparatorFactories(frameValueExprList, Pair::getSecond, Pair::getFirst, inputSchemas,
+ inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
+
+ List<Mutable<ILogicalExpression>> frameExcludeExprList = winOp.getFrameExcludeExpressions();
+ Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameExcludeExprEvalsAndComparators =
+ createEvaluatorAndComparatorFactories(frameExcludeExprList, v -> v, v -> OrderOperator.ASC_ORDER,
+ inputSchemas, inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
+
+ IScalarEvaluatorFactory frameOffsetExprEval = null;
+ ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue();
+ if (frameOffsetExpr != null) {
+ frameOffsetExprEval =
+ exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv, inputSchemas, context);
+ }
+
+ int[] projectionColumnsExcludingSubplans = JobGenHelper.projectAllVariables(opSchema);
+
+ int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables());
+
+ List<Mutable<ILogicalExpression>> runningAggExprs = winOp.getExpressions();
+ int runningAggExprCount = runningAggExprs.size();
+ IRunningAggregateEvaluatorFactory[] runningAggFactories =
+ new IRunningAggregateEvaluatorFactory[runningAggExprCount];
+ for (int i = 0; i < runningAggExprCount; i++) {
+ StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) runningAggExprs.get(i).getValue();
+ runningAggFactories[i] = exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv,
+ inputSchemas, context);
+ }
+
+ int nestedAggOutSchemaSize = 0;
+ WindowAggregatorDescriptorFactory nestedAggFactory = null;
+ if (winOp.hasNestedPlans()) {
+ int opSchemaSizePreSubplans = opSchema.getSize();
+ AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], winOp, opSchema, context);
+ nestedAggOutSchemaSize = opSchema.getSize() - opSchemaSizePreSubplans;
+ nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans);
+ nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
+ }
+
+ AbstractWindowRuntimeFactory runtime = createRuntimeFactory(winOp, partitionColumnsList,
+ partitionComparatorFactories, orderComparatorFactories, frameValueExprEvalsAndComparators.first,
+ frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartValidationExprEvals,
+ frameEndExprEvals, frameEndValidationExprEvals, frameExcludeExprEvalsAndComparators.first,
+ frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, projectionColumnsExcludingSubplans,
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, context);
+ runtime.setSourceLocation(winOp.getSourceLocation());
+
+ // contribute one Asterix framewriter
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv, opSchema, context);
+ builder.contributeMicroOperator(winOp, runtime, recDesc);
+ // and contribute one edge from its child
+ ILogicalOperator src = winOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, winOp, 0);
+ }
+
+ protected abstract AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp,
+ int[] partitionColumnsList, IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
+ IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals,
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals,
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval,
+ int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context);
+
+ @Override
+ public boolean isMicroOperator() {
+ return true;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
+
+ private IScalarEvaluatorFactory[] createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment inputTypeEnv,
+ IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext context) throws AlgebricksException {
+ if (exprList.isEmpty()) {
+ return null;
+ }
+ int ln = exprList.size();
+ IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
+ for (int i = 0; i < ln; i++) {
+ ILogicalExpression expr = exprList.get(i).getValue();
+ evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context);
+ }
+ return evals;
+ }
+
+ private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> createEvaluatorAndComparatorFactories(
+ List<T> exprList, Function<T, Mutable<ILogicalExpression>> exprGetter,
+ Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider exprRuntimeProvider,
+ IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider, JobGenContext context)
+ throws AlgebricksException {
+ if (exprList.isEmpty()) {
+ return new Pair<>(null, null);
+ }
+ int ln = exprList.size();
+ IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
+ IBinaryComparatorFactory[] comparators = new IBinaryComparatorFactory[ln];
+ for (int i = 0; i < ln; i++) {
+ T exprObj = exprList.get(i);
+ ILogicalExpression expr = exprGetter.apply(exprObj).getValue();
+ OrderOperator.IOrder order = orderGetter.apply(exprObj);
+ evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context);
+ comparators[i] = binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr),
+ order.getKind() == OrderOperator.IOrder.OrderKind.ASC);
+ }
+ return new Pair<>(evals, comparators);
+ }
+
+ private static boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) {
+ for (int i = startIdx, ln = ocList.size(); i < ln; i++) {
+ if (varSet.contains(ocList.get(i).getColumn())) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 8bd4610..23853e8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -19,42 +19,13 @@
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
-import java.util.function.Function;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.ListSet;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
-import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
@@ -63,18 +34,9 @@
import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRunningRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansUnboundedRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.win.WindowSimpleRuntimeFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-public class WindowPOperator extends AbstractPhysicalOperator {
-
- private final List<LogicalVariable> partitionColumns;
-
- private final boolean partitionMaterialization;
-
- private final List<OrderColumn> orderColumns;
+public final class WindowPOperator extends AbstractWindowPOperator {
private final boolean frameStartIsMonotonic;
@@ -85,12 +47,10 @@
// The maximum number of in-memory frames that this operator can use.
private final int memSizeInFrames;
- public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
- List<OrderColumn> orderColumns, boolean frameStartIsMonotonic, boolean frameEndIsMonotonic,
- boolean nestedTrivialAggregates, int memSizeInFrames) {
- this.partitionColumns = partitionColumns;
- this.partitionMaterialization = partitionMaterialization;
- this.orderColumns = orderColumns;
+ public WindowPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns,
+ boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, boolean nestedTrivialAggregates,
+ int memSizeInFrames) {
+ super(partitionColumns, orderColumns);
this.frameStartIsMonotonic = frameStartIsMonotonic;
this.frameEndIsMonotonic = frameEndIsMonotonic;
this.nestedTrivialAggregates = nestedTrivialAggregates;
@@ -103,245 +63,55 @@
}
@Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
- IPartitioningProperty pp;
- switch (op.getExecutionMode()) {
- case PARTITIONED:
- pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns),
- context.getComputationNodeDomain());
- break;
- case UNPARTITIONED:
- pp = IPartitioningProperty.UNPARTITIONED;
- break;
- case LOCAL:
- pp = null;
- break;
- default:
- throw new IllegalStateException(op.getExecutionMode().name());
- }
+ protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
+ IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
+ IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals,
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals,
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval,
+ int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context) {
- // require local order property [pc1, ... pcN, oc1, ... ocN]
- // accounting for cases where there's an overlap between order and partition columns
- // TODO replace with required local grouping on partition columns + local order on order columns
- List<OrderColumn> lopColumns = new ArrayList<>();
- ListSet<LogicalVariable> pcVars = new ListSet<>();
- pcVars.addAll(partitionColumns);
- for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
- OrderColumn oc = orderColumns.get(oIdx);
- LogicalVariable ocVar = oc.getColumn();
- if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) {
- throw new AlgebricksException(ErrorCode.HYRACKS, ErrorCode.UNSUPPORTED_WINDOW_SPEC,
- op.getSourceLocation(), String.valueOf(partitionColumns), String.valueOf(orderColumns));
- }
- lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
- }
- int pIdx = 0;
- for (LogicalVariable pColumn : pcVars) {
- lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC));
- }
- List<ILocalStructuralProperty> localProps =
- lopColumns.isEmpty() ? null : Collections.singletonList(new LocalOrderProperty(lopColumns));
-
- return new PhysicalRequirements(
- new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) },
- IPartitioningRequirementsCoordinator.NO_COORDINATION);
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
- }
-
- @Override
- public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
- IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- WindowOperator winOp = (WindowOperator) op;
-
- int[] partitionColumnsList = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns);
-
- IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op);
- IBinaryComparatorFactory[] partitionComparatorFactories =
- JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns, opTypeEnv, context);
-
- //TODO not all functions need order comparators
- IBinaryComparatorFactory[] orderComparatorFactories =
- JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv, context);
-
- IVariableTypeEnvironment inputTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue());
- IExpressionRuntimeProvider exprRuntimeProvider = context.getExpressionRuntimeProvider();
- IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider = context.getBinaryComparatorFactoryProvider();
-
- List<Mutable<ILogicalExpression>> frameStartExprList = winOp.getFrameStartExpressions();
- IScalarEvaluatorFactory[] frameStartExprEvals =
- createEvaluatorFactories(frameStartExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
- List<Mutable<ILogicalExpression>> frameStartValidationExprList = winOp.getFrameStartValidationExpressions();
- IScalarEvaluatorFactory[] frameStartValidationExprEvals = createEvaluatorFactories(frameStartValidationExprList,
- inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
- List<Mutable<ILogicalExpression>> frameEndExprList = winOp.getFrameEndExpressions();
- IScalarEvaluatorFactory[] frameEndExprEvals =
- createEvaluatorFactories(frameEndExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
- List<Mutable<ILogicalExpression>> frameEndValidationExprList = winOp.getFrameEndValidationExpressions();
- IScalarEvaluatorFactory[] frameEndValidationExprEvals = createEvaluatorFactories(frameEndValidationExprList,
- inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
- List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList =
- winOp.getFrameValueExpressions();
- Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameValueExprEvalsAndComparators =
- createEvaluatorAndComparatorFactories(frameValueExprList, Pair::getSecond, Pair::getFirst, inputSchemas,
- inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
-
- List<Mutable<ILogicalExpression>> frameExcludeExprList = winOp.getFrameExcludeExpressions();
- Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameExcludeExprEvalsAndComparators =
- createEvaluatorAndComparatorFactories(frameExcludeExprList, v -> v, v -> OrderOperator.ASC_ORDER,
- inputSchemas, inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
-
- IScalarEvaluatorFactory frameOffsetExprEval = null;
- ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue();
- if (frameOffsetExpr != null) {
- frameOffsetExprEval =
- exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv, inputSchemas, context);
- }
-
- int[] projectionColumnsExcludingSubplans = JobGenHelper.projectAllVariables(opSchema);
-
- int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables());
-
- List<Mutable<ILogicalExpression>> runningAggExprs = winOp.getExpressions();
- int runningAggExprCount = runningAggExprs.size();
- IRunningAggregateEvaluatorFactory[] runningAggFactories =
- new IRunningAggregateEvaluatorFactory[runningAggExprCount];
- for (int i = 0; i < runningAggExprCount; i++) {
- StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) runningAggExprs.get(i).getValue();
- runningAggFactories[i] = exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv,
- inputSchemas, context);
- }
-
- AbstractWindowRuntimeFactory runtime = null;
- if (winOp.hasNestedPlans()) {
- int opSchemaSizePreSubplans = opSchema.getSize();
- AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], winOp, opSchema, context);
- int aggregatorOutputSchemaSize = opSchema.getSize() - opSchemaSizePreSubplans;
- WindowAggregatorDescriptorFactory nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans);
- nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
-
- int frameMaxObjects = winOp.getFrameMaxObjects();
-
- // special cases
- if (frameStartExprList.isEmpty() && frameExcludeExprList.isEmpty() && frameOffsetExpr == null) {
- if (frameEndExprList.isEmpty()) {
- // special case #1: frame == whole partition, no exclusions, no offset
- runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
- partitionComparatorFactories, orderComparatorFactories, frameMaxObjects,
- projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
- } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
- // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset,
- // trivial aggregate subplan ( aggregate + nts )
- nestedAggFactory.setPartialOutputEnabled(true);
- runtime = new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList,
- partitionComparatorFactories, orderComparatorFactories,
- frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second,
- frameEndExprEvals, frameEndValidationExprEvals, frameMaxObjects,
- context.getBinaryBooleanInspectorFactory(), projectionColumnsExcludingSubplans,
- runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory,
- memSizeInFrames);
- }
- }
- // default case
- if (runtime == null) {
- runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
- orderComparatorFactories, frameValueExprEvalsAndComparators.first,
- frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartValidationExprEvals,
- frameStartIsMonotonic, frameEndExprEvals, frameEndValidationExprEvals,
- frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(),
- frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, frameMaxObjects,
- context.getBinaryBooleanInspectorFactory(), context.getBinaryIntegerInspectorFactory(),
- projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
- }
- } else if (partitionMaterialization) {
- runtime = new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+ // special cases
+ if (!winOp.hasNestedPlans()) {
+ return new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
runningAggFactories, memSizeInFrames);
- } else {
- runtime = new WindowSimpleRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
- orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
- runningAggFactories);
}
- runtime.setSourceLocation(winOp.getSourceLocation());
- // contribute one Asterix framewriter
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv, opSchema, context);
- builder.contributeMicroOperator(winOp, runtime, recDesc);
- // and contribute one edge from its child
- ILogicalOperator src = winOp.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, winOp, 0);
- }
-
- @Override
- public boolean isMicroOperator() {
- return true;
- }
-
- @Override
- public boolean expensiveThanMaterialization() {
- return true;
- }
-
- public boolean isPartitionMaterialization() {
- return partitionMaterialization;
- }
-
- private IScalarEvaluatorFactory[] createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList,
- IOperatorSchema[] inputSchemas, IVariableTypeEnvironment inputTypeEnv,
- IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext context) throws AlgebricksException {
- if (exprList.isEmpty()) {
- return null;
- }
- int ln = exprList.size();
- IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
- for (int i = 0; i < ln; i++) {
- ILogicalExpression expr = exprList.get(i).getValue();
- evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context);
- }
- return evals;
- }
-
- private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> createEvaluatorAndComparatorFactories(
- List<T> exprList, Function<T, Mutable<ILogicalExpression>> exprGetter,
- Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider exprRuntimeProvider,
- IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider, JobGenContext context)
- throws AlgebricksException {
- if (exprList.isEmpty()) {
- return new Pair<>(null, null);
- }
- int ln = exprList.size();
- IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
- IBinaryComparatorFactory[] comparators = new IBinaryComparatorFactory[ln];
- for (int i = 0; i < ln; i++) {
- T exprObj = exprList.get(i);
- ILogicalExpression expr = exprGetter.apply(exprObj).getValue();
- OrderOperator.IOrder order = orderGetter.apply(exprObj);
- evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context);
- comparators[i] = binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr),
- order.getKind() == OrderOperator.IOrder.OrderKind.ASC);
- }
- return new Pair<>(evals, comparators);
- }
-
- private boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) {
- for (int i = startIdx, ln = ocList.size(); i < ln; i++) {
- if (varSet.contains(ocList.get(i).getColumn())) {
- return true;
+ boolean hasFrameStart = frameStartExprEvals != null && frameStartExprEvals.length > 0;
+ boolean hasFrameEnd = frameEndExprEvals != null && frameEndExprEvals.length > 0;
+ boolean hasFrameExclude = frameExcludeExprEvals != null && frameExcludeExprEvals.length > 0;
+ boolean hasFrameOffset = frameOffsetExprEval != null;
+ if (!hasFrameStart && !hasFrameExclude && !hasFrameOffset) {
+ if (!hasFrameEnd) {
+ // special case #1: frame == whole partition, no exclusions, no offset
+ return new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+ orderComparatorFactories, winOp.getFrameMaxObjects(), projectionColumnsExcludingSubplans,
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory,
+ memSizeInFrames);
+ } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
+ // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset,
+ // trivial aggregate subplan ( aggregate + nts )
+ nestedAggFactory.setPartialOutputEnabled(true);
+ return new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+ orderComparatorFactories, frameValueExprEvals, frameValueComparatorFactories, frameEndExprEvals,
+ frameEndValidationExprEvals, winOp.getFrameMaxObjects(),
+ context.getBinaryBooleanInspectorFactory(), projectionColumnsExcludingSubplans,
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory,
+ memSizeInFrames);
}
}
- return false;
+
+ // default case
+ return new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+ orderComparatorFactories, frameValueExprEvals, frameValueComparatorFactories, frameStartExprEvals,
+ frameStartValidationExprEvals, frameStartIsMonotonic, frameEndExprEvals, frameEndValidationExprEvals,
+ frameExcludeExprEvals, winOp.getFrameExcludeNegationStartIdx(), frameExcludeComparatorFactories,
+ frameOffsetExprEval, winOp.getFrameMaxObjects(), context.getBinaryBooleanInspectorFactory(),
+ context.getBinaryIntegerInspectorFactory(), projectionColumnsExcludingSubplans, runningAggOutColumns,
+ runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, memSizeInFrames);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
new file mode 100644
index 0000000..33b47ec
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.WindowStreamRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public final class WindowStreamPOperator extends AbstractWindowPOperator {
+
+ public WindowStreamPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) {
+ super(partitionColumns, orderColumns);
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.WINDOW_STREAM;
+ }
+
+ @Override
+ protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
+ IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
+ IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals,
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals,
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval,
+ int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context) {
+ return new WindowStreamRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+ orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
+ runningAggFactories);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index f127898..e6cdc28 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -79,6 +79,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.BulkloadPOperator;
@@ -468,8 +469,8 @@
return createWindowPOperator(op);
}
- protected WindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException {
- return new WindowPOperator(op.getPartitionVarList(), true, op.getOrderColumnList(), false, false, false,
+ protected AbstractWindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException {
+ return new WindowPOperator(op.getPartitionVarList(), op.getOrderColumnList(), false, false, false,
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
similarity index 94%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
index f7f1a25..d23d4e7 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
@@ -30,9 +30,9 @@
/**
* Runtime for window operators that evaluates running aggregates without partition materialization.
*/
-class WindowSimplePushRuntime extends AbstractWindowPushRuntime {
+class WindowStreamPushRuntime extends AbstractWindowPushRuntime {
- WindowSimplePushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+ WindowStreamPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx,
SourceLocation sourceLoc) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
similarity index 82%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
index 2d1cdde..be368a9 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
@@ -27,13 +27,14 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
/**
- * Runtime factory for window operators that evaluates running aggregates without partition materialization.
+ * Runtime factory for window operators that evaluates running aggregates in a streaming fashion
+ * (without partition materialization).
*/
-public class WindowSimpleRuntimeFactory extends AbstractWindowRuntimeFactory {
+public class WindowStreamRuntimeFactory extends AbstractWindowRuntimeFactory {
private static final long serialVersionUID = 1L;
- public WindowSimpleRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+ public WindowStreamRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans,
int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
@@ -42,13 +43,13 @@
@Override
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
- return new WindowSimplePushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
+ return new WindowStreamPushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
projectionList, runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
}
@Override
public String toString() {
- return "window (" + Arrays.toString(partitionColumns) + ") " + Arrays.toString(runningAggOutColumns) + " := "
- + Arrays.toString(runningAggFactories);
+ return "window-stream (" + Arrays.toString(partitionColumns) + ") " + Arrays.toString(runningAggOutColumns)
+ + " := " + Arrays.toString(runningAggFactories);
}
}