[ASTERIXDB-3603][FUN] Add support for transform functions
- user model changes: yes
- storage format changes: no
- interface changes: yes
Ext-ref: MB-63039
Change-Id: If10b15ebd2e9b4ac7cac3b68ca4b301a6b58c30e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19832
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/FunctionCardinalityInferenceVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/FunctionCardinalityInferenceVisitor.java
new file mode 100644
index 0000000..03ef29e
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/FunctionCardinalityInferenceVisitor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.visitor;
+
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.CardinalityInferenceVisitor;
+
+public class FunctionCardinalityInferenceVisitor extends CardinalityInferenceVisitor {
+
+ private FunctionCardinalityInferenceVisitor() {
+ super();
+ }
+
+ public static boolean isCardinalityZeroOrOne(ILogicalOperator operator) throws AlgebricksException {
+ FunctionCardinalityInferenceVisitor visitor = new FunctionCardinalityInferenceVisitor();
+ long cardinality = operator.accept(visitor, null);
+ return cardinality == ZERO_OR_ONE || cardinality == ONE;
+ }
+
+ @Override
+ public Long visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+ // While translation a limit expression is encapsulated as a switch-case expression.
+ // switch-case(treat-as-integer(user_value_expr) > 0, true, treat-as-integer(user_value_expr), 0)
+ // Here we extract the user_value_expr from the switch-case expression
+ ScalarFunctionCallExpression inputExpr = (ScalarFunctionCallExpression) op.getMaxObjects().getValue();
+ ILogicalExpression expr = ((ScalarFunctionCallExpression) inputExpr.getArguments().get(2).getValue())
+ .getArguments().get(0).getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+ AsterixConstantValue constantValue = (AsterixConstantValue) ((ConstantExpression) expr).getValue();
+ if (constantValue.getObject() instanceof AInt64
+ && ((AInt64) constantValue.getObject()).getLongValue() <= 1) {
+ return ZERO_OR_ONE;
+ }
+ }
+ return adjustCardinalityForTupleReductionOperator(op.getInputs().get(0).getValue().accept(this, arg));
+ }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index e3e78e0..ed62ba3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -488,6 +488,16 @@
return isFileStore ? String.valueOf(ExternalWriterProvider.getSeparator(adapter)) : "";
}
+ private LogicalVariable getUnnestVar(ILogicalOperator op) {
+ while (op.getOperatorTag() != LogicalOperatorTag.UNNEST && !op.getInputs().isEmpty()) {
+ op = op.getInputs().get(0).getValue();
+ }
+ if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) {
+ return ((UnnestOperator) op).getVariable();
+ }
+ return null;
+ }
+
public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
ILogicalOperator baseOp, IResultMetadata resultMetadata) throws AlgebricksException {
MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
@@ -500,8 +510,11 @@
ILogicalOperator topOp = p.first;
List<LogicalVariable> liveVars = new ArrayList<>();
VariableUtilities.getLiveVariables(topOp, liveVars);
- LogicalVariable unnestVar = liveVars.get(0);
- LogicalVariable resVar = unnestVar;
+ LogicalVariable unnestVar = getUnnestVar(topOp);
+ if (unnestVar == null) {
+ unnestVar = liveVars.get(0);
+ }
+ LogicalVariable resVar = liveVars.get(0);
if (outputDatasetName == null) {
FileSplit outputFileSplit = metadataProvider.getOutputFile();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index fee660d..547afec 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -48,6 +48,8 @@
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.NoRetryPolicyFactory;
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
import org.apache.asterix.algebra.extension.ExtensionStatement;
import org.apache.asterix.api.common.APIFramework;
import org.apache.asterix.api.http.server.AbstractQueryApiServlet;
@@ -223,6 +225,7 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.TypeSignature;
import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.optimizer.rules.visitor.FunctionCardinalityInferenceVisitor;
import org.apache.asterix.runtime.fulltext.AbstractFullTextFilterDescriptor;
import org.apache.asterix.runtime.fulltext.FullTextConfigDescriptor;
import org.apache.asterix.runtime.fulltext.StopwordsFullTextFilterDescriptor;
@@ -258,6 +261,9 @@
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.algebricks.core.algebra.base.Counter;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
@@ -3340,10 +3346,14 @@
function = new Function(functionSignature, paramNames, paramTypes, returnTypeSignature, null,
FunctionKind.SCALAR.toString(), library.getLanguage(), libraryDatabaseName,
libraryDataverseName, libraryName, externalIdentifier, cfs.getNullCall(),
- cfs.getDeterministic(), cfs.getResources(), dependencies, creator);
+ cfs.getDeterministic(), cfs.getResources(), dependencies, creator, false);
} else {
List<Pair<VarIdentifier, TypeExpression>> paramList = cfs.getParameters();
int paramCount = paramList.size();
+ if (cfs.isTransform() && paramCount != 1) {
+ throw new CompilationException(ErrorCode.INVALID_TRANSFORM_FUNCTION, sourceLoc,
+ "Transform function should have exactly one parameter");
+ }
List<VarIdentifier> paramVars = new ArrayList<>(paramCount);
List<String> paramNames = new ArrayList<>(paramCount);
for (Pair<VarIdentifier, TypeExpression> paramPair : paramList) {
@@ -3359,7 +3369,8 @@
// Check whether the function is usable:
// create a function declaration for this function,
// and a query body calls this function with each argument set to 'missing'
- FunctionDecl fd = new FunctionDecl(functionSignature, paramVars, cfs.getFunctionBodyExpression(), true);
+ FunctionDecl fd = new FunctionDecl(functionSignature, paramVars, cfs.getFunctionBodyExpression(), true,
+ cfs.isTransform());
fd.setSourceLocation(sourceLoc);
Query wrappedQuery = queryRewriter.createFunctionAccessorQuery(fd);
@@ -3369,17 +3380,25 @@
metadataProvider.setDefaultNamespace(ns);
LangRewritingContext langRewritingContext = createLangRewritingContext(metadataProvider, fdList, null,
null, warningCollector, wrappedQuery.getVarCounter());
- apiFramework.reWriteQuery(langRewritingContext, wrappedQuery, sessionOutput, false, false,
- Collections.emptyList());
+ List<VarIdentifier> externalVars = new ArrayList<>();
+ Pair<IReturningStatement, Integer> rewrittenQuery = apiFramework.reWriteQuery(langRewritingContext,
+ wrappedQuery, sessionOutput, false, true, externalVars);
List<List<DependencyFullyQualifiedName>> dependencies =
FunctionUtil.getFunctionDependencies(metadataProvider, fd, queryRewriter);
+ if (cfs.isTransform()) {
+ if (!dependencies.get(0).isEmpty()) {
+ throw new CompilationException(ErrorCode.INVALID_TRANSFORM_FUNCTION, sourceLoc,
+ "Transform function definition can not use collections/views");
+ }
+ validateTransformFunction(metadataProvider, rewrittenQuery, sourceLoc);
+ }
appCtx.getReceptionist().ensureAuthorized(requestParameters, metadataProvider);
newInlineTypes = Collections.emptyMap();
function = new Function(functionSignature, paramNames, null, null, cfs.getFunctionBody(),
FunctionKind.SCALAR.toString(), compilationProvider.getParserFactory().getLanguage(), null,
- null, null, null, null, null, null, dependencies, creator);
+ null, null, null, null, null, null, dependencies, creator, cfs.isTransform());
}
if (existingFunction == null) {
@@ -3418,6 +3437,31 @@
}
}
+ private void validateTransformFunction(MetadataProvider metadataProvider,
+ Pair<IReturningStatement, Integer> rewrittenResult, SourceLocation sourceLoc) throws AlgebricksException {
+ ILangExpressionToPlanTranslatorFactory translatorFactory =
+ compilationProvider.getExpressionToPlanTranslatorFactory();
+ ILangExpressionToPlanTranslator t =
+ translatorFactory.createExpressionToPlanTranslator(metadataProvider, rewrittenResult.second, null);
+ org.apache.asterix.translator.ResultMetadata resultMetadata =
+ new org.apache.asterix.translator.ResultMetadata(sessionOutput.config().fmt());
+ ILogicalPlan plan = t.translate((Query) rewrittenResult.first, null, null, resultMetadata);
+ if (plan.getRoots().size() != 1) {
+ throw new CompilationException(ErrorCode.INVALID_TRANSFORM_FUNCTION, sourceLoc,
+ "Transform function cannot have more than one root");
+ }
+ ILogicalOperator op = plan.getRoots().get(0).getValue().getInputs().get(0).getValue();
+ if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ if (!FunctionCardinalityInferenceVisitor.isCardinalityZeroOrOne(op.getInputs().get(0).getValue())) {
+ throw new CompilationException(ErrorCode.INVALID_TRANSFORM_FUNCTION, sourceLoc,
+ "Transform function cannot return more than one row");
+ }
+ } else {
+ throw new CompilationException(ErrorCode.INVALID_TRANSFORM_FUNCTION, sourceLoc,
+ "Transform function should always contain a query");
+ }
+ }
+
private Triple<TypeSignature, TypeSignature, Datatype> translateFunctionParameterType(
FunctionSignature functionSignature, int paramIdx, TypeExpression paramTypeExpr, SourceLocation sourceLoc,
MetadataProvider metadataProvider, MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.000.ddl.sqlpp
new file mode 100644
index 0000000..39b0abd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.000.ddl.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+CREATE TYPE T1 AS {
+ a: int32,
+ b: int32
+};
+
+CREATE COLLECTION test_collection(T1) PRIMARY KEY a;
+
+CREATE VIEW test_view AS
+ SELECT * FROM test_collection;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.001.ddl.sqlpp
new file mode 100644
index 0000000..07f2fb1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.001.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+-- Fail: More than one argument
+
+CREATE TRANSFORM FUNCTION transformTest(a, b) {
+ SELECT * FROM [{"a": 1, "b": 2}] t
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.002.ddl.sqlpp
new file mode 100644
index 0000000..61696ad
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.002.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+-- Fail: Less than one argument
+
+CREATE TRANSFORM FUNCTION transformTest() {
+ SELECT * FROM [{"a": 1, "b": 2}] t
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.003.ddl.sqlpp
new file mode 100644
index 0000000..dc1a277
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.003.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+-- Fail: Using a collection in the definition
+
+CREATE TRANSFORM FUNCTION transformTest(doc) {
+ SELECT count(*) as count FROM test_collection
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.004.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.004.ddl.sqlpp
new file mode 100644
index 0000000..7ad55bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.004.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+-- Fail: Using a view in the definition
+
+CREATE TRANSFORM FUNCTION transformTest(doc) {
+ SELECT count(*) FROM test_view
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.005.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.005.ddl.sqlpp
new file mode 100644
index 0000000..9e4e7b9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.005.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest(doc) {
+ SELECT * FROM [doc] d UNNEST d.a
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.000.ddl.sqlpp
new file mode 100644
index 0000000..39b0abd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.000.ddl.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+CREATE TYPE T1 AS {
+ a: int32,
+ b: int32
+};
+
+CREATE COLLECTION test_collection(T1) PRIMARY KEY a;
+
+CREATE VIEW test_view AS
+ SELECT * FROM test_collection;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
new file mode 100644
index 0000000..f03a74d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest1(doc) {
+ SELECT count(*) as count FROM [doc] t UNNEST t.a
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
new file mode 100644
index 0000000..a04ea00
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest2(doc) {
+ SELECT (SELECT * FROM [doc] t UNNEST t.a) as tt
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
new file mode 100644
index 0000000..0770d98
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest3(doc) {
+ SELECT * FROM [doc] t UNNEST t.a LIMIT 1
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
new file mode 100644
index 0000000..d2c2265
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest4(cust) {
+ SELECT VALUE c
+ FROM [cust] AS c
+ WHERE c.address.zipcode IS NOT MISSING
+ LIMIT 1
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.005.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.005.ddl.sqlpp
new file mode 100644
index 0000000..77153ce
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.005.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest5(ord) {
+ SELECT o.orderno, i.itemno, i.qty AS item_qty, i.price AS item_price
+ FROM [ord] AS o UNNEST o.items AS i
+ LIMIT 1
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index 5db093b..de55877 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -14256,6 +14256,21 @@
<output-dir compare="Text">drop_if_exists</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="function/transform">
+ <compilation-unit name="negative">
+ <output-dir compare="Text">negative</output-dir>
+ <expected-error>ASX1223: Failed to create transform function. Encountered error: 'Transform function should have exactly one parameter'</expected-error>
+ <expected-error>ASX1223: Failed to create transform function. Encountered error: 'Transform function should have exactly one parameter'</expected-error>
+ <expected-error>ASX1223: Failed to create transform function. Encountered error: 'Transform function definition can not use collections/views'</expected-error>
+ <expected-error>ASX1223: Failed to create transform function. Encountered error: 'Transform function definition can not use collections/views'</expected-error>
+ <expected-error>ASX1223: Failed to create transform function. Encountered error: 'Transform function cannot return more than one row'</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="function/transform">
+ <compilation-unit name="positive">
+ <output-dir compare="Text">positive</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="feeds">
<test-case FilePath="feeds">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 463695e..260617a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -328,6 +328,7 @@
CANNOT_TRUNCATE_DATASET_TYPE(1220),
NO_VALID_AUTHENTICATION_PARAMS_PROVIDED(1221),
NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT(1222),
+ INVALID_TRANSFORM_FUNCTION(1223),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index df08016..68c23c7 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -330,6 +330,7 @@
1220 = Cannot truncate %1$s '%2$s'
1221 = No valid authentication parameters were provided
1222 = No valid authentication parameters were provided to impersonate service account
+1223 = Failed to create transform function. Encountered error: '%1$s'
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
index e0623a1..74b7bbd 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
@@ -63,9 +63,11 @@
private final boolean replaceIfExists;
private final boolean ifNotExists;
+ private final boolean transform;
public CreateFunctionStatement(FunctionSignature signature, List<Pair<VarIdentifier, TypeExpression>> paramList,
- String functionBody, Expression functionBodyExpression, boolean replaceIfExists, boolean ifNotExists) {
+ String functionBody, Expression functionBodyExpression, boolean replaceIfExists, boolean ifNotExists,
+ boolean transform) {
this.signature = signature;
this.functionBody = functionBody;
this.functionBodyExpression = functionBodyExpression;
@@ -77,6 +79,7 @@
this.options = null;
this.replaceIfExists = replaceIfExists;
this.ifNotExists = ifNotExists;
+ this.transform = transform;
}
public CreateFunctionStatement(FunctionSignature signature, List<Pair<VarIdentifier, TypeExpression>> paramList,
@@ -93,6 +96,7 @@
this.functionBodyExpression = null;
this.replaceIfExists = replaceIfExists;
this.ifNotExists = ifNotExists;
+ this.transform = false;
}
public boolean getReplaceIfExists() {
@@ -196,6 +200,10 @@
return Category.DDL;
}
+ public boolean isTransform() {
+ return transform;
+ }
+
private IAdmNode getOption(String optionName) {
return options != null ? options.get(optionName) : null;
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/FunctionDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/FunctionDecl.java
index 2ef11ad..1a924ee 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/FunctionDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/FunctionDecl.java
@@ -34,13 +34,20 @@
private Expression funcBody;
private Expression funcBodyNormalized;
private final boolean isStored;
+ private final boolean transform;
public FunctionDecl(FunctionSignature signature, List<VarIdentifier> paramList, Expression funcBody,
- boolean isStored) {
+ boolean isStored, boolean transform) {
this.signature = signature;
this.paramList = paramList;
this.funcBody = funcBody;
this.isStored = isStored;
+ this.transform = transform;
+ }
+
+ public FunctionDecl(FunctionSignature signature, List<VarIdentifier> paramList, Expression funcBody,
+ boolean isStored) {
+ this(signature, paramList, funcBody, isStored, false);
}
public FunctionSignature getSignature() {
@@ -72,6 +79,10 @@
return isStored;
}
+ public boolean isTransform() {
+ return transform;
+ }
+
@Override
public int hashCode() {
return signature.hashCode();
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
index 06f22b7..4f09722 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
@@ -179,7 +179,8 @@
}
Pair<ILangExpression, VariableSubstitutionEnvironment> p1 = fd.getFuncBody().accept(this, env);
- FunctionDecl newF = new FunctionDecl(fd.getSignature(), newList, (Expression) p1.first, fd.isStored());
+ FunctionDecl newF =
+ new FunctionDecl(fd.getSignature(), newList, (Expression) p1.first, fd.isStored(), fd.isTransform());
newF.setSourceLocation(fd.getSourceLocation());
return new Pair<>(newF, env);
}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
index 1d43d0b..9941dde 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
@@ -267,7 +267,7 @@
@Override
public FunctionDecl visit(FunctionDecl fd, Void arg) throws CompilationException {
FunctionDecl copy = new FunctionDecl(fd.getSignature(), fd.getParamList(),
- (Expression) fd.getFuncBody().accept(this, arg), fd.isStored());
+ (Expression) fd.getFuncBody().accept(this, arg), fd.isStored(), fd.isTransform());
copy.setSourceLocation(fd.getSourceLocation());
return copy;
}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 0c95c77..13272e9 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -1865,13 +1865,18 @@
CreateFunctionStatement stmt = null;
}
{
- <FUNCTION> stmt = FunctionSpecification(startStmtToken, orReplace)
+ <TRANSFORM> <FUNCTION> stmt = FunctionSpecification(startStmtToken, orReplace, true)
+ {
+ return stmt;
+ }
+ |
+ <FUNCTION> stmt = FunctionSpecification(startStmtToken, orReplace, false)
{
return stmt;
}
}
-CreateFunctionStatement FunctionSpecification(Token startStmtToken, boolean orReplace) throws ParseException:
+CreateFunctionStatement FunctionSpecification(Token startStmtToken, boolean orReplace, boolean transform) throws ParseException:
{
FunctionSignature signature = null;
FunctionName fctName = null;
@@ -1926,7 +1931,7 @@
getCurrentScope().addFunctionDescriptor(signature, false);
removeCurrentScope();
ensureNoTypeDeclsInFunction(fctName.function, params, returnType, startStmtToken);
- stmt = new CreateFunctionStatement(signature, params, functionBody, functionBodyExpr, orReplace, ifNotExists);
+ stmt = new CreateFunctionStatement(signature, params, functionBody, functionBodyExpr, orReplace, ifNotExists, transform);
}
)
|
@@ -6114,6 +6119,7 @@
| <WITH : "with">
| <WRITE : "write">
| <COPY : "copy">
+ | <TRANSFORM : "transform">
}
<DEFAULT,IN_DBL_BRACE>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 25d43b2..a60e53b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -689,12 +689,6 @@
// in the cache.
return null;
}
- //TODO(DB): review this and other similar ones
- if (ctx.getDataverse(functionSignature.getDatabaseName(), functionSignature.getDataverseName()) != null) {
- // This transaction has dropped and subsequently created the same
- // dataverse.
- return null;
- }
function = cache.getFunction(functionSignature);
if (function != null) {
// Function is already in the cache, don't add it again.
@@ -1348,6 +1342,15 @@
INSTANCE = new NCMetadataManagerImpl(proxies, metadataNode);
}
+ @Override
+ public List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws AlgebricksException {
+ try {
+ return metadataNode.getAllDatasets(ctx.getTxnId());
+ } catch (RemoteException e) {
+ throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+ }
+ }
+
private static class CCMetadataManagerImpl extends MetadataManager {
private final MetadataProperties metadataProperties;
private final ICcApplicationContext appCtx;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index 70bf83a..8db32d6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -203,7 +203,7 @@
public void dropFunction(FunctionSignature signature) {
Function function = new Function(signature, null, null, null, null, null, null, null, null, null, null, false,
- false, null, null, null);
+ false, null, null, null, false);
droppedCache.addFunctionIfNotExists(function);
logAndApply(new MetadataLogicalOperation(function, false));
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index fdd36de..4ce8742 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -919,4 +919,6 @@
String feedName) throws AlgebricksException;
long getMaxTxnId();
+
+ List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws AlgebricksException;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index ee4a2c9..bc6789c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -1017,4 +1017,6 @@
List<FeedConnection> getFeedConnections(TxnId txnId, String database, DataverseName dataverseName, String feedName)
throws AlgebricksException, RemoteException;
+
+ List<Dataset> getAllDatasets(TxnId txnId) throws AlgebricksException, RemoteException;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 338e00d..31a7f9b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -53,6 +53,7 @@
public static final String FIELD_NAME_DEFAULT = "Default";
public static final String FIELD_NAME_DEFINITION = "Definition";
public static final String FIELD_NAME_DEPENDENCIES = "Dependencies";
+ public static final String FIELD_NAME_IS_TRANSFORM = "IsTransform";
public static final String FIELD_NAME_DERIVED = "Derived";
public static final String FIELD_NAME_DESCRIPTION = "Description";
public static final String FIELD_NAME_EXTERNAL_DETAILS = "ExternalDetails";
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f11a338..63eda21 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -2033,4 +2033,8 @@
off += Character.charCount(codePointChar);
}
}
+
+ public List<Dataset> getAllDatasets() throws AlgebricksException {
+ return MetadataManager.INSTANCE.getAllDatasets(mdTxnCtx);
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
index 3c1515b..c19b542 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
@@ -51,12 +51,13 @@
private final Map<String, String> resources;
private final List<List<DependencyFullyQualifiedName>> dependencies;
private final Creator creator;
+ private final boolean transform;
public Function(FunctionSignature signature, List<String> paramNames, List<TypeSignature> paramTypes,
TypeSignature returnType, String functionBody, String functionKind, String language,
String libraryDatabaseName, DataverseName libraryDataverseName, String libraryName,
List<String> externalIdentifier, Boolean nullCall, Boolean deterministic, Map<String, String> resources,
- List<List<DependencyFullyQualifiedName>> dependencies, Creator creator) {
+ List<List<DependencyFullyQualifiedName>> dependencies, Creator creator, boolean transform) {
this.signature = signature;
this.paramNames = paramNames;
this.paramTypes = paramTypes;
@@ -75,6 +76,7 @@
? Arrays.asList(Collections.emptyList(), Collections.emptyList(), Collections.emptyList())
: dependencies;
this.creator = creator;
+ this.transform = transform;
}
public FunctionSignature getSignature() {
@@ -168,6 +170,10 @@
return creator;
}
+ public boolean isTransform() {
+ return transform;
+ }
+
@Override
public Function addToCache(MetadataCache cache) {
return cache.addFunctionIfNotExists(this);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FunctionTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FunctionTupleTranslator.java
index 2741d12..2319cb0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FunctionTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FunctionTupleTranslator.java
@@ -206,9 +206,14 @@
FunctionSignature signature = new FunctionSignature(databaseName, dataverseName, functionName, arity);
Creator creator = Creator.createOrDefault(functionRecord);
+ int isTransformIndex = functionRecord.getType().getFieldIndex(MetadataRecordTypes.FIELD_NAME_IS_TRANSFORM);
+ boolean transform = false;
+ if (isTransformIndex >= 0) {
+ transform = ((ABoolean) functionRecord.getValueByPos(isTransformIndex)).getBoolean();
+ }
return new Function(signature, paramNames, paramTypes, returnType, definition, functionKind, language,
libraryDatabaseName, libraryDataverseName, libraryName, externalIdentifier, nullCall, deterministic,
- resources, dependencies, creator);
+ resources, dependencies, creator, transform);
}
private List<TypeSignature> getParamTypes(ARecord functionRecord, String functionDatabaseName,
@@ -435,6 +440,7 @@
writeNullCall(function);
writeDeterministic(function);
writeFunctionCreator(function);
+ writeIsTransform(function);
}
protected void writeResources(Function function) throws HyracksDataException {
@@ -722,4 +728,15 @@
recordBuilder.addField(fieldName, fieldValue);
}
}
+
+ private void writeIsTransform(Function function) throws HyracksDataException {
+ if (function.isTransform()) {
+ fieldName.reset();
+ aString.setValue(MetadataRecordTypes.FIELD_NAME_IS_TRANSFORM);
+ stringSerde.serialize(aString, fieldName.getDataOutput());
+ fieldValue.reset();
+ booleanSerde.serialize(ABoolean.TRUE, fieldValue.getDataOutput());
+ recordBuilder.addField(fieldName, fieldValue);
+ }
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
index 1701e64..005bdf5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
@@ -69,6 +69,18 @@
this.funID = funID;
}
+ public IScalarEvaluatorFactory getListEvalFactory() {
+ return listEvalFactory;
+ }
+
+ public SourceLocation getSourceLoc() {
+ return sourceLoc;
+ }
+
+ public FunctionIdentifier getFunID() {
+ return funID;
+ }
+
@Override
public IUnnestingEvaluator createUnnestingEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
return new IUnnestingEvaluator() {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index 6b25b1f..21d9ea2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -84,15 +84,15 @@
* 5. the cardinality is some unknown value.
*/
public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long, Void> {
- private static final long ZERO_OR_ONE = 0L;
- private static final long ONE = 1L;
- private static final long ZERO_OR_ONE_GROUP = 2L;
- private static final long ONE_GROUP = 3L;
- private static final long UNKNOWN = 100L; // so it fits into the auto-boxing cache
+ protected static final long ZERO_OR_ONE = 0L;
+ protected static final long ONE = 1L;
+ protected static final long ZERO_OR_ONE_GROUP = 2L;
+ protected static final long ONE_GROUP = 3L;
+ protected static final long UNKNOWN = 100L; // so it fits into the auto-boxing cache
private final Set<LogicalVariable> keyVariables = new HashSet<>();
- private CardinalityInferenceVisitor() {
+ public CardinalityInferenceVisitor() {
}
@Override
@@ -379,7 +379,7 @@
}
// For operators including SELECT and LIMIT.
- private long adjustCardinalityForTupleReductionOperator(long inputCardinality) {
+ protected long adjustCardinalityForTupleReductionOperator(long inputCardinality) {
if (inputCardinality == ONE) {
return ZERO_OR_ONE;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 0c74260..9b643cd 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.dataflow.common.comm.io.AbstractFrameAppender;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -38,10 +39,12 @@
protected IFrame frame;
protected FrameTupleAccessor tAccess;
protected FrameTupleReference tRef;
+ protected boolean ignoreFailures = false;
protected final void initAccessAppend(IHyracksTaskContext ctx) throws HyracksDataException {
frame = new VSizeFrame(ctx);
appender = new FrameTupleAppender(frame);
+ ((AbstractFrameAppender) appender).setIgnoreFailures(ignoreFailures);
tAccess = new FrameTupleAccessor(inputRecordDesc);
}
@@ -59,8 +62,10 @@
try {
flushIfNotFailed();
} catch (Exception e) {
- closeException = e;
- fail(closeException);
+ if (!ignoreFailures) {
+ closeException = e;
+ fail(closeException);
+ }
} finally {
closeException = CleanupUtils.close(writer, closeException);
}
@@ -115,4 +120,15 @@
throws HyracksDataException {
FrameUtils.appendConcatToWriter(writer, getTupleAppender(), accessor0, tIndex0, accessor1, tIndex1);
}
+
+ public void setIgnoreFailures(boolean ignoreFailures) {
+ this.ignoreFailures = ignoreFailures;
+ if (appender != null) {
+ ((AbstractFrameAppender) appender).setIgnoreFailures(ignoreFailures);
+ }
+ }
+
+ public boolean isIgnoreFailures() {
+ return ignoreFailures;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 202c087..c2e8473 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -46,15 +47,23 @@
private final int outputArity;
private final AlgebricksPipeline pipeline;
private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap;
+ private final boolean ignoreFailures;
public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) {
+ this(pipeline, inputArity, outputArity, pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor, false);
+ }
+
+ public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
+ RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor,
+ boolean ignoreFailures) {
this.pipeline = pipeline;
this.pipelineInputRecordDescriptor = pipelineInputRecordDescriptor;
this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
this.inputArity = inputArity;
this.outputArity = outputArity;
this.runtimeMap = new HashMap<>();
+ this.ignoreFailures = ignoreFailures;
}
public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
@@ -99,6 +108,9 @@
} else {
newRuntimes[j].setOutputFrameWriter(0, start, recordDescriptors[i]);
}
+ if (newRuntimes[j] instanceof AbstractOneInputOneOutputOneFramePushRuntime) {
+ ((AbstractOneInputOneOutputOneFramePushRuntime) newRuntimes[j]).setIgnoreFailures(ignoreFailures);
+ }
}
runtimeMap.put(runtimeFactory, newRuntimes);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 7feca3c..242c603 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -90,6 +90,22 @@
return pipelines;
}
+ public RecordDescriptor getInputRecordDesc() {
+ return inputRecordDesc;
+ }
+
+ public RecordDescriptor getOutputRecordDesc() {
+ return outputRecordDesc;
+ }
+
+ public IMissingWriterFactory[] getMissingWriterFactories() {
+ return missingWriterFactories;
+ }
+
+ public Map<IPushRuntimeFactory, IOperatorStats> getStats() {
+ return stats;
+ }
+
public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) {
this.stats.putAll(stats);
}
@@ -97,20 +113,20 @@
@Override
public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
- return new SubplanPushRuntime(ctx);
+ return new SubplanPushRuntime(ctx, false);
}
- private class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+ public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
- final IHyracksTaskContext ctx;
+ protected final IHyracksTaskContext ctx;
- final NestedTupleSourceRuntime[] startOfPipelines;
+ protected final NestedTupleSourceRuntime[] startOfPipelines;
boolean first;
boolean profile;
- SubplanPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+ protected SubplanPushRuntime(IHyracksTaskContext ctx, boolean ignoreFailures) throws HyracksDataException {
this.ctx = ctx;
this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
this.first = true;
@@ -150,7 +166,8 @@
outputRecordDescriptor = pipelineLastRecordDescriptor;
}
- PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor);
+ PipelineAssembler pa =
+ new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor, ignoreFailures);
IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, stats);
startOfPipelines[i] = (NestedTupleSourceRuntime) head;
pipelineAssemblers[i] = pa;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 0874f2b..9cfc070 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -52,8 +52,8 @@
private static final long serialVersionUID = 1L;
- private final int outCol;
- private final int positionalCol;
+ protected final int outCol;
+ protected final int positionalCol;
private final IUnnestingEvaluatorFactory unnestingFactory;
private final IUnnestingPositionWriterFactory positionWriterFactory;
private final boolean leftOuter;
@@ -76,6 +76,26 @@
this.missingWriterFactory = missingWriterFactory;
}
+ public int[] getProjectionList() {
+ return projectionList;
+ }
+
+ public int getOutCol() {
+ return outCol;
+ }
+
+ public int getPositionalCol() {
+ return positionalCol;
+ }
+
+ public IUnnestingEvaluatorFactory getUnnestingFactory() {
+ return unnestingFactory;
+ }
+
+ public IUnnestingPositionWriterFactory getPositionWriterFactory() {
+ return positionWriterFactory;
+ }
+
@Override
public String toString() {
return "unnest " + outCol + (positionalCol >= 0 ? " at " + positionalCol : "") + " <- " + unnestingFactory
@@ -85,92 +105,102 @@
@Override
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
- ByteArrayAccessibleOutputStream missingBytes = leftOuter ? writeMissingBytes() : null;
- IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private IPointable p = VoidPointable.FACTORY.createPointable();
- private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
- private IUnnestingEvaluator unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
- private final IUnnestingPositionWriter positionWriter =
- positionWriterFactory != null ? positionWriterFactory.createUnnestingPositionWriter() : null;
-
- @Override
- public void open() throws HyracksDataException {
- super.open();
- if (tRef == null) {
- initAccessAppendRef(ctx);
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tRef.reset(tAccess, t);
- try {
- unnest.init(tRef);
- unnesting(t);
- } catch (IOException ae) {
- throw HyracksDataException.create(ae);
- }
- }
- }
-
- private void unnesting(int t) throws IOException {
- // Assumes that when unnesting the tuple, each step() call for each element
- // in the tuple will increase the positionIndex, and the positionIndex will
- // be reset when a new tuple is to be processed.
- int positionIndex = 1;
- boolean emitted = false;
- do {
- if (!unnest.step(p)) {
- break;
- }
- writeOutput(t, positionIndex++, false);
- emitted = true;
- } while (true);
- if (leftOuter && !emitted) {
- writeOutput(t, -1, true);
- }
- }
-
- private void writeOutput(int t, int positionIndex, boolean missing)
- throws HyracksDataException, IOException {
- tupleBuilder.reset();
- for (int f = 0; f < projectionList.length; f++) {
- int col = projectionList[f];
- if (col == outCol) {
- if (missing) {
- tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
- } else {
- tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
- }
- } else if (col == positionalCol) {
- if (missing) {
- tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
- } else {
- positionWriter.write(tupleBuilder.getDataOutput(), positionIndex);
- tupleBuilder.addFieldEndOffset();
- }
- } else {
- tupleBuilder.addField(tAccess, t, projectionList[f]);
- }
- }
- appendToFrameFromTupleBuilder(tupleBuilder);
- }
-
- @Override
- public void flush() throws HyracksDataException {
- appender.flush(writer);
- }
- };
+ return new UnnestPushRuntime(ctx);
}
- private ByteArrayAccessibleOutputStream writeMissingBytes() throws HyracksDataException {
+ public class UnnestPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+ private IPointable p = VoidPointable.FACTORY.createPointable();
+ protected ArrayTupleBuilder tupleBuilder;
+ protected IUnnestingEvaluator unnest;
+ private final IUnnestingPositionWriter positionWriter;
+ private final IHyracksTaskContext ctx;
+ protected ByteArrayAccessibleOutputStream missingBytes;
+
+ public UnnestPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+ this.ctx = ctx;
+ IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
+ unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
+ tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ positionWriter =
+ positionWriterFactory != null ? positionWriterFactory.createUnnestingPositionWriter() : null;
+ missingBytes = leftOuter ? writeMissingBytes() : null;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ super.open();
+ if (tRef == null) {
+ initAccessAppendRef(ctx);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ try {
+ unnest.init(tRef);
+ unnesting(t);
+ } catch (IOException ae) {
+ throw HyracksDataException.create(ae);
+ }
+ }
+ }
+
+ protected void unnesting(int t) throws IOException {
+ // Assumes that when unnesting the tuple, each step() call for each element
+ // in the tuple will increase the positionIndex, and the positionIndex will
+ // be reset when a new tuple is to be processed.
+ int positionIndex = 1;
+ boolean emitted = false;
+ do {
+ if (!unnest.step(p)) {
+ break;
+ }
+ writeOutput(t, positionIndex++, false);
+ emitted = true;
+ } while (true);
+ if (leftOuter && !emitted) {
+ writeOutput(t, -1, true);
+ }
+ }
+
+ private void writeOutput(int t, int positionIndex, boolean missing) throws HyracksDataException, IOException {
+ tupleBuilder.reset();
+ for (int f = 0; f < projectionList.length; f++) {
+ int col = projectionList[f];
+ if (col == outCol) {
+ if (missing) {
+ tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+ } else {
+ tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+ }
+ } else if (col == positionalCol) {
+ if (missing) {
+ tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+ } else {
+ positionWriter.write(tupleBuilder.getDataOutput(), positionIndex);
+ tupleBuilder.addFieldEndOffset();
+ }
+ } else {
+ tupleBuilder.addField(tAccess, t, projectionList[f]);
+ }
+ }
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ appender.flush(writer);
+ }
+ };
+
+ protected ByteArrayAccessibleOutputStream writeMissingBytes() throws HyracksDataException {
ByteArrayAccessibleOutputStream baos = new ByteArrayAccessibleOutputStream();
IMissingWriter missingWriter = missingWriterFactory.createMissingWriter();
missingWriter.writeMissing(new DataOutputStream(baos));
return baos;
}
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index e1acaee..7338cc7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -48,6 +48,7 @@
protected int tupleCount;
protected int tupleDataEndOffset;
+ protected boolean ignoreFailures = false;
@Override
public void reset(IFrame frame, boolean clear) throws HyracksDataException {
@@ -91,10 +92,21 @@
public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
failIfInterrupted();
getBuffer().clear();
- outWriter.nextFrame(getBuffer());
- if (clearFrame) {
- frame.reset();
- reset(getBuffer(), true);
+ if (!ignoreFailures) {
+ outWriter.nextFrame(getBuffer());
+ if (clearFrame) {
+ frame.reset();
+ reset(getBuffer(), true);
+ }
+ } else {
+ try {
+ outWriter.nextFrame(getBuffer());
+ } finally {
+ if (clearFrame) {
+ frame.reset();
+ reset(getBuffer(), true);
+ }
+ }
}
}
@@ -125,4 +137,12 @@
throw HyracksDataException.create(new InterruptedException());
}
}
+
+ public void setIgnoreFailures(boolean ignoreFailures) {
+ this.ignoreFailures = ignoreFailures;
+ }
+
+ public boolean isIgnoreFailures() {
+ return ignoreFailures;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 3ef8b28..58194f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -299,5 +299,4 @@
}
return false;
}
-
}