Merge branch 'gerrit/mad-hatter'
Change-Id: I5cd06cc3cd69068faecced6a2b1cc558eecd0784
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index b689e8f..c3fc56c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -149,7 +149,7 @@
public static final String REWRITE_IN_AS_OR_OPTION = "rewrite_in_as_or";
private static final boolean REWRITE_IN_AS_OR_OPTION_DEFAULT = true;
- private Deque<Mutable<ILogicalOperator>> uncorrelatedLeftBranchStack = new ArrayDeque<>();
+ private Deque<Mutable<ILogicalOperator>> uncorrelatedRightBranchStack = new ArrayDeque<>();
private final Map<VarIdentifier, IAObject> externalVars;
private final boolean translateInAsOr;
@@ -297,10 +297,12 @@
throws CompilationException {
Mutable<ILogicalOperator> inputSrc = arg;
Pair<ILogicalOperator, LogicalVariable> topUnnest = null;
+ uncorrelatedRightBranchStack.push(inputSrc);
for (FromTerm fromTerm : fromClause.getFromTerms()) {
topUnnest = fromTerm.accept(this, inputSrc);
inputSrc = new MutableObject<>(topUnnest.first);
}
+ uncorrelatedRightBranchStack.pop();
return topUnnest;
}
@@ -328,27 +330,20 @@
Mutable<ILogicalOperator> topOpRef = new MutableObject<>(unnestOp);
if (fromTerm.hasCorrelateClauses()) {
for (AbstractBinaryCorrelateClause correlateClause : fromTerm.getCorrelateClauses()) {
- if (correlateClause.getClauseType() == ClauseType.UNNEST_CLAUSE) {
- // Correlation is allowed.
- topOpRef = new MutableObject<>(correlateClause.accept(this, topOpRef).first);
- } else {
- // Correlation is dis-allowed.
- uncorrelatedLeftBranchStack.push(topOpRef);
- topOpRef = new MutableObject<>(correlateClause.accept(this, tupSource).first);
- }
+ topOpRef = new MutableObject<>(correlateClause.accept(this, topOpRef).first);
}
}
return new Pair<>(topOpRef.getValue(), fromVar);
}
@Override
- public Pair<ILogicalOperator, LogicalVariable> visit(JoinClause joinClause, Mutable<ILogicalOperator> inputRef)
+ public Pair<ILogicalOperator, LogicalVariable> visit(JoinClause joinClause, Mutable<ILogicalOperator> leftInputRef)
throws CompilationException {
SourceLocation sourceLoc = joinClause.getSourceLocation();
- Mutable<ILogicalOperator> leftInputRef = uncorrelatedLeftBranchStack.pop();
if (joinClause.getJoinType() == JoinType.INNER) {
+ Mutable<ILogicalOperator> rightInputRef = uncorrelatedRightBranchStack.peek();
Pair<ILogicalOperator, LogicalVariable> rightBranch =
- generateUnnestForBinaryCorrelateRightBranch(joinClause, inputRef, true);
+ generateUnnestForBinaryCorrelateRightBranch(joinClause, rightInputRef, true);
// A join operator with condition TRUE.
AbstractBinaryJoinOperator joinOperator = new InnerJoinOperator(
new MutableObject<>(ConstantExpression.TRUE), leftInputRef, new MutableObject<>(rightBranch.first));
@@ -362,7 +357,7 @@
filter.getInputs().add(conditionExprOpPair.second);
filter.setSourceLocation(conditionExprOpPair.first.getSourceLocation());
return new Pair<>(filter, rightBranch.second);
- } else {
+ } else if (joinClause.getJoinType() == JoinType.LEFTOUTER) {
// Creates a subplan operator.
SubplanOperator subplanOp = new SubplanOperator();
subplanOp.getInputs().add(leftInputRef);
@@ -498,6 +493,9 @@
context.setVar(joinClause.getRightVariable(), outerUnnestVar);
}
return new Pair<>(currentTopOp, null);
+ } else {
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, joinClause.getSourceLocation(),
+ String.valueOf(joinClause.getJoinType().toString()));
}
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
index aa00fb3..6e02fc2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
@@ -28,6 +28,8 @@
import org.apache.asterix.common.annotations.IRecordFieldDataGen;
import org.apache.asterix.common.annotations.RecordDataGenAnnotation;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.lang.common.expression.OrderedListTypeDefinition;
import org.apache.asterix.lang.common.expression.RecordTypeDefinition;
@@ -52,6 +54,7 @@
import org.apache.asterix.om.types.TypeSignature;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.SourceLocation;
public class TypeTranslator {
@@ -73,9 +76,9 @@
Map<TypeSignature, List<AbstractCollectionType>> incompleteItemTypes = new HashMap<>();
Map<TypeSignature, List<TypeSignature>> incompleteTopLevelTypeReferences = new HashMap<>();
firstPass(typeDataverse, typeName, typeExpr, outTypeMap, incompleteFieldTypes, incompleteItemTypes,
- incompleteTopLevelTypeReferences, defaultDataverse);
+ incompleteTopLevelTypeReferences, typeDataverse);
secondPass(mdTxnCtx, outTypeMap, incompleteFieldTypes, incompleteItemTypes, incompleteTopLevelTypeReferences,
- typeDataverse);
+ typeDataverse, typeExpr.getSourceLocation());
for (IAType type : outTypeMap.values()) {
if (type.getTypeTag().isDerivedType()) {
@@ -91,7 +94,8 @@
throws AlgebricksException {
if (BuiltinTypeMap.getBuiltinType(typeName) != null) {
- throw new AlgebricksException("Cannot redefine builtin type " + typeName + " .");
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR, typeExpr.getSourceLocation(),
+ "Cannot redefine builtin type " + typeName);
}
TypeSignature typeSignature = new TypeSignature(typeDataverse, typeName);
switch (typeExpr.getTypeKind()) {
@@ -136,15 +140,15 @@
private static void secondPass(MetadataTransactionContext mdTxnCtx, Map<TypeSignature, IAType> typeMap,
Map<String, Map<ARecordType, List<Integer>>> incompleteFieldTypes,
Map<TypeSignature, List<AbstractCollectionType>> incompleteItemTypes,
- Map<TypeSignature, List<TypeSignature>> incompleteTopLevelTypeReferences, DataverseName typeDataverse)
- throws AlgebricksException {
+ Map<TypeSignature, List<TypeSignature>> incompleteTopLevelTypeReferences, DataverseName typeDataverse,
+ SourceLocation sourceLoc) throws AlgebricksException {
// solve remaining top level references
for (TypeSignature typeSignature : incompleteTopLevelTypeReferences.keySet()) {
IAType t;
Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, typeSignature.getDataverseName(),
typeSignature.getName());
if (dt == null) {
- throw new AlgebricksException("Could not resolve type " + typeSignature);
+ throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, typeSignature.getName());
} else {
t = dt.getDatatype();
}
@@ -161,7 +165,7 @@
trefName);
}
if (dt == null) {
- throw new AlgebricksException("Could not resolve type " + trefName);
+ throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, trefName);
} else {
t = dt.getDatatype();
}
@@ -188,7 +192,7 @@
dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, typeSignature.getDataverseName(),
typeSignature.getName());
if (dt == null) {
- throw new AlgebricksException("Could not resolve type " + typeSignature);
+ throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, typeSignature.getName());
}
t = dt.getDatatype();
} else {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index bd70ed4..40ed7a4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -106,8 +106,8 @@
protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, InterruptedException {
LOGGER.log(level, "Actual Recovery task has started");
- Exception failure = null;
- while (policy.retry(failure)) {
+ Exception failure;
+ do {
synchronized (listener) {
while (!cancelRecovery && clusterStateManager.getState() != ClusterState.ACTIVE) {
listener.wait();
@@ -139,7 +139,7 @@
} finally {
releaseRecoveryLocks(metadataProvider);
}
- }
+ } while (policy.retry(failure));
// Recovery task is essntially over now either through failure or through cancellation(stop)
synchronized (listener) {
listener.notifyAll();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index c0a719b..a5fef11 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -691,10 +691,11 @@
throw new CompilationException(ErrorCode.DATASET_EXISTS, sourceLoc, datasetName, dataverseName);
}
}
+ Datatype itemTypeEntity;
IAType itemType;
switch (itemTypeExpr.getTypeKind()) {
case TYPEREFERENCE:
- Datatype itemTypeEntity = metadataProvider.findTypeEntity(itemTypeDataverseName, itemTypeName);
+ itemTypeEntity = metadataProvider.findTypeEntity(itemTypeDataverseName, itemTypeName);
if (itemTypeEntity == null || itemTypeEntity.getIsAnonymous()) {
// anonymous types cannot be referred from CREATE DATASET
throw new AsterixException(ErrorCode.UNKNOWN_TYPE, sourceLoc,
@@ -706,8 +707,8 @@
case RECORD:
itemType = translateType(itemTypeDataverseName, itemTypeName, itemTypeExpr, mdTxnCtx);
validateDatasetItemType(dsType, itemType, false, sourceLoc);
- MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
- new Datatype(itemTypeDataverseName, itemTypeName, itemType, true));
+ itemTypeEntity = new Datatype(itemTypeDataverseName, itemTypeName, itemType, true);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, itemTypeEntity);
break;
default:
throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc,
@@ -783,7 +784,8 @@
createExternalDatasetProperties(dataverseName, dd, metadataProvider, mdTxnCtx);
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
- validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx);
+ validateExternalDatasetProperties(externalDetails, properties, itemTypeEntity,
+ dd.getSourceLocation(), mdTxnCtx);
datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
TransactionState.COMMIT);
break;
@@ -3507,8 +3509,8 @@
}
protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails,
- Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx)
- throws AlgebricksException, HyracksDataException {
+ Map<String, String> properties, Datatype itemType, SourceLocation srcLoc,
+ MetadataTransactionContext mdTxnCtx) throws AlgebricksException, HyracksDataException {
// Validate adapter specific properties
String adapter = externalDetails.getAdapter();
Map<String, String> details = new HashMap<>(properties);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/create-dataset-inline-type-2/create-dataset-inline-type-2.5.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/create-dataset-inline-type-2/create-dataset-inline-type-2.5.ddl.sqlpp
new file mode 100644
index 0000000..62c0694
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/create-dataset-inline-type-2/create-dataset-inline-type-2.5.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* Create dataset that attempts to use unknown type
+ in inline type definition */
+
+USE test;
+
+CREATE DATASET Cust3X(
+ c_custkey integer not unknown,
+ c_name my_unknown_type
+) PRIMARY KEY c_custkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.query.sqlpp
new file mode 100644
index 0000000..c608831
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+with
+ t1 as [ { "x": 1, "y": [10, 11, 12] }, { "x": 2, "y": [20, 21, 22] } ],
+ t2 as [ 100, 101, 102, 103 ]
+from t1 as t1, t1.y as y join t2 as t2 on true
+select value count(*)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.query.sqlpp
new file mode 100644
index 0000000..0909785
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+with
+ t1 as [ { "x": 1, "y": [10, 11, 12] }, { "x": 2, "y": [20, 21, 22] } ],
+ t2 as [ 100, 101, 102, 103 ],
+ t3 as [ 1000, 1001, 1002, 1003, 1004 ]
+from t1 as t1, t1.y as y join t2 as t2 on true join t3 as t3 on true
+select value count(*)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.adm
new file mode 100644
index 0000000..cabf43b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.adm
@@ -0,0 +1 @@
+24
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.adm
new file mode 100644
index 0000000..8bc6583
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.adm
@@ -0,0 +1 @@
+120
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 40093f2..bcd9f18 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -3983,6 +3983,7 @@
<output-dir compare="Text">create-dataset-inline-type-2</output-dir>
<expected-error>ASX1082: Cannot find datatype with name test.$d$t$i$Cust1</expected-error>
<expected-error>ASX1082: Cannot find datatype with name test.$d$t$i$Cust2</expected-error>
+ <expected-error>ASX1082: Cannot find datatype with name my_unknown_type</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="ddl">
@@ -13298,6 +13299,11 @@
</test-group>
<test-group name="unnest">
<test-case FilePath="unnest">
+ <compilation-unit name="ASTERIXDB-2750_unnest_join">
+ <output-dir compare="Text">ASTERIXDB-2750_unnest_join</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="unnest">
<compilation-unit name="left-outer-unnest">
<output-dir compare="Text">left-outer-unnest</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 6f756c7..e1b93b9 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -888,10 +888,10 @@
{
RecordTypeDefinition recordTypeDef = null;
RecordTypeDefinition.RecordKind recordKind = null;
- Token recordKindToken = null;
+ Token startToken = null, recordKindToken = null;
}
{
- <LEFTPAREN> recordTypeDef = DatasetRecordTypeDef() <RIGHTPAREN>
+ <LEFTPAREN> { startToken = token; } recordTypeDef = DatasetRecordTypeDef() <RIGHTPAREN>
( recordKind = RecordTypeKind() { recordKindToken = token; } <TYPE> )?
{
if (recordKind == null) {
@@ -900,7 +900,7 @@
throw createUnexpectedTokenError(recordKindToken);
}
recordTypeDef.setRecordKind(recordKind);
- return recordTypeDef;
+ return addSourceLocation(recordTypeDef, startToken);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/pom.xml b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
index e2da20a..621e6f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
@@ -95,6 +95,10 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
index 29469d5..0d18a2b 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
@@ -22,7 +22,7 @@
public interface IRetryPolicy {
/**
* @param failure
- * the cause of the failure
+ * the cause of the failure (this cannot be null)
* @return true if one more attempt should be done
*/
boolean retry(Throwable failure);
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
index bfc5fdd..be9874a 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
@@ -18,7 +18,26 @@
*/
package org.apache.hyracks.util;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
@FunctionalInterface
public interface ThrowingConsumer<V> {
void process(V value) throws Exception;
+
+ @SuppressWarnings("Duplicates")
+ static <T> Consumer<T> asUnchecked(ThrowingConsumer<T> consumer) {
+ return input -> {
+ try {
+ consumer.process(input);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedExecutionException(e);
+ } catch (Exception e) {
+ throw new UncheckedExecutionException(e);
+ }
+ };
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIntConsumer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIntConsumer.java
new file mode 100644
index 0000000..07575bb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIntConsumer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.util.function.IntConsumer;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+@FunctionalInterface
+public interface ThrowingIntConsumer {
+ void process(int value) throws Exception;
+
+ @SuppressWarnings("Duplicates")
+ static IntConsumer asUnchecked(ThrowingIntConsumer consumer) {
+ return input -> {
+ try {
+ consumer.process(input);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedExecutionException(e);
+ } catch (Exception e) {
+ throw new UncheckedExecutionException(e);
+ }
+ };
+ }
+
+}