[ASTERIXDB-3287][COMP] Introduce write operator
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Add the logical and physical write operators. We utilize
the old (and deprecated) operators.
Change-Id: Ib4fca256c6bdfa4b83890c285f509d476f130a54
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17891
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index bf86cc5..29984e6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -38,6 +38,7 @@
import org.apache.asterix.optimizer.rules.CheckFilterExpressionTypeRule;
import org.apache.asterix.optimizer.rules.CheckFullParallelSortRule;
import org.apache.asterix.optimizer.rules.CheckInsertUpsertReturningRule;
+import org.apache.asterix.optimizer.rules.CleanupWriteOperatorRule;
import org.apache.asterix.optimizer.rules.ConstantFoldingRule;
import org.apache.asterix.optimizer.rules.CountVarToCountOneRule;
import org.apache.asterix.optimizer.rules.DisjunctivePredicateToJoinRule;
@@ -357,6 +358,7 @@
// RemoveRedundantBooleanExpressionsInJoinRule has to run first to probably eliminate the need for
// introducing an assign operator in ExtractSimilarVariablesInJoinRule
planCleanupRules.add(new ExtractRedundantVariablesInJoinRule());
+ planCleanupRules.add(new CleanupWriteOperatorRule());
// Needs to invoke ByNameToByIndexFieldAccessRule as the last logical optimization rule because
// some rules can push a FieldAccessByName to a place where the name it tries to access is in the closed part.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java
new file mode 100644
index 0000000..459a29b
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class CleanupWriteOperatorRule implements IAlgebraicRewriteRule {
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ ILogicalOperator op = opRef.getValue();
+ if (op.getOperatorTag() != LogicalOperatorTag.WRITE) {
+ return false;
+ }
+
+ WriteOperator writeOp = (WriteOperator) op;
+ ILogicalExpression pathExpr = writeOp.getPathExpression().getValue();
+ if (pathExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ return false;
+ }
+
+ boolean changed = false;
+ List<Mutable<ILogicalExpression>> partitionExprs = writeOp.getPartitionExpressions();
+ if (!partitionExprs.isEmpty()) {
+ // Useless partition expressions due to having a constant path expression
+ partitionExprs.clear();
+ writeOp.getOrderExpressions().clear();
+ changed = true;
+ }
+
+ return changed;
+ }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
index 256d481..343ff5d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -19,145 +19,20 @@
package org.apache.asterix.optimizer.rules;
-import java.io.DataInputStream;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.WarningCollector;
-import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
-import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
-import org.apache.asterix.dataflow.data.nontagged.NullWriterFactory;
-import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryHashFunctionFamilyProvider;
-import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.om.base.ADouble;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.constants.AsterixConstantValue;
-import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.AbstractCollectionType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
-import org.apache.asterix.runtime.base.UnnestingPositionWriterFactory;
-import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.asterix.optimizer.rules.visitor.ConstantFoldingVisitor;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ExpressionRuntimeProvider;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
-import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
-import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import org.apache.hyracks.util.LogRedactionUtil;
-
-import com.google.common.collect.ImmutableMap;
public class ConstantFoldingRule implements IAlgebraicRewriteRule {
- private final ConstantFoldingVisitor cfv = new ConstantFoldingVisitor();
- private final JobGenContext jobGenCtx;
-
- private static final Map<FunctionIdentifier, IAObject> FUNC_ID_TO_CONSTANT = ImmutableMap
- .of(BuiltinFunctions.NUMERIC_E, new ADouble(Math.E), BuiltinFunctions.NUMERIC_PI, new ADouble(Math.PI));
-
- /**
- * Throws exceptions in substituteProducedVariable, setVarType, and one getVarType method.
- */
- private static final IVariableTypeEnvironment _emptyTypeEnv = new IVariableTypeEnvironment() {
-
- @Override
- public boolean substituteProducedVariable(LogicalVariable v1, LogicalVariable v2) {
- throw new IllegalStateException();
- }
-
- @Override
- public void setVarType(LogicalVariable var, Object type) {
- throw new IllegalStateException();
- }
-
- @Override
- public Object getVarType(LogicalVariable var, List<LogicalVariable> nonMissableVariables,
- List<List<LogicalVariable>> correlatedMissableVariableLists, List<LogicalVariable> nonNullableVariables,
- List<List<LogicalVariable>> correlatedNullableVariableLists) {
- throw new IllegalStateException();
- }
-
- @Override
- public Object getVarType(LogicalVariable var) {
- throw new IllegalStateException();
- }
-
- @Override
- public Object getType(ILogicalExpression expr) throws AlgebricksException {
- return ExpressionTypeComputer.INSTANCE.getType(expr, null, this);
- }
- };
-
- private static final IOperatorSchema[] _emptySchemas = new IOperatorSchema[] {};
+ private final ConstantFoldingVisitor cfv;
public ConstantFoldingRule(ICcApplicationContext appCtx) {
- MetadataProvider metadataProvider = MetadataProvider.createWithDefaultNamespace(appCtx);
- jobGenCtx = new JobGenContext(null, metadataProvider, appCtx, SerializerDeserializerProvider.INSTANCE,
- BinaryHashFunctionFactoryProvider.INSTANCE, BinaryHashFunctionFamilyProvider.INSTANCE,
- BinaryComparatorFactoryProvider.INSTANCE, TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY,
- BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, PrinterBasedWriterFactory.INSTANCE,
- ResultSerializerFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, NullWriterFactory.INSTANCE,
- UnnestingPositionWriterFactory.INSTANCE, null,
- new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())),
- ExpressionTypeComputer.INSTANCE, null, null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null,
- NoOpWarningCollector.INSTANCE, 0, new PhysicalOptimizationConfig());
+ cfv = new ConstantFoldingVisitor(appCtx);
}
@Override
@@ -176,332 +51,4 @@
cfv.reset(context);
return op.acceptExpressionTransform(cfv);
}
-
- private class ConstantFoldingVisitor implements ILogicalExpressionVisitor<Pair<Boolean, ILogicalExpression>, Void>,
- ILogicalExpressionReferenceTransform, IEvaluatorContext {
-
- private final IPointable p = VoidPointable.FACTORY.createPointable();
- private final ByteBufferInputStream bbis = new ByteBufferInputStream();
- private final DataInputStream dis = new DataInputStream(bbis);
- private final WarningCollector warningCollector = new WarningCollector();
- private IOptimizationContext optContext;
- private IServiceContext serviceContext;
-
- private void reset(IOptimizationContext context) {
- optContext = context;
- serviceContext =
- ((MetadataProvider) context.getMetadataProvider()).getApplicationContext().getServiceContext();
- }
-
- @Override
- public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
- AbstractLogicalExpression expr = (AbstractLogicalExpression) exprRef.getValue();
- Pair<Boolean, ILogicalExpression> newExpression = expr.accept(this, null);
- if (newExpression.first) {
- exprRef.setValue(newExpression.second);
- }
- return newExpression.first;
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression> visitConstantExpression(ConstantExpression expr, Void arg) {
- return new Pair<>(false, expr);
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression> visitVariableReferenceExpression(VariableReferenceExpression expr,
- Void arg) {
- return new Pair<>(false, expr);
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression> visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr,
- Void arg) throws AlgebricksException {
- boolean changed = constantFoldArgs(expr, arg);
- List<Mutable<ILogicalExpression>> argList = expr.getArguments();
- int argConstantCount = countConstantArgs(argList);
- FunctionIdentifier fid = expr.getFunctionIdentifier();
- if (argConstantCount != argList.size()) {
- if (argConstantCount > 0 && (BuiltinFunctions.OR.equals(fid) || BuiltinFunctions.AND.equals(fid))) {
- if (foldOrAndArgs(expr)) {
- ILogicalExpression changedExpr =
- expr.getArguments().size() == 1 ? expr.getArguments().get(0).getValue() : expr;
- return new Pair<>(true, changedExpr);
- }
- }
- return new Pair<>(changed, expr);
- }
-
- if (!expr.isFunctional() || !canConstantFold(expr)) {
- return new Pair<>(changed, expr);
- }
-
- try {
- if (BuiltinFunctions.FIELD_ACCESS_BY_NAME.equals(fid)) {
- IAType argType = (IAType) _emptyTypeEnv.getType(expr.getArguments().get(0).getValue());
- if (argType.getTypeTag() == ATypeTag.OBJECT) {
- ARecordType rt = (ARecordType) argType;
- String str = ConstantExpressionUtil.getStringConstant(expr.getArguments().get(1).getValue());
- int k = rt.getFieldIndex(str);
- if (k >= 0) {
- // wait for the ByNameToByIndex rule to apply
- return new Pair<>(changed, expr);
- }
- }
- }
- IAObject c = FUNC_ID_TO_CONSTANT.get(fid);
- if (c != null) {
- ConstantExpression constantExpression = new ConstantExpression(new AsterixConstantValue(c));
- constantExpression.setSourceLocation(expr.getSourceLocation());
- return new Pair<>(true, constantExpression);
- }
-
- IScalarEvaluatorFactory fact = jobGenCtx.getExpressionRuntimeProvider().createEvaluatorFactory(expr,
- _emptyTypeEnv, _emptySchemas, jobGenCtx);
-
- warningCollector.clear();
- IScalarEvaluator eval = fact.createScalarEvaluator(this);
- eval.evaluate(null, p);
- IAType returnType = (IAType) _emptyTypeEnv.getType(expr);
- ATypeTag runtimeType = PointableHelper.getTypeTag(p);
- if (runtimeType.isDerivedType()) {
- returnType = TypeComputeUtils.getActualType(returnType);
- } else {
- returnType = TypeTagUtil.getBuiltinTypeByTag(runtimeType);
- }
- @SuppressWarnings("rawtypes")
- ISerializerDeserializer serde =
- jobGenCtx.getSerializerDeserializerProvider().getSerializerDeserializer(returnType);
- bbis.setByteBuffer(ByteBuffer.wrap(p.getByteArray(), p.getStartOffset(), p.getLength()), 0);
- IAObject o = (IAObject) serde.deserialize(dis);
- warningCollector.getWarnings(optContext.getWarningCollector());
- ConstantExpression constantExpression = new ConstantExpression(new AsterixConstantValue(o));
- constantExpression.setSourceLocation(expr.getSourceLocation());
- return new Pair<>(true, constantExpression);
- } catch (HyracksDataException | AlgebricksException e) {
- if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
- AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Exception caught at constant folding: " + e, e);
- }
- return new Pair<>(false, null);
- }
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression> visitAggregateFunctionCallExpression(
- AggregateFunctionCallExpression expr, Void arg) throws AlgebricksException {
- boolean changed = constantFoldArgs(expr, arg);
- return new Pair<>(changed, expr);
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression> visitStatefulFunctionCallExpression(
- StatefulFunctionCallExpression expr, Void arg) throws AlgebricksException {
- boolean changed = constantFoldArgs(expr, arg);
- return new Pair<>(changed, expr);
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression> visitUnnestingFunctionCallExpression(
- UnnestingFunctionCallExpression expr, Void arg) throws AlgebricksException {
- boolean changed = constantFoldArgs(expr, arg);
- return new Pair<>(changed, expr);
- }
-
- private boolean constantFoldArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
- return expr.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)
- ? foldRecordArgs(expr, arg) : foldFunctionArgs(expr, arg);
- }
-
- private boolean foldFunctionArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
- boolean changed = false;
- for (Mutable<ILogicalExpression> exprArgRef : expr.getArguments()) {
- changed |= foldArg(exprArgRef, arg);
- }
- return changed;
- }
-
- private boolean foldRecordArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
- if (expr.getArguments().size() % 2 != 0) {
- String functionName = expr.getFunctionIdentifier().getName();
- throw CompilationException.create(ErrorCode.COMPILATION_INVALID_NUM_OF_ARGS, expr.getSourceLocation(),
- functionName);
- }
- boolean changed = false;
- Iterator<Mutable<ILogicalExpression>> iterator = expr.getArguments().iterator();
- int fieldNameIdx = 0;
- while (iterator.hasNext()) {
- Mutable<ILogicalExpression> fieldNameExprRef = iterator.next();
- Pair<Boolean, ILogicalExpression> fieldNameExpr = fieldNameExprRef.getValue().accept(this, arg);
- boolean isDuplicate = false;
- if (fieldNameExpr.first) {
- String fieldName = ConstantExpressionUtil.getStringConstant(fieldNameExpr.second);
- if (fieldName != null) {
- isDuplicate = isDuplicateField(fieldName, fieldNameIdx, expr.getArguments());
- }
- if (isDuplicate) {
- IWarningCollector warningCollector = optContext.getWarningCollector();
- if (warningCollector.shouldWarn()) {
- warningCollector.warn(Warning.of(fieldNameExpr.second.getSourceLocation(),
- ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, LogRedactionUtil.userData(fieldName)));
- }
- iterator.remove();
- iterator.next();
- iterator.remove();
- } else {
- fieldNameExprRef.setValue(fieldNameExpr.second);
- }
- changed = true;
- }
- if (!isDuplicate) {
- Mutable<ILogicalExpression> fieldValue = iterator.next();
- changed |= foldArg(fieldValue, arg);
- fieldNameIdx += 2;
- }
- }
- return changed;
- }
-
- private boolean isDuplicateField(String fName, int fIdx, List<Mutable<ILogicalExpression>> args) {
- for (int i = 0, size = args.size(); i < size; i += 2) {
- if (i != fIdx && fName.equals(ConstantExpressionUtil.getStringConstant(args.get(i).getValue()))) {
- return true;
- }
- }
- return false;
- }
-
- private boolean foldArg(Mutable<ILogicalExpression> exprArgRef, Void arg) throws AlgebricksException {
- Pair<Boolean, ILogicalExpression> newExpr = exprArgRef.getValue().accept(this, arg);
- if (newExpr.first) {
- exprArgRef.setValue(newExpr.second);
- return true;
- }
- return false;
- }
-
- private int countConstantArgs(List<Mutable<ILogicalExpression>> argList) {
- int n = 0;
- for (Mutable<ILogicalExpression> r : argList) {
- if (r.getValue().getExpressionTag() == LogicalExpressionTag.CONSTANT) {
- n++;
- }
- }
- return n;
- }
-
- private boolean canConstantFold(ScalarFunctionCallExpression function) throws AlgebricksException {
- // skip external functions because they're not available at compile time (on CC)
- IFunctionInfo fi = function.getFunctionInfo();
- if (fi.isExternal()) {
- return false;
- }
- IAType returnType = (IAType) _emptyTypeEnv.getType(function);
- // skip all functions that would produce records/arrays/multisets (derived types) in their open format
- // this is because constant folding them will make them closed (currently)
- if (function.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
- if (returnType.getTypeTag() != ATypeTag.OBJECT || ((ARecordType) returnType).isOpen()) {
- return false;
- }
- }
- return canConstantFoldType(returnType);
- }
-
- private boolean canConstantFoldType(IAType returnType) {
- ATypeTag tag = returnType.getTypeTag();
- if (tag == ATypeTag.ANY) {
- // if the function is to return a record (or derived data), that record would (should) be an open record
- return false;
- } else if (tag == ATypeTag.OBJECT) {
- ARecordType recordType = (ARecordType) returnType;
- if (recordType.isOpen()) {
- return false;
- }
- IAType[] fieldTypes = recordType.getFieldTypes();
- for (int i = 0; i < fieldTypes.length; i++) {
- if (!canConstantFoldType(fieldTypes[i])) {
- return false;
- }
- }
- } else if (tag.isListType()) {
- AbstractCollectionType listType = (AbstractCollectionType) returnType;
- return canConstantFoldType(listType.getItemType());
- } else if (tag == ATypeTag.UNION) {
- return canConstantFoldType(((AUnionType) returnType).getActualType());
- }
- return true;
- }
-
- private boolean foldOrAndArgs(ScalarFunctionCallExpression expr) {
- // or(true,x,y) -> true; or(false,x,y) -> or(x,y)
- boolean changed = false;
- List<Mutable<ILogicalExpression>> argList = expr.getArguments();
- Iterator<Mutable<ILogicalExpression>> argIter = argList.iterator();
- Mutable<ILogicalExpression> argFalse = null;
- while (argIter.hasNext()) {
- Mutable<ILogicalExpression> argExprRef = argIter.next();
- ILogicalExpression argExpr = argExprRef.getValue();
- if (argExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
- continue;
- }
-
- ConstantExpression cExpr = (ConstantExpression) argExpr;
- IAlgebricksConstantValue cValue = cExpr.getValue();
- FunctionIdentifier fid = expr.getFunctionIdentifier();
-
- if (replaceAndReturn(cValue, fid)) {
- // or(true,x,y) -> true;
- // and(false, x, y) -> false
- argList.clear();
- argList.add(argExprRef);
- return true;
- } else if (removeAndContinue(cValue, fid)) {
- // or(false, x, y) -> or(x, y)
- // and(true, x, y) -> and(x, y)
- // remove 'false' (or 'true') from arg list, but save the expression.
- argFalse = argExprRef;
- argIter.remove();
- changed = true;
- }
- }
- if (argList.isEmpty() && argFalse != null) {
- argList.add(argFalse);
- }
- return changed;
- }
-
- private boolean replaceAndReturn(IAlgebricksConstantValue cValue, FunctionIdentifier fid) {
- if (BuiltinFunctions.OR.equals(fid)) {
- return cValue.isTrue();
- } else {
- // BuiltinFunctions.AND
- return cValue.isFalse();
- }
- }
-
- private boolean removeAndContinue(IAlgebricksConstantValue cValue, FunctionIdentifier fid) {
- if (BuiltinFunctions.OR.equals(fid)) {
- return cValue.isFalse();
- } else {
- // BuiltinFunctions.AND
- return cValue.isTrue();
- }
- }
-
- // IEvaluatorContext
-
- @Override
- public IServiceContext getServiceContext() {
- return serviceContext;
- }
-
- @Override
- public IHyracksTaskContext getTaskContext() {
- return null;
- }
-
- @Override
- public IWarningCollector getWarningCollector() {
- return warningCollector;
- }
- }
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java
new file mode 100644
index 0000000..0feb6c5
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.visitor;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningCollector;
+import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
+import org.apache.asterix.dataflow.data.nontagged.NullWriterFactory;
+import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFamilyProvider;
+import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.runtime.base.UnnestingPositionWriterFactory;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.util.LogRedactionUtil;
+
+import com.google.common.collect.ImmutableMap;
+
+public class ConstantFoldingVisitor implements ILogicalExpressionVisitor<Pair<Boolean, ILogicalExpression>, Void>,
+ ILogicalExpressionReferenceTransform, IEvaluatorContext {
+
+ /**
+ * Throws exceptions in substituteProducedVariable, setVarType, and one getVarType method.
+ */
+ private static final IVariableTypeEnvironment _emptyTypeEnv = new IVariableTypeEnvironment() {
+
+ @Override
+ public boolean substituteProducedVariable(LogicalVariable v1, LogicalVariable v2) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public void setVarType(LogicalVariable var, Object type) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public Object getVarType(LogicalVariable var, List<LogicalVariable> nonMissableVariables,
+ List<List<LogicalVariable>> correlatedMissableVariableLists, List<LogicalVariable> nonNullableVariables,
+ List<List<LogicalVariable>> correlatedNullableVariableLists) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public Object getVarType(LogicalVariable var) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public Object getType(ILogicalExpression expr) throws AlgebricksException {
+ return ExpressionTypeComputer.INSTANCE.getType(expr, null, this);
+ }
+ };
+
+ private static final IOperatorSchema[] _emptySchemas = new IOperatorSchema[] {};
+ private static final Map<FunctionIdentifier, IAObject> FUNC_ID_TO_CONSTANT = ImmutableMap
+ .of(BuiltinFunctions.NUMERIC_E, new ADouble(Math.E), BuiltinFunctions.NUMERIC_PI, new ADouble(Math.PI));
+ private final JobGenContext jobGenCtx;
+ private final IPointable p = VoidPointable.FACTORY.createPointable();
+ private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private final DataInputStream dis = new DataInputStream(bbis);
+ private final WarningCollector warningCollector = new WarningCollector();
+ private IOptimizationContext optContext;
+ private IServiceContext serviceContext;
+
+ public ConstantFoldingVisitor(ICcApplicationContext appCtx) {
+ MetadataProvider metadataProvider = MetadataProvider.createWithDefaultNamespace(appCtx);
+ jobGenCtx = new JobGenContext(null, metadataProvider, appCtx, SerializerDeserializerProvider.INSTANCE,
+ BinaryHashFunctionFactoryProvider.INSTANCE, BinaryHashFunctionFamilyProvider.INSTANCE,
+ BinaryComparatorFactoryProvider.INSTANCE, TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY,
+ BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, PrinterBasedWriterFactory.INSTANCE,
+ ResultSerializerFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, NullWriterFactory.INSTANCE,
+ UnnestingPositionWriterFactory.INSTANCE, null,
+ new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())),
+ ExpressionTypeComputer.INSTANCE, null, null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null,
+ NoOpWarningCollector.INSTANCE, 0, new PhysicalOptimizationConfig());
+ }
+
+ public void reset(IOptimizationContext context) {
+ optContext = context;
+ serviceContext = ((MetadataProvider) context.getMetadataProvider()).getApplicationContext().getServiceContext();
+ }
+
+ @Override
+ public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+ AbstractLogicalExpression expr = (AbstractLogicalExpression) exprRef.getValue();
+ Pair<Boolean, ILogicalExpression> newExpression = expr.accept(this, null);
+ if (newExpression.first) {
+ exprRef.setValue(newExpression.second);
+ }
+ return newExpression.first;
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression> visitConstantExpression(ConstantExpression expr, Void arg) {
+ return new Pair<>(false, expr);
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression> visitVariableReferenceExpression(VariableReferenceExpression expr,
+ Void arg) {
+ return new Pair<>(false, expr);
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression> visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr,
+ Void arg) throws AlgebricksException {
+ boolean changed = constantFoldArgs(expr, arg);
+ List<Mutable<ILogicalExpression>> argList = expr.getArguments();
+ int argConstantCount = countConstantArgs(argList);
+ FunctionIdentifier fid = expr.getFunctionIdentifier();
+ if (argConstantCount != argList.size()) {
+ if (argConstantCount > 0 && (BuiltinFunctions.OR.equals(fid) || BuiltinFunctions.AND.equals(fid))) {
+ if (foldOrAndArgs(expr)) {
+ ILogicalExpression changedExpr =
+ expr.getArguments().size() == 1 ? expr.getArguments().get(0).getValue() : expr;
+ return new Pair<>(true, changedExpr);
+ }
+ }
+ return new Pair<>(changed, expr);
+ }
+
+ if (!expr.isFunctional() || !canConstantFold(expr)) {
+ return new Pair<>(changed, expr);
+ }
+
+ try {
+ if (BuiltinFunctions.FIELD_ACCESS_BY_NAME.equals(fid)) {
+ IAType argType = (IAType) _emptyTypeEnv.getType(expr.getArguments().get(0).getValue());
+ if (argType.getTypeTag() == ATypeTag.OBJECT) {
+ ARecordType rt = (ARecordType) argType;
+ String str = ConstantExpressionUtil.getStringConstant(expr.getArguments().get(1).getValue());
+ int k = rt.getFieldIndex(str);
+ if (k >= 0) {
+ // wait for the ByNameToByIndex rule to apply
+ return new Pair<>(changed, expr);
+ }
+ }
+ }
+ IAObject c = FUNC_ID_TO_CONSTANT.get(fid);
+ if (c != null) {
+ ConstantExpression constantExpression = new ConstantExpression(new AsterixConstantValue(c));
+ constantExpression.setSourceLocation(expr.getSourceLocation());
+ return new Pair<>(true, constantExpression);
+ }
+
+ IScalarEvaluatorFactory fact = jobGenCtx.getExpressionRuntimeProvider().createEvaluatorFactory(expr,
+ _emptyTypeEnv, _emptySchemas, jobGenCtx);
+
+ warningCollector.clear();
+ IScalarEvaluator eval = fact.createScalarEvaluator(this);
+ eval.evaluate(null, p);
+ IAType returnType = (IAType) _emptyTypeEnv.getType(expr);
+ ATypeTag runtimeType = PointableHelper.getTypeTag(p);
+ if (runtimeType.isDerivedType()) {
+ returnType = TypeComputeUtils.getActualType(returnType);
+ } else {
+ returnType = TypeTagUtil.getBuiltinTypeByTag(runtimeType);
+ }
+ @SuppressWarnings("rawtypes")
+ ISerializerDeserializer serde =
+ jobGenCtx.getSerializerDeserializerProvider().getSerializerDeserializer(returnType);
+ bbis.setByteBuffer(ByteBuffer.wrap(p.getByteArray(), p.getStartOffset(), p.getLength()), 0);
+ IAObject o = (IAObject) serde.deserialize(dis);
+ warningCollector.getWarnings(optContext.getWarningCollector());
+ ConstantExpression constantExpression = new ConstantExpression(new AsterixConstantValue(o));
+ constantExpression.setSourceLocation(expr.getSourceLocation());
+ return new Pair<>(true, constantExpression);
+ } catch (HyracksDataException | AlgebricksException e) {
+ if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Exception caught at constant folding: " + e, e);
+ }
+ return new Pair<>(false, null);
+ }
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression> visitAggregateFunctionCallExpression(AggregateFunctionCallExpression expr,
+ Void arg) throws AlgebricksException {
+ boolean changed = constantFoldArgs(expr, arg);
+ return new Pair<>(changed, expr);
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression> visitStatefulFunctionCallExpression(StatefulFunctionCallExpression expr,
+ Void arg) throws AlgebricksException {
+ boolean changed = constantFoldArgs(expr, arg);
+ return new Pair<>(changed, expr);
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression> visitUnnestingFunctionCallExpression(UnnestingFunctionCallExpression expr,
+ Void arg) throws AlgebricksException {
+ boolean changed = constantFoldArgs(expr, arg);
+ return new Pair<>(changed, expr);
+ }
+
+ private boolean constantFoldArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
+ return expr.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR) ? foldRecordArgs(expr, arg)
+ : foldFunctionArgs(expr, arg);
+ }
+
+ private boolean foldFunctionArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
+ boolean changed = false;
+ for (Mutable<ILogicalExpression> exprArgRef : expr.getArguments()) {
+ changed |= foldArg(exprArgRef, arg);
+ }
+ return changed;
+ }
+
+ private boolean foldRecordArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
+ if (expr.getArguments().size() % 2 != 0) {
+ String functionName = expr.getFunctionIdentifier().getName();
+ throw CompilationException.create(ErrorCode.COMPILATION_INVALID_NUM_OF_ARGS, expr.getSourceLocation(),
+ functionName);
+ }
+ boolean changed = false;
+ Iterator<Mutable<ILogicalExpression>> iterator = expr.getArguments().iterator();
+ int fieldNameIdx = 0;
+ while (iterator.hasNext()) {
+ Mutable<ILogicalExpression> fieldNameExprRef = iterator.next();
+ Pair<Boolean, ILogicalExpression> fieldNameExpr = fieldNameExprRef.getValue().accept(this, arg);
+ boolean isDuplicate = false;
+ if (fieldNameExpr.first) {
+ String fieldName = ConstantExpressionUtil.getStringConstant(fieldNameExpr.second);
+ if (fieldName != null) {
+ isDuplicate = isDuplicateField(fieldName, fieldNameIdx, expr.getArguments());
+ }
+ if (isDuplicate) {
+ IWarningCollector warningCollector = optContext.getWarningCollector();
+ if (warningCollector.shouldWarn()) {
+ warningCollector.warn(Warning.of(fieldNameExpr.second.getSourceLocation(),
+ ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, LogRedactionUtil.userData(fieldName)));
+ }
+ iterator.remove();
+ iterator.next();
+ iterator.remove();
+ } else {
+ fieldNameExprRef.setValue(fieldNameExpr.second);
+ }
+ changed = true;
+ }
+ if (!isDuplicate) {
+ Mutable<ILogicalExpression> fieldValue = iterator.next();
+ changed |= foldArg(fieldValue, arg);
+ fieldNameIdx += 2;
+ }
+ }
+ return changed;
+ }
+
+ private boolean isDuplicateField(String fName, int fIdx, List<Mutable<ILogicalExpression>> args) {
+ for (int i = 0, size = args.size(); i < size; i += 2) {
+ if (i != fIdx && fName.equals(ConstantExpressionUtil.getStringConstant(args.get(i).getValue()))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean foldArg(Mutable<ILogicalExpression> exprArgRef, Void arg) throws AlgebricksException {
+ Pair<Boolean, ILogicalExpression> newExpr = exprArgRef.getValue().accept(this, arg);
+ if (newExpr.first) {
+ exprArgRef.setValue(newExpr.second);
+ return true;
+ }
+ return false;
+ }
+
+ private int countConstantArgs(List<Mutable<ILogicalExpression>> argList) {
+ int n = 0;
+ for (Mutable<ILogicalExpression> r : argList) {
+ if (r.getValue().getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+ n++;
+ }
+ }
+ return n;
+ }
+
+ private boolean canConstantFold(ScalarFunctionCallExpression function) throws AlgebricksException {
+ // skip external functions because they're not available at compile time (on CC)
+ IFunctionInfo fi = function.getFunctionInfo();
+ if (fi.isExternal()) {
+ return false;
+ }
+ IAType returnType = (IAType) _emptyTypeEnv.getType(function);
+ // skip all functions that would produce records/arrays/multisets (derived types) in their open format
+ // this is because constant folding them will make them closed (currently)
+ if (function.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
+ if (returnType.getTypeTag() != ATypeTag.OBJECT || ((ARecordType) returnType).isOpen()) {
+ return false;
+ }
+ }
+ return canConstantFoldType(returnType);
+ }
+
+ private boolean canConstantFoldType(IAType returnType) {
+ ATypeTag tag = returnType.getTypeTag();
+ if (tag == ATypeTag.ANY) {
+ // if the function is to return a record (or derived data), that record would (should) be an open record
+ return false;
+ } else if (tag == ATypeTag.OBJECT) {
+ ARecordType recordType = (ARecordType) returnType;
+ if (recordType.isOpen()) {
+ return false;
+ }
+ IAType[] fieldTypes = recordType.getFieldTypes();
+ for (int i = 0; i < fieldTypes.length; i++) {
+ if (!canConstantFoldType(fieldTypes[i])) {
+ return false;
+ }
+ }
+ } else if (tag.isListType()) {
+ AbstractCollectionType listType = (AbstractCollectionType) returnType;
+ return canConstantFoldType(listType.getItemType());
+ } else if (tag == ATypeTag.UNION) {
+ return canConstantFoldType(((AUnionType) returnType).getActualType());
+ }
+ return true;
+ }
+
+ private boolean foldOrAndArgs(ScalarFunctionCallExpression expr) {
+ // or(true,x,y) -> true; or(false,x,y) -> or(x,y)
+ boolean changed = false;
+ List<Mutable<ILogicalExpression>> argList = expr.getArguments();
+ Iterator<Mutable<ILogicalExpression>> argIter = argList.iterator();
+ Mutable<ILogicalExpression> argFalse = null;
+ while (argIter.hasNext()) {
+ Mutable<ILogicalExpression> argExprRef = argIter.next();
+ ILogicalExpression argExpr = argExprRef.getValue();
+ if (argExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ continue;
+ }
+
+ ConstantExpression cExpr = (ConstantExpression) argExpr;
+ IAlgebricksConstantValue cValue = cExpr.getValue();
+ FunctionIdentifier fid = expr.getFunctionIdentifier();
+
+ if (replaceAndReturn(cValue, fid)) {
+ // or(true,x,y) -> true;
+ // and(false, x, y) -> false
+ argList.clear();
+ argList.add(argExprRef);
+ return true;
+ } else if (removeAndContinue(cValue, fid)) {
+ // or(false, x, y) -> or(x, y)
+ // and(true, x, y) -> and(x, y)
+ // remove 'false' (or 'true') from arg list, but save the expression.
+ argFalse = argExprRef;
+ argIter.remove();
+ changed = true;
+ }
+ }
+ if (argList.isEmpty() && argFalse != null) {
+ argList.add(argFalse);
+ }
+ return changed;
+ }
+
+ private boolean replaceAndReturn(IAlgebricksConstantValue cValue, FunctionIdentifier fid) {
+ if (BuiltinFunctions.OR.equals(fid)) {
+ return cValue.isTrue();
+ } else {
+ // BuiltinFunctions.AND
+ return cValue.isFalse();
+ }
+ }
+
+ private boolean removeAndContinue(IAlgebricksConstantValue cValue, FunctionIdentifier fid) {
+ if (BuiltinFunctions.OR.equals(fid)) {
+ return cValue.isFalse();
+ } else {
+ // BuiltinFunctions.AND
+ return cValue.isTrue();
+ }
+ }
+
+ // IEvaluatorContext
+
+ @Override
+ public IServiceContext getServiceContext() {
+ return serviceContext;
+ }
+
+ @Override
+ public IHyracksTaskContext getTaskContext() {
+ return null;
+ }
+
+ @Override
+ public IWarningCollector getWarningCollector() {
+ return warningCollector;
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index fc63d89..28bce7e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -22,7 +22,6 @@
import static org.apache.asterix.common.metadata.MetadataConstants.METADATA_OBJECT_NAME_INVALID_CHARS;
import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
-import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -108,6 +107,7 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Quadruple;
import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -123,6 +123,7 @@
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -134,8 +135,8 @@
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
@@ -740,19 +741,14 @@
}
@Override
- public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
- RecordDescriptor inputDesc) {
- FileSplitDataSink fsds = (FileSplitDataSink) sink;
- FileSplitSinkId fssi = fsds.getId();
- FileSplit fs = fssi.getFileSplit();
- File outFile = new File(fs.getPath());
- String nodeId = fs.getNodeName();
+ public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(int sourceColumn,
+ int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+ IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression staticPathExpr,
+ SourceLocation pathSourceLocation, IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType)
+ throws AlgebricksException {
- SinkWriterRuntimeFactory runtime =
- new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, writerFactory, inputDesc);
- AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
- return new Pair<>(runtime, apc);
+ // TODO implement
+ throw new NotImplementedException();
}
@Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
new file mode 100644
index 0000000..753ac54
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
+
+public class WriteDataSink implements IWriteDataSink {
+ private final String adapterName;
+ private final Map<String, String> configuration;
+
+ public WriteDataSink(String adapterName, Map<String, String> configuration) {
+ this.adapterName = adapterName;
+ this.configuration = configuration;
+ }
+
+ private WriteDataSink(WriteDataSink writeDataSink) {
+ this.adapterName = writeDataSink.getAdapterName();
+ this.configuration = new HashMap<>(writeDataSink.configuration);
+ }
+
+ @Override
+ public final String getAdapterName() {
+ return adapterName;
+ }
+
+ @Override
+ public final Map<String, String> getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public IWriteDataSink createCopy() {
+ return new WriteDataSink(this);
+ }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
index cc05183..72f659d 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
@@ -19,6 +19,7 @@
package org.apache.asterix.om.utils;
import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ADouble;
import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.base.AOrderedList;
@@ -32,6 +33,7 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.api.exceptions.SourceLocation;
public class ConstantExpressionUtil {
@@ -115,4 +117,22 @@
return expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL
? getStringArgument((AbstractFunctionCallExpression) expr, index) : null;
}
+
+ public static ConstantExpression create(String value, SourceLocation sourceLocation) {
+ return createExpression(new AString(value), sourceLocation);
+ }
+
+ public static ConstantExpression create(long value, SourceLocation sourceLocation) {
+ return createExpression(new AInt64(value), sourceLocation);
+ }
+
+ public static ConstantExpression create(double value, SourceLocation sourceLocation) {
+ return createExpression(new ADouble(value), sourceLocation);
+ }
+
+ private static ConstantExpression createExpression(IAObject value, SourceLocation sourceLocation) {
+ ConstantExpression constExpr = new ConstantExpression(new AsterixConstantValue(value));
+ constExpr.setSourceLocation(sourceLocation);
+ return constExpr;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4f3d8e4..2072dee 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -36,8 +36,11 @@
import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
@@ -56,8 +59,10 @@
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
IProjectionFiltrationInfo projectionFiltrationInfo) throws AlgebricksException;
- Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink, int[] printColumns,
- IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, RecordDescriptor inputDesc)
+ Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(int sourceColumn,
+ int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+ IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression staticPathExpr,
+ SourceLocation pathSourceLocation, IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType)
throws AlgebricksException;
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink, int[] printColumns,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java
new file mode 100644
index 0000000..fa7a55e
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.metadata;
+
+import java.util.Map;
+
+public interface IWriteDataSink {
+ String getAdapterName();
+
+ Map<String, String> getConfiguration();
+
+ IWriteDataSink createCopy();
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
index 61b7796..7eef90e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
@@ -23,31 +23,77 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
public class WriteOperator extends AbstractLogicalOperator {
- private List<Mutable<ILogicalExpression>> expressions;
- private IDataSink dataSink;
+ private final Mutable<ILogicalExpression> sourceExpression;
+ private final Mutable<ILogicalExpression> pathExpression;
+ private final List<Mutable<ILogicalExpression>> partitionExpressions;
+ private final List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions;
+ private final IWriteDataSink writeDataSink;
- public WriteOperator(List<Mutable<ILogicalExpression>> expressions, IDataSink dataSink) {
- this.expressions = expressions;
- this.dataSink = dataSink;
+ public WriteOperator(Mutable<ILogicalExpression> sourceExpression, Mutable<ILogicalExpression> pathExpression,
+ List<Mutable<ILogicalExpression>> partitionExpressions,
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
+ IWriteDataSink writeDataSink) {
+ this.sourceExpression = sourceExpression;
+ this.pathExpression = pathExpression;
+ this.partitionExpressions = partitionExpressions;
+ this.orderExpressions = orderExpressions;
+ this.writeDataSink = writeDataSink;
}
- public List<Mutable<ILogicalExpression>> getExpressions() {
- return expressions;
+ public Mutable<ILogicalExpression> getSourceExpression() {
+ return sourceExpression;
}
- public IDataSink getDataSink() {
- return dataSink;
+ public LogicalVariable getSourceVariable() {
+ return VariableUtilities.getVariable(sourceExpression.getValue());
+ }
+
+ public Mutable<ILogicalExpression> getPathExpression() {
+ return pathExpression;
+ }
+
+ public List<Mutable<ILogicalExpression>> getPartitionExpressions() {
+ return partitionExpressions;
+ }
+
+ public List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> getOrderExpressions() {
+ return orderExpressions;
+ }
+
+ public List<LogicalVariable> getPartitionVariables() {
+ List<LogicalVariable> partitionVariables = new ArrayList<>();
+ for (Mutable<ILogicalExpression> partitionExpression : partitionExpressions) {
+ partitionVariables.add(VariableUtilities.getVariable(partitionExpression.getValue()));
+ }
+ return partitionVariables;
+ }
+
+ public List<OrderColumn> getOrderColumns() {
+ List<OrderColumn> orderColumns = new ArrayList<>();
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> orderExpressionPair : orderExpressions) {
+ LogicalVariable variable = VariableUtilities.getVariable(orderExpressionPair.getSecond().getValue());
+ OrderOperator.IOrder.OrderKind kind = orderExpressionPair.first.getKind();
+ orderColumns.add(new OrderColumn(variable, kind));
+ }
+ return orderColumns;
+ }
+
+ public IWriteDataSink getWriteDataSink() {
+ return writeDataSink;
}
@Override
@@ -62,35 +108,37 @@
@Override
public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
- boolean modif = false;
- for (int i = 0; i < expressions.size(); i++) {
- boolean b = visitor.transform(expressions.get(i));
- if (b) {
- modif = true;
- }
+ boolean changed = visitor.transform(sourceExpression);
+ changed |= visitor.transform(pathExpression);
+
+ for (Mutable<ILogicalExpression> expression : partitionExpressions) {
+ changed |= visitor.transform(expression);
}
- return modif;
+
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> orderExpressionPair : orderExpressions) {
+ changed |= visitor.transform(orderExpressionPair.second);
+ }
+
+ return changed;
}
@Override
public VariablePropagationPolicy getVariablePropagationPolicy() {
- return VariablePropagationPolicy.ALL;
+ return VariablePropagationPolicy.NONE;
}
@Override
public boolean isMap() {
- return false; // actually depends on the physical op.
+ return true;
}
@Override
public void recomputeSchema() {
- schema = new ArrayList<LogicalVariable>();
- schema.addAll(inputs.get(0).getValue().getSchema());
+ schema = new ArrayList<>(inputs.get(0).getValue().getSchema());
}
@Override
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
return createPropagatingAllInputsTypeEnvironment(ctx);
}
-
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 960e399..b3828df 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -289,9 +290,15 @@
@Override
public ILogicalOperator visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
- ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
- deepCopyExpressionRefs(newExpressions, op.getExpressions());
- return new WriteOperator(newExpressions, op.getDataSink());
+ Mutable<ILogicalExpression> newSourceExpression = deepCopyExpressionRef(op.getSourceExpression());
+ Mutable<ILogicalExpression> newPathExpression = deepCopyExpressionRef(op.getPathExpression());
+ List<Mutable<ILogicalExpression>> newPartitionExpressions =
+ deepCopyExpressionRefs(new ArrayList<>(), op.getPartitionExpressions());
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrderPairExpressions =
+ deepCopyOrderAndExpression(op.getOrderExpressions());
+ IWriteDataSink writeDataSink = op.getWriteDataSink().createCopy();
+ return new WriteOperator(newSourceExpression, newPathExpression, newPartitionExpressions,
+ newOrderPairExpressions, writeDataSink);
}
@Override
@@ -369,11 +376,12 @@
return new SinkOperator();
}
- private void deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
+ private List<Mutable<ILogicalExpression>> deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
List<Mutable<ILogicalExpression>> oldExprs) {
for (Mutable<ILogicalExpression> oldExpr : oldExprs) {
newExprs.add(new MutableObject<>(oldExpr.getValue().cloneExpression()));
}
+ return newExprs;
}
private Mutable<ILogicalExpression> deepCopyExpressionRef(Mutable<ILogicalExpression> oldExprRef) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 74de6f5..4067b62 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -254,7 +254,7 @@
@Override
public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
- standardLayout(op);
+ // Write is akin to project empty
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index faf3c11..7ceb812 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -380,7 +380,12 @@
@Override
public Void visitWriteOperator(WriteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
- substUsedVariablesInExpr(op.getExpressions(), pair.first, pair.second);
+ substUsedVariablesInExpr(op.getSourceExpression(), pair.first, pair.second);
+ substUsedVariablesInExpr(op.getPathExpression(), pair.first, pair.second);
+ substUsedVariablesInExpr(op.getPartitionExpressions(), pair.first, pair.second);
+ for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : op.getOrderExpressions()) {
+ substUsedVariablesInExpr(orderExpr.second, pair.first, pair.second);
+ }
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index d7b6228..d7b2555 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -353,9 +353,15 @@
@Override
public Void visitWriteOperator(WriteOperator op, Void arg) {
- for (Mutable<ILogicalExpression> expr : op.getExpressions()) {
+ op.getSourceExpression().getValue().getUsedVariables(usedVariables);
+ op.getPathExpression().getValue().getUsedVariables(usedVariables);
+ for (Mutable<ILogicalExpression> expr : op.getPartitionExpressions()) {
expr.getValue().getUsedVariables(usedVariables);
}
+
+ for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : op.getOrderExpressions()) {
+ orderExpr.second.getValue().getUsedVariables(usedVariables);
+ }
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
index fcc8c8e..8ff605d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -92,25 +92,8 @@
throw new IllegalStateException(op.getExecutionMode().name());
}
- // require local order property [pc1, ... pcN, oc1, ... ocN]
- // accounting for cases where there's an overlap between order and partition columns
- // TODO replace with required local grouping on partition columns + local order on order columns
- List<OrderColumn> lopColumns = new ArrayList<>();
- ListSet<LogicalVariable> pcVars = new ListSet<>();
- pcVars.addAll(partitionColumns);
- for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
- OrderColumn oc = orderColumns.get(oIdx);
- LogicalVariable ocVar = oc.getColumn();
- if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) {
- throw AlgebricksException.create(ErrorCode.UNSUPPORTED_WINDOW_SPEC, op.getSourceLocation(),
- String.valueOf(partitionColumns), String.valueOf(orderColumns));
- }
- lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
- }
- int pIdx = 0;
- for (LogicalVariable pColumn : pcVars) {
- lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC));
- }
+ List<OrderColumn> lopColumns =
+ getOrderRequirement(op, ErrorCode.UNSUPPORTED_WINDOW_SPEC, partitionColumns, orderColumns);
List<ILocalStructuralProperty> localProps =
lopColumns.isEmpty() ? null : Collections.singletonList(new LocalOrderProperty(lopColumns));
@@ -295,4 +278,30 @@
}
return false;
}
+
+ static List<OrderColumn> getOrderRequirement(ILogicalOperator op, ErrorCode errorCode,
+ List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) throws AlgebricksException {
+ // require local order property [pc1, ... pcN, oc1, ... ocN]
+ // accounting for cases where there's an overlap between order and partition columns
+ // TODO replace with required local grouping on partition columns + local order on order columns
+ List<OrderColumn> lopColumns = new ArrayList<>();
+ ListSet<LogicalVariable> pcVars = new ListSet<>();
+ pcVars.addAll(partitionColumns);
+ for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
+ OrderColumn oc = orderColumns.get(oIdx);
+ LogicalVariable ocVar = oc.getColumn();
+ if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) {
+ throw AlgebricksException.create(errorCode, op.getSourceLocation(), String.valueOf(partitionColumns),
+ String.valueOf(orderColumns));
+ }
+ lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
+ }
+ int pIdx = 0;
+ for (LogicalVariable pColumn : pcVars) {
+ lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC));
+ }
+
+ return lopColumns;
+ }
+
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index 07c798f..d462cd5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -18,10 +18,14 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-import org.apache.commons.lang3.mutable.Mutable;
+import static org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator.getOrderRequirement;
+
+import java.util.Collections;
+import java.util.List;
+
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -30,23 +34,40 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IPrinterFactory;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
public class SinkWritePOperator extends AbstractPhysicalOperator {
+ private final LogicalVariable sourceVariable;
+ private final List<LogicalVariable> partitionVariables;
+ private final List<OrderColumn> orderColumns;
+
+ public SinkWritePOperator(LogicalVariable sourceVariable, List<LogicalVariable> partitionVariables,
+ List<OrderColumn> orderColumns) {
+ this.sourceVariable = sourceVariable;
+ this.partitionVariables = partitionVariables;
+ this.orderColumns = orderColumns;
+ }
@Override
public PhysicalOperatorTag getOperatorTag() {
@@ -66,12 +87,34 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
- WriteOperator write = (WriteOperator) op;
- IDataSink sink = write.getDataSink();
- IPartitioningProperty pp = sink.getPartitioningProperty();
- StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
- return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ IPhysicalPropertiesVector reqByParent, IOptimizationContext context) throws AlgebricksException {
+ if (partitionVariables.isEmpty()) {
+ return emptyUnaryRequirements();
+ }
+ IPartitioningProperty pp;
+ switch (op.getExecutionMode()) {
+ case PARTITIONED:
+ pp = UnorderedPartitionedProperty.of(new ListSet<>(partitionVariables),
+ context.getComputationNodeDomain());
+ break;
+ case UNPARTITIONED:
+ pp = IPartitioningProperty.UNPARTITIONED;
+ break;
+ case LOCAL:
+ pp = null;
+ break;
+ default:
+ throw new IllegalStateException(op.getExecutionMode().name());
+ }
+
+ List<OrderColumn> finalOrderColumns =
+ getOrderRequirement(op, ErrorCode.UNSUPPORTED_WRITE_SPEC, partitionVariables, orderColumns);
+
+ List<ILocalStructuralProperty> localProps =
+ Collections.singletonList(new LocalOrderProperty(finalOrderColumns));
+ return new PhysicalRequirements(
+ new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) },
+ IPartitioningRequirementsCoordinator.NO_COORDINATION);
}
@Override
@@ -79,29 +122,39 @@
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
WriteOperator write = (WriteOperator) op;
- int[] columns = new int[write.getExpressions().size()];
- int i = 0;
- for (Mutable<ILogicalExpression> exprRef : write.getExpressions()) {
- ILogicalExpression expr = exprRef.getValue();
- if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw new NotImplementedException("Only writing variable expressions is supported.");
- }
- VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
- LogicalVariable v = varRef.getVariableReference();
- columns[i++] = inputSchemas[0].findVariable(v);
+ IExpressionRuntimeProvider runtimeProvider = context.getExpressionRuntimeProvider();
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+ IOperatorSchema schema = inputSchemas[0];
+ IWriteDataSink writeDataSink = write.getWriteDataSink();
+
+ // Source evaluator column
+ int sourceColumn = schema.findVariable(sourceVariable);
+
+ // Path expression
+ IScalarEvaluatorFactory dynamicPathEvalFactory = null;
+ ILogicalExpression staticPathExpr = null;
+ ILogicalExpression pathExpr = write.getPathExpression().getValue();
+ if (pathExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ dynamicPathEvalFactory = runtimeProvider.createEvaluatorFactory(pathExpr, typeEnv, inputSchemas, context);
+ } else {
+ staticPathExpr = pathExpr;
}
+
+ // Partition columns
+ int[] partitionColumns = JobGenHelper.projectVariables(schema, partitionVariables);
+ IBinaryComparatorFactory[] partitionComparatorFactories =
+ JobGenHelper.variablesToAscBinaryComparatorFactories(partitionVariables, typeEnv, context);
+
RecordDescriptor recDesc =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
- IPrinterFactory[] pf =
- JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op), context, columns);
-
IMetadataProvider<?, ?> mp = context.getMetadataProvider();
- Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints =
- mp.getWriteFileRuntime(write.getDataSink(), columns, pf, context.getWriterFactory(), inputDesc);
+ Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getWriteFileRuntime(
+ sourceColumn, partitionColumns, partitionComparatorFactories, dynamicPathEvalFactory, staticPathExpr,
+ pathExpr.getSourceLocation(), writeDataSink, inputDesc, typeEnv.getVarType(sourceVariable));
IPushRuntimeFactory runtime = runtimeAndConstraints.first;
runtime.setSourceLocation(write.getSourceLocation());
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index f49b6d4..e1e8c50 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -266,8 +266,22 @@
@Override
public Void visitWriteOperator(WriteOperator op, Integer indent) throws AlgebricksException {
- addIndent(indent).append("write ");
- pprintExprList(op.getExpressions(), indent);
+ AlgebricksStringBuilderWriter writer = addIndent(indent);
+ writer.append("write (");
+ writer.append(op.getSourceExpression().getValue().accept(exprVisitor, indent));
+ writer.append(") to path [");
+ writer.append(op.getPathExpression().getValue().accept(exprVisitor, indent));
+ writer.append("] ");
+ List<Mutable<ILogicalExpression>> partitionExpressions = op.getPartitionExpressions();
+ if (!partitionExpressions.isEmpty()) {
+ writer.append(" partition ");
+ pprintExprList(op.getPartitionExpressions(), indent);
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions = op.getOrderExpressions();
+ if (!orderExpressions.isEmpty()) {
+ writer.append(" order ");
+ pprintOrderList(orderExpressions, indent);
+ }
+ }
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 6f65a9d..a60308a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -482,9 +482,20 @@
public Void visitWriteOperator(WriteOperator op, Void indent) throws AlgebricksException {
try {
jsonGenerator.writeStringField(OPERATOR_FIELD, "write");
- List<Mutable<ILogicalExpression>> expressions = op.getExpressions();
- if (!expressions.isEmpty()) {
- writeArrayFieldOfExpressions(EXPRESSIONS_FIELD, expressions, indent);
+
+ writeStringFieldExpression("value", op.getSourceExpression(), indent);
+ writeStringFieldExpression("path", op.getPathExpression(), indent);
+
+ List<Mutable<ILogicalExpression>> partitionExpressions = op.getPartitionExpressions();
+ if (!partitionExpressions.isEmpty()) {
+ writeObjectFieldWithExpressions("partition-by", partitionExpressions, indent);
+
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions =
+ op.getOrderExpressions();
+ if (!orderExpressions.isEmpty()) {
+ writeArrayFieldOfOrderExprList("order-by", orderExpressions, indent);
+ }
+
}
return null;
} catch (IOException e) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 1cea0a9..1f36aa5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -230,8 +230,23 @@
@Override
public String visitWriteOperator(WriteOperator op, Boolean showDetails) {
stringBuilder.setLength(0);
- stringBuilder.append("write ");
- printExprList(op.getExpressions());
+ stringBuilder.append("write (");
+ stringBuilder.append(op.getSourceExpression());
+ stringBuilder.append(") to [");
+ stringBuilder.append(op.getPathExpression());
+ stringBuilder.append(']');
+ List<Mutable<ILogicalExpression>> partitionExpressions = op.getPartitionExpressions();
+ if (!partitionExpressions.isEmpty()) {
+ stringBuilder.append(" partition by ");
+ printExprList(partitionExpressions);
+
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions = op.getOrderExpressions();
+ if (!orderExpressions.isEmpty()) {
+ stringBuilder.append(" order ");
+ printOrderExprList(orderExpressions);
+ }
+ }
+
appendSchema(op, showDetails);
appendAnnotations(op, showDetails);
appendPhysicalOperatorInfo(op, showDetails);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 2784a6a..cc8c007 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -369,8 +369,14 @@
}
@Override
- public IPhysicalOperator visitWriteOperator(WriteOperator op, Boolean topLevelOp) {
- return new SinkWritePOperator();
+ public IPhysicalOperator visitWriteOperator(WriteOperator op, Boolean topLevelOp) throws AlgebricksException {
+ ILogicalExpression sourceExpr = op.getSourceExpression().getValue();
+ if (sourceExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw AlgebricksException.create(ErrorCode.EXPR_NOT_NORMALIZED, sourceExpr.getSourceLocation());
+ }
+ ensureAllVariables(op.getPartitionExpressions(), v -> v);
+ ensureAllVariables(op.getOrderExpressions(), Pair::getSecond);
+ return new SinkWritePOperator(op.getSourceVariable(), op.getPartitionVariables(), op.getOrderColumns());
}
@Override