Merge branch 'gerrit/mad-hatter'

Change-Id: I5cd06cc3cd69068faecced6a2b1cc558eecd0784
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index b689e8f..c3fc56c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -149,7 +149,7 @@
     public static final String REWRITE_IN_AS_OR_OPTION = "rewrite_in_as_or";
     private static final boolean REWRITE_IN_AS_OR_OPTION_DEFAULT = true;
 
-    private Deque<Mutable<ILogicalOperator>> uncorrelatedLeftBranchStack = new ArrayDeque<>();
+    private Deque<Mutable<ILogicalOperator>> uncorrelatedRightBranchStack = new ArrayDeque<>();
     private final Map<VarIdentifier, IAObject> externalVars;
     private final boolean translateInAsOr;
 
@@ -297,10 +297,12 @@
             throws CompilationException {
         Mutable<ILogicalOperator> inputSrc = arg;
         Pair<ILogicalOperator, LogicalVariable> topUnnest = null;
+        uncorrelatedRightBranchStack.push(inputSrc);
         for (FromTerm fromTerm : fromClause.getFromTerms()) {
             topUnnest = fromTerm.accept(this, inputSrc);
             inputSrc = new MutableObject<>(topUnnest.first);
         }
+        uncorrelatedRightBranchStack.pop();
         return topUnnest;
     }
 
@@ -328,27 +330,20 @@
         Mutable<ILogicalOperator> topOpRef = new MutableObject<>(unnestOp);
         if (fromTerm.hasCorrelateClauses()) {
             for (AbstractBinaryCorrelateClause correlateClause : fromTerm.getCorrelateClauses()) {
-                if (correlateClause.getClauseType() == ClauseType.UNNEST_CLAUSE) {
-                    // Correlation is allowed.
-                    topOpRef = new MutableObject<>(correlateClause.accept(this, topOpRef).first);
-                } else {
-                    // Correlation is dis-allowed.
-                    uncorrelatedLeftBranchStack.push(topOpRef);
-                    topOpRef = new MutableObject<>(correlateClause.accept(this, tupSource).first);
-                }
+                topOpRef = new MutableObject<>(correlateClause.accept(this, topOpRef).first);
             }
         }
         return new Pair<>(topOpRef.getValue(), fromVar);
     }
 
     @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(JoinClause joinClause, Mutable<ILogicalOperator> inputRef)
+    public Pair<ILogicalOperator, LogicalVariable> visit(JoinClause joinClause, Mutable<ILogicalOperator> leftInputRef)
             throws CompilationException {
         SourceLocation sourceLoc = joinClause.getSourceLocation();
-        Mutable<ILogicalOperator> leftInputRef = uncorrelatedLeftBranchStack.pop();
         if (joinClause.getJoinType() == JoinType.INNER) {
+            Mutable<ILogicalOperator> rightInputRef = uncorrelatedRightBranchStack.peek();
             Pair<ILogicalOperator, LogicalVariable> rightBranch =
-                    generateUnnestForBinaryCorrelateRightBranch(joinClause, inputRef, true);
+                    generateUnnestForBinaryCorrelateRightBranch(joinClause, rightInputRef, true);
             // A join operator with condition TRUE.
             AbstractBinaryJoinOperator joinOperator = new InnerJoinOperator(
                     new MutableObject<>(ConstantExpression.TRUE), leftInputRef, new MutableObject<>(rightBranch.first));
@@ -362,7 +357,7 @@
             filter.getInputs().add(conditionExprOpPair.second);
             filter.setSourceLocation(conditionExprOpPair.first.getSourceLocation());
             return new Pair<>(filter, rightBranch.second);
-        } else {
+        } else if (joinClause.getJoinType() == JoinType.LEFTOUTER) {
             // Creates a subplan operator.
             SubplanOperator subplanOp = new SubplanOperator();
             subplanOp.getInputs().add(leftInputRef);
@@ -498,6 +493,9 @@
                 context.setVar(joinClause.getRightVariable(), outerUnnestVar);
             }
             return new Pair<>(currentTopOp, null);
+        } else {
+            throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, joinClause.getSourceLocation(),
+                    String.valueOf(joinClause.getJoinType().toString()));
         }
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
index aa00fb3..6e02fc2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
@@ -28,6 +28,8 @@
 
 import org.apache.asterix.common.annotations.IRecordFieldDataGen;
 import org.apache.asterix.common.annotations.RecordDataGenAnnotation;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.lang.common.expression.OrderedListTypeDefinition;
 import org.apache.asterix.lang.common.expression.RecordTypeDefinition;
@@ -52,6 +54,7 @@
 import org.apache.asterix.om.types.TypeSignature;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class TypeTranslator {
 
@@ -73,9 +76,9 @@
         Map<TypeSignature, List<AbstractCollectionType>> incompleteItemTypes = new HashMap<>();
         Map<TypeSignature, List<TypeSignature>> incompleteTopLevelTypeReferences = new HashMap<>();
         firstPass(typeDataverse, typeName, typeExpr, outTypeMap, incompleteFieldTypes, incompleteItemTypes,
-                incompleteTopLevelTypeReferences, defaultDataverse);
+                incompleteTopLevelTypeReferences, typeDataverse);
         secondPass(mdTxnCtx, outTypeMap, incompleteFieldTypes, incompleteItemTypes, incompleteTopLevelTypeReferences,
-                typeDataverse);
+                typeDataverse, typeExpr.getSourceLocation());
 
         for (IAType type : outTypeMap.values()) {
             if (type.getTypeTag().isDerivedType()) {
@@ -91,7 +94,8 @@
             throws AlgebricksException {
 
         if (BuiltinTypeMap.getBuiltinType(typeName) != null) {
-            throw new AlgebricksException("Cannot redefine builtin type " + typeName + " .");
+            throw new CompilationException(ErrorCode.COMPILATION_ERROR, typeExpr.getSourceLocation(),
+                    "Cannot redefine builtin type " + typeName);
         }
         TypeSignature typeSignature = new TypeSignature(typeDataverse, typeName);
         switch (typeExpr.getTypeKind()) {
@@ -136,15 +140,15 @@
     private static void secondPass(MetadataTransactionContext mdTxnCtx, Map<TypeSignature, IAType> typeMap,
             Map<String, Map<ARecordType, List<Integer>>> incompleteFieldTypes,
             Map<TypeSignature, List<AbstractCollectionType>> incompleteItemTypes,
-            Map<TypeSignature, List<TypeSignature>> incompleteTopLevelTypeReferences, DataverseName typeDataverse)
-            throws AlgebricksException {
+            Map<TypeSignature, List<TypeSignature>> incompleteTopLevelTypeReferences, DataverseName typeDataverse,
+            SourceLocation sourceLoc) throws AlgebricksException {
         // solve remaining top level references
         for (TypeSignature typeSignature : incompleteTopLevelTypeReferences.keySet()) {
             IAType t;
             Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, typeSignature.getDataverseName(),
                     typeSignature.getName());
             if (dt == null) {
-                throw new AlgebricksException("Could not resolve type " + typeSignature);
+                throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, typeSignature.getName());
             } else {
                 t = dt.getDatatype();
             }
@@ -161,7 +165,7 @@
                         trefName);
             }
             if (dt == null) {
-                throw new AlgebricksException("Could not resolve type " + trefName);
+                throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, trefName);
             } else {
                 t = dt.getDatatype();
             }
@@ -188,7 +192,7 @@
                 dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, typeSignature.getDataverseName(),
                         typeSignature.getName());
                 if (dt == null) {
-                    throw new AlgebricksException("Could not resolve type " + typeSignature);
+                    throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, typeSignature.getName());
                 }
                 t = dt.getDatatype();
             } else {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index bd70ed4..40ed7a4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -106,8 +106,8 @@
 
     protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, InterruptedException {
         LOGGER.log(level, "Actual Recovery task has started");
-        Exception failure = null;
-        while (policy.retry(failure)) {
+        Exception failure;
+        do {
             synchronized (listener) {
                 while (!cancelRecovery && clusterStateManager.getState() != ClusterState.ACTIVE) {
                     listener.wait();
@@ -139,7 +139,7 @@
             } finally {
                 releaseRecoveryLocks(metadataProvider);
             }
-        }
+        } while (policy.retry(failure));
         // Recovery task is essntially over now either through failure or through cancellation(stop)
         synchronized (listener) {
             listener.notifyAll();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index c0a719b..a5fef11 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -691,10 +691,11 @@
                     throw new CompilationException(ErrorCode.DATASET_EXISTS, sourceLoc, datasetName, dataverseName);
                 }
             }
+            Datatype itemTypeEntity;
             IAType itemType;
             switch (itemTypeExpr.getTypeKind()) {
                 case TYPEREFERENCE:
-                    Datatype itemTypeEntity = metadataProvider.findTypeEntity(itemTypeDataverseName, itemTypeName);
+                    itemTypeEntity = metadataProvider.findTypeEntity(itemTypeDataverseName, itemTypeName);
                     if (itemTypeEntity == null || itemTypeEntity.getIsAnonymous()) {
                         // anonymous types cannot be referred from CREATE DATASET
                         throw new AsterixException(ErrorCode.UNKNOWN_TYPE, sourceLoc,
@@ -706,8 +707,8 @@
                 case RECORD:
                     itemType = translateType(itemTypeDataverseName, itemTypeName, itemTypeExpr, mdTxnCtx);
                     validateDatasetItemType(dsType, itemType, false, sourceLoc);
-                    MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
-                            new Datatype(itemTypeDataverseName, itemTypeName, itemType, true));
+                    itemTypeEntity = new Datatype(itemTypeDataverseName, itemTypeName, itemType, true);
+                    MetadataManager.INSTANCE.addDatatype(mdTxnCtx, itemTypeEntity);
                     break;
                 default:
                     throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc,
@@ -783,7 +784,8 @@
                             createExternalDatasetProperties(dataverseName, dd, metadataProvider, mdTxnCtx);
                     ExternalDataUtils.normalize(properties);
                     ExternalDataUtils.validate(properties);
-                    validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx);
+                    validateExternalDatasetProperties(externalDetails, properties, itemTypeEntity,
+                            dd.getSourceLocation(), mdTxnCtx);
                     datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
                             TransactionState.COMMIT);
                     break;
@@ -3507,8 +3509,8 @@
     }
 
     protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails,
-            Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx)
-            throws AlgebricksException, HyracksDataException {
+            Map<String, String> properties, Datatype itemType, SourceLocation srcLoc,
+            MetadataTransactionContext mdTxnCtx) throws AlgebricksException, HyracksDataException {
         // Validate adapter specific properties
         String adapter = externalDetails.getAdapter();
         Map<String, String> details = new HashMap<>(properties);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/create-dataset-inline-type-2/create-dataset-inline-type-2.5.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/create-dataset-inline-type-2/create-dataset-inline-type-2.5.ddl.sqlpp
new file mode 100644
index 0000000..62c0694
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/create-dataset-inline-type-2/create-dataset-inline-type-2.5.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* Create dataset that attempts to use unknown type
+   in inline type definition */
+
+USE test;
+
+CREATE DATASET Cust3X(
+  c_custkey integer not unknown,
+  c_name my_unknown_type
+) PRIMARY KEY c_custkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.query.sqlpp
new file mode 100644
index 0000000..c608831
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+with
+  t1 as [ { "x": 1, "y": [10, 11, 12] }, { "x": 2, "y": [20, 21, 22] } ],
+  t2 as [ 100, 101, 102, 103 ]
+from t1 as t1, t1.y as y join t2 as t2 on true
+select value count(*)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.query.sqlpp
new file mode 100644
index 0000000..0909785
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+with
+  t1 as [ { "x": 1, "y": [10, 11, 12] }, { "x": 2, "y": [20, 21, 22] } ],
+  t2 as [ 100, 101, 102, 103 ],
+  t3 as [ 1000, 1001, 1002, 1003, 1004 ]
+from t1 as t1, t1.y as y join t2 as t2 on true join t3 as t3 on true
+select value count(*)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.adm
new file mode 100644
index 0000000..cabf43b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.1.adm
@@ -0,0 +1 @@
+24
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.adm
new file mode 100644
index 0000000..8bc6583
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/unnest/ASTERIXDB-2750_unnest_join/ASTERIXDB-2750_unnest_join.2.adm
@@ -0,0 +1 @@
+120
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 40093f2..bcd9f18 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -3983,6 +3983,7 @@
         <output-dir compare="Text">create-dataset-inline-type-2</output-dir>
         <expected-error>ASX1082: Cannot find datatype with name test.$d$t$i$Cust1</expected-error>
         <expected-error>ASX1082: Cannot find datatype with name test.$d$t$i$Cust2</expected-error>
+        <expected-error>ASX1082: Cannot find datatype with name my_unknown_type</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="ddl">
@@ -13298,6 +13299,11 @@
   </test-group>
   <test-group name="unnest">
     <test-case FilePath="unnest">
+      <compilation-unit name="ASTERIXDB-2750_unnest_join">
+        <output-dir compare="Text">ASTERIXDB-2750_unnest_join</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="unnest">
       <compilation-unit name="left-outer-unnest">
         <output-dir compare="Text">left-outer-unnest</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 6f756c7..e1b93b9 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -888,10 +888,10 @@
 {
   RecordTypeDefinition recordTypeDef = null;
   RecordTypeDefinition.RecordKind recordKind = null;
-  Token recordKindToken = null;
+  Token startToken = null, recordKindToken = null;
 }
 {
-   <LEFTPAREN> recordTypeDef = DatasetRecordTypeDef() <RIGHTPAREN>
+   <LEFTPAREN> { startToken = token; } recordTypeDef = DatasetRecordTypeDef() <RIGHTPAREN>
    ( recordKind = RecordTypeKind() { recordKindToken = token; } <TYPE> )?
    {
      if (recordKind == null) {
@@ -900,7 +900,7 @@
        throw createUnexpectedTokenError(recordKindToken);
      }
      recordTypeDef.setRecordKind(recordKind);
-     return recordTypeDef;
+     return addSourceLocation(recordTypeDef, startToken);
    }
 }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-util/pom.xml b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
index e2da20a..621e6f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
@@ -95,6 +95,10 @@
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpclient</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
index 29469d5..0d18a2b 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
@@ -22,7 +22,7 @@
 public interface IRetryPolicy {
     /**
      * @param failure
-     *            the cause of the failure
+     *            the cause of the failure (this cannot be null)
      * @return true if one more attempt should be done
      */
     boolean retry(Throwable failure);
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
index bfc5fdd..be9874a 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
@@ -18,7 +18,26 @@
  */
 package org.apache.hyracks.util;
 
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
 @FunctionalInterface
 public interface ThrowingConsumer<V> {
     void process(V value) throws Exception;
+
+    @SuppressWarnings("Duplicates")
+    static <T> Consumer<T> asUnchecked(ThrowingConsumer<T> consumer) {
+        return input -> {
+            try {
+                consumer.process(input);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new UncheckedExecutionException(e);
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e);
+            }
+        };
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIntConsumer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIntConsumer.java
new file mode 100644
index 0000000..07575bb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIntConsumer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.util.function.IntConsumer;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+@FunctionalInterface
+public interface ThrowingIntConsumer {
+    void process(int value) throws Exception;
+
+    @SuppressWarnings("Duplicates")
+    static IntConsumer asUnchecked(ThrowingIntConsumer consumer) {
+        return input -> {
+            try {
+                consumer.process(input);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new UncheckedExecutionException(e);
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e);
+            }
+        };
+    }
+
+}