[ASTERIXDB-2631][COMP][RT] Limit number of reported warnings

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Limit number of reported warnings.

- added a new compiler property "COMPILER.RUNTIME.WARNINGS"
  with default value set to 0.
- updated Stat class to have a field for the count of runtime
  warnings.
- updated few classes to propagate the warnings limit number
  to Hyracks Task class.
- added new Option to parse unsigned long values.
- TaskProfile has a field for the count.
- updated TestExecutor to allow specifying parameters in
the body of the POST method.

Change-Id: Ie8756f1b9dcb7bbce92bd77692490fbada9a2482
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3535
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
index c8751f1..a9680cb 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -87,6 +87,7 @@
 import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
@@ -142,7 +143,8 @@
                 BinaryComparatorFactoryProvider.INSTANCE, TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY,
                 BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, null,
                 new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())),
-                ExpressionTypeComputer.INSTANCE, null, null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null);
+                ExpressionTypeComputer.INSTANCE, null, null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null,
+                appCtx.getCompilerProperties().getNumRuntimeWarnings());
     }
 
     @Override
@@ -305,9 +307,11 @@
                         isDuplicate = isDuplicateField(fieldName, fieldNameIdx, expr.getArguments());
                     }
                     if (isDuplicate) {
-                        optContext.getWarningCollector()
-                                .warn(WarningUtil.forAsterix(fieldNameExpr.second.getSourceLocation(),
-                                        ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, fieldName));
+                        IWarningCollector warningCollector = optContext.getWarningCollector();
+                        if (warningCollector.shouldWarn()) {
+                            warningCollector.warn(WarningUtil.forAsterix(fieldNameExpr.second.getSourceLocation(),
+                                    ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, fieldName));
+                        }
                         iterator.remove();
                         iterator.next();
                         iterator.remove();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveDuplicateFieldsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveDuplicateFieldsRule.java
index d457ce5..0a3b9c4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveDuplicateFieldsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveDuplicateFieldsRule.java
@@ -40,6 +40,7 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 
 /**
  * <pre>
@@ -112,8 +113,11 @@
                 ILogicalExpression fieldNameExpr = iterator.next().getValue();
                 String fieldName = ConstantExpressionUtil.getStringConstant(fieldNameExpr);
                 if (fieldName != null && !fieldNames.add(fieldName)) {
-                    context.getWarningCollector().warn(WarningUtil.forAsterix(fieldNameExpr.getSourceLocation(),
-                            ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, fieldName));
+                    IWarningCollector warningCollector = context.getWarningCollector();
+                    if (warningCollector.shouldWarn()) {
+                        warningCollector.warn(WarningUtil.forAsterix(fieldNameExpr.getSourceLocation(),
+                                ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, fieldName));
+                    }
                     iterator.remove();
                     iterator.next();
                     iterator.remove();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index d7281e3..3718340 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -84,6 +84,7 @@
         private long size;
         private long processedObjects;
         private long diskIoCount;
+        private long totalWarningsCount;
 
         public long getCount() {
             return count;
@@ -116,6 +117,14 @@
         public void setDiskIoCount(long diskIoCount) {
             this.diskIoCount = diskIoCount;
         }
+
+        public long getTotalWarningsCount() {
+            return totalWarningsCount;
+        }
+
+        public void setTotalWarningsCount(long totalWarningsCount) {
+            this.totalWarningsCount = totalWarningsCount;
+        }
     }
 
     /**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 69d998f..9e1dcdf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -130,17 +130,17 @@
 
     // A white list of supported configurable parameters.
     public static final String PREFIX_INTERNAL_PARAMETERS = "_internal";
-    private static final Set<String> CONFIGURABLE_PARAMETER_NAMES =
-            ImmutableSet.of(CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY,
-                    CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_WINDOWMEMORY_KEY,
-                    CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, CompilerProperties.COMPILER_PARALLELISM_KEY,
-                    CompilerProperties.COMPILER_SORT_PARALLEL_KEY, CompilerProperties.COMPILER_SORT_SAMPLES_KEY,
-                    FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, FuzzyUtils.SIM_FUNCTION_PROP_NAME,
-                    FuzzyUtils.SIM_THRESHOLD_PROP_NAME, StartFeedStatement.WAIT_FOR_COMPLETION,
-                    FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS,
-                    SqlppQueryRewriter.INLINE_WITH_OPTION, SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION,
-                    "hash_merge", "output-record-type", AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION,
-                    DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION);
+    private static final Set<String> CONFIGURABLE_PARAMETER_NAMES = ImmutableSet.of(
+            CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY,
+            CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_WINDOWMEMORY_KEY,
+            CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, CompilerProperties.COMPILER_PARALLELISM_KEY,
+            CompilerProperties.COMPILER_SORT_PARALLEL_KEY, CompilerProperties.COMPILER_SORT_SAMPLES_KEY,
+            FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, FuzzyUtils.SIM_FUNCTION_PROP_NAME,
+            FuzzyUtils.SIM_THRESHOLD_PROP_NAME, StartFeedStatement.WAIT_FOR_COMPLETION,
+            FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS,
+            SqlppQueryRewriter.INLINE_WITH_OPTION, SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION,
+            "hash_merge", "output-record-type", AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION,
+            DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION, CompilerProperties.COMPILER_RUNTIME_WARNINGS_KEY);
 
     private final IRewriterFactory rewriterFactory;
     private final IAstPrintVisitorFactory astPrintVisitorFactory;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
index 3ce09a6..a036696 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
@@ -31,6 +31,7 @@
     private long processedObjects;
     private long diskIoCount;
     private Set<Warning> warnings;
+    private long totalWarningsCount;
 
     public ResultMetadata(SessionConfig.OutputFormat format) {
         this.format = format;
@@ -56,10 +57,20 @@
         this.warnings = warnings;
     }
 
+    /**
+     * Sets the count of all warnings generated including unreported ones.
+     */
+    public void setTotalWarningsCount(long totalWarningsCount) {
+        this.totalWarningsCount = totalWarningsCount;
+    }
+
     public long getJobDuration() {
         return jobDuration;
     }
 
+    /**
+     * @return The reported warnings.
+     */
     public Set<Warning> getWarnings() {
         return warnings;
     }
@@ -72,6 +83,13 @@
         return diskIoCount;
     }
 
+    /**
+     * @return Total count of all warnings generated including unreported ones.
+     */
+    public long getTotalWarningsCount() {
+        return totalWarningsCount;
+    }
+
     @Override
     public String toString() {
         return "ResultMetadata{" + "format=" + format + ", jobDuration=" + jobDuration + ", processedObjects="
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index f395931..aff7441 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -135,6 +135,7 @@
         String executeQuery = request.getParameter("execute-query");
         response.setStatus(HttpResponseStatus.OK);
         try {
+            // TODO: warnings should be retrieved from warnings collectors
             IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
             IResultSet resultSet = ServletUtil.getResultSet(hcc, appCtx, ctx);
             IParser parser = parserFactory.createParser(query);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index a39c716..52c62af 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -100,7 +100,7 @@
                 printer.printResults();
                 ResponseMertics mertics = ResponseMertics.of(System.nanoTime() - elapsedStart,
                         metadata.getJobDuration(), stats.getCount(), stats.getSize(), metadata.getProcessedObjects(), 0,
-                        0, metadata.getDiskIoCount());
+                        metadata.getTotalWarningsCount(), metadata.getDiskIoCount());
                 printer.addFooterPrinter(new MetricsPrinter(mertics, HttpUtil.getPreferredCharset(request)));
                 printer.printFooters();
                 printer.end();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index c9aeabc..1e8f111 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -521,8 +521,7 @@
             execution.finish();
         }
         responsePrinter.printResults();
-        buildResponseFooters(elapsedStart, errorCount, stats, execution, warnings, resultCharset, responsePrinter,
-                delivery);
+        buildResponseFooters(elapsedStart, errorCount, stats, execution, resultCharset, responsePrinter, delivery);
         responsePrinter.printFooters();
         responsePrinter.end();
         if (sessionOutput.out().checkError()) {
@@ -556,15 +555,15 @@
     }
 
     protected void buildResponseFooters(long elapsedStart, long errorCount, Stats stats,
-            RequestExecutionState execution, List<Warning> warnings, Charset resultCharset,
-            ResponsePrinter responsePrinter, ResultDelivery delivery) {
+            RequestExecutionState execution, Charset resultCharset, ResponsePrinter responsePrinter,
+            ResultDelivery delivery) {
         if (ResultDelivery.ASYNC != delivery) {
             // in case of ASYNC delivery, the status is printed by query translator
             responsePrinter.addFooterPrinter(new StatusPrinter(execution.getResultStatus()));
         }
         final ResponseMertics mertics = ResponseMertics.of(System.nanoTime() - elapsedStart, execution.duration(),
-                stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount, warnings.size(),
-                stats.getDiskIoCount());
+                stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount,
+                stats.getTotalWarningsCount(), stats.getDiskIoCount());
         responsePrinter.addFooterPrinter(new MetricsPrinter(mertics, resultCharset));
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index b152fc9..55c1f8e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -20,6 +20,7 @@
 
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.asterix.api.common.ResultMetadata;
@@ -69,24 +70,34 @@
     private void aggregateJobStats(JobId jobId, ResultMetadata metadata) {
         long processedObjects = 0;
         long diskIoCount = 0;
-        Set<Warning> warnings = new HashSet<>();
+        long aggregateTotalWarningsCount = 0;
+        Set<Warning> AggregateWarnings = new HashSet<>();
         IJobManager jobManager =
                 ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager();
         final JobRun run = jobManager.get(jobId);
         if (run != null) {
             final JobProfile jobProfile = run.getJobProfile();
             final Collection<JobletProfile> jobletProfiles = jobProfile.getJobletProfiles().values();
+            final long runtimeWarningsLimit = run.getJobSpecification().getRuntimeWarningsLimit();
             for (JobletProfile jp : jobletProfiles) {
                 final Collection<TaskProfile> jobletTasksProfile = jp.getTaskProfiles().values();
                 for (TaskProfile tp : jobletTasksProfile) {
                     processedObjects += tp.getStatsCollector().getAggregatedStats().getTupleCounter().get();
                     diskIoCount += tp.getStatsCollector().getAggregatedStats().getDiskIoCounter().get();
-                    warnings.addAll(tp.getWarnings());
+                    aggregateTotalWarningsCount += tp.getTotalWarningsCount();
+                    Set<Warning> taskWarnings = tp.getWarnings();
+                    if (AggregateWarnings.size() < runtimeWarningsLimit && !taskWarnings.isEmpty()) {
+                        Iterator<Warning> taskWarningsIt = taskWarnings.iterator();
+                        while (AggregateWarnings.size() < runtimeWarningsLimit && taskWarningsIt.hasNext()) {
+                            AggregateWarnings.add(taskWarningsIt.next());
+                        }
+                    }
                 }
             }
         }
         metadata.setProcessedObjects(processedObjects);
-        metadata.setWarnings(warnings);
+        metadata.setWarnings(AggregateWarnings);
         metadata.setDiskIoCount(diskIoCount);
+        metadata.setTotalWarningsCount(aggregateTotalWarningsCount);
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
index 531e39e..90b0656 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
@@ -63,6 +63,7 @@
         if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultSets.isEmpty()) {
             stats.setProcessedObjects(responseMsg.getStats().getProcessedObjects());
             stats.setDiskIoCount(responseMsg.getStats().getDiskIoCount());
+            stats.setTotalWarningsCount(responseMsg.getStats().getTotalWarningsCount());
             for (int i = 0; i < resultSets.size(); i++) {
                 Triple<JobId, ResultSetId, ARecordType> rsmd = resultSets.get(i);
                 ResultReader resultReader = new ResultReader(resultSet, rsmd.getLeft(), rsmd.getMiddle());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 74997f7..1838c4c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2550,6 +2550,7 @@
                         .getResultMetadata(jobId, rsId);
         stats.setProcessedObjects(resultMetadata.getProcessedObjects());
         stats.setDiskIoCount(resultMetadata.getDiskIoCount());
+        stats.setTotalWarningsCount(resultMetadata.getTotalWarningsCount());
         warningCollector.warn(resultMetadata.getWarnings());
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 344a3ba..55b8146 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -152,6 +152,7 @@
     public static final Set<String> NON_CANCELLABLE =
             Collections.unmodifiableSet(new HashSet<>(Arrays.asList("store", "validate")));
     private static final int MAX_NON_UTF_8_STATEMENT_SIZE = 64 * 1024;
+    private static final ContentType TEXT_PLAIN_UTF8 = ContentType.create(HttpUtil.ContentType.APPLICATION_JSON, UTF_8);
 
     private final IPollTask plainExecutor = (testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
             queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount) -> executeTestFile(testCaseCtx,
@@ -721,20 +722,21 @@
         return builder.build();
     }
 
-    private HttpUriRequest buildRequest(String method, URI uri, List<Parameter> params, Optional<String> body) {
+    private HttpUriRequest buildRequest(String method, URI uri, List<Parameter> params, Optional<String> body,
+            ContentType contentType) {
         RequestBuilder builder = RequestBuilder.create(method);
         builder.setUri(uri);
         for (Parameter param : params) {
             builder.addParameter(param.getName(), param.getValue());
         }
         builder.setCharset(UTF_8);
-        body.ifPresent(s -> builder.setEntity(new StringEntity(s, UTF_8)));
+        body.ifPresent(s -> builder.setEntity(new StringEntity(s, contentType)));
         return builder.build();
     }
 
     private HttpUriRequest buildRequest(String method, URI uri, OutputFormat fmt, List<Parameter> params,
-            Optional<String> body) {
-        HttpUriRequest request = buildRequest(method, uri, params, body);
+            Optional<String> body, ContentType contentType) {
+        HttpUriRequest request = buildRequest(method, uri, params, body, contentType);
         // Set accepted output response type
         request.setHeader("Accept", fmt.mimeType());
         return request;
@@ -807,21 +809,23 @@
 
     public InputStream executeJSONGet(OutputFormat fmt, URI uri, List<Parameter> params,
             Predicate<Integer> responseCodeValidator) throws Exception {
-        return executeJSON(fmt, "GET", uri, params, responseCodeValidator, Optional.empty());
+        return executeJSON(fmt, "GET", uri, params, responseCodeValidator, Optional.empty(), TEXT_PLAIN_UTF8);
     }
 
     public InputStream executeJSON(OutputFormat fmt, String method, URI uri, List<Parameter> params) throws Exception {
-        return executeJSON(fmt, method, uri, params, code -> code == HttpStatus.SC_OK, Optional.empty());
+        return executeJSON(fmt, method, uri, params, code -> code == HttpStatus.SC_OK, Optional.empty(),
+                TEXT_PLAIN_UTF8);
     }
 
     public InputStream executeJSON(OutputFormat fmt, String method, URI uri, Predicate<Integer> responseCodeValidator)
             throws Exception {
-        return executeJSON(fmt, method, uri, Collections.emptyList(), responseCodeValidator, Optional.empty());
+        return executeJSON(fmt, method, uri, Collections.emptyList(), responseCodeValidator, Optional.empty(),
+                TEXT_PLAIN_UTF8);
     }
 
-    public InputStream executeJSON(OutputFormat fmt, String method, URI uri, List<Parameter> params,
-            Predicate<Integer> responseCodeValidator, Optional<String> body) throws Exception {
-        HttpUriRequest request = buildRequest(method, uri, fmt, params, body);
+    private InputStream executeJSON(OutputFormat fmt, String method, URI uri, List<Parameter> params,
+            Predicate<Integer> responseCodeValidator, Optional<String> body, ContentType contentType) throws Exception {
+        HttpUriRequest request = buildRequest(method, uri, fmt, params, body, contentType);
         HttpResponse response = executeAndCheckHttpRequest(request, responseCodeValidator);
         return response.getEntity().getContent();
     }
@@ -1234,11 +1238,14 @@
         final Optional<String> body = extractBody(statement);
         final Predicate<Integer> statusCodePredicate = extractStatusCodePredicate(statement);
         final boolean extracResult = isExtracResult(statement);
+        final String mimeReqType = extractHttpRequestType(statement);
+        ContentType contentType = mimeReqType != null ? ContentType.create(mimeReqType, UTF_8) : TEXT_PLAIN_UTF8;
         InputStream resultStream;
         if ("http".equals(extension)) {
-            resultStream = executeHttp(reqType, variablesReplaced, fmt, params, statusCodePredicate, body);
+            resultStream = executeHttp(reqType, variablesReplaced, fmt, params, statusCodePredicate, body, contentType);
         } else if ("uri".equals(extension)) {
-            resultStream = executeURI(reqType, URI.create(variablesReplaced), fmt, params, statusCodePredicate, body);
+            resultStream = executeURI(reqType, URI.create(variablesReplaced), fmt, params, statusCodePredicate, body,
+                    contentType);
             if (extracResult) {
                 resultStream = ResultExtractor.extract(resultStream, UTF_8).getResult();
             }
@@ -1624,10 +1631,10 @@
     }
 
     protected InputStream executeHttp(String ctxType, String endpoint, OutputFormat fmt, List<Parameter> params,
-            Predicate<Integer> statusCodePredicate, Optional<String> body) throws Exception {
+            Predicate<Integer> statusCodePredicate, Optional<String> body, ContentType contentType) throws Exception {
         String[] split = endpoint.split("\\?");
         URI uri = createEndpointURI(split[0], split.length > 1 ? split[1] : null);
-        return executeURI(ctxType, uri, fmt, params, statusCodePredicate, body);
+        return executeURI(ctxType, uri, fmt, params, statusCodePredicate, body, contentType);
     }
 
     private InputStream executeURI(String ctxType, URI uri, OutputFormat fmt, List<Parameter> params) throws Exception {
@@ -1635,8 +1642,8 @@
     }
 
     private InputStream executeURI(String ctxType, URI uri, OutputFormat fmt, List<Parameter> params,
-            Predicate<Integer> responseCodeValidator, Optional<String> body) throws Exception {
-        return executeJSON(fmt, ctxType.toUpperCase(), uri, params, responseCodeValidator, body);
+            Predicate<Integer> responseCodeValidator, Optional<String> body, ContentType contentType) throws Exception {
+        return executeJSON(fmt, ctxType.toUpperCase(), uri, params, responseCodeValidator, body, contentType);
     }
 
     public void killNC(String nodeId, CompilationUnit cUnit) throws Exception {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
index 4f5fe7a..611d5ea 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
@@ -55,6 +55,7 @@
 import org.apache.asterix.testframework.xml.ComparisonEnum;
 import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
 import org.apache.asterix.testframework.xml.TestGroup;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Assert;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -150,8 +151,7 @@
                     Query query = (Query) st;
                     IQueryRewriter rewriter = sqlppRewriterFactory.createQueryRewriter();
                     rewrite(rewriter, functions, query, metadataProvider,
-                            new LangRewritingContext(query.getVarCounter(), w -> {
-                            }));
+                            new LangRewritingContext(query.getVarCounter(), TestUtils.NOOP_WARNING_COLLECTOR));
 
                     // Tests deep copy and deep equality.
                     Query copiedQuery = (Query) SqlppRewriteUtil.deepCopy(query);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/min-max-incompatible-types/min-max-incompatible-types.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/min-max-incompatible-types/min-max-incompatible-types.1.query.sqlpp
index a392016..90c5419 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/min-max-incompatible-types/min-max-incompatible-types.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/min-max-incompatible-types/min-max-incompatible-types.1.query.sqlpp
@@ -17,4 +17,6 @@
  * under the License.
  */
 
+SET `compiler.runtime.warnings` "10";
+
 SELECT VALUE ARRAY_MIN([1, '2']);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/min-max-incompatible-types/min-max-incompatible-types.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/min-max-incompatible-types/min-max-incompatible-types.2.query.sqlpp
index b6eb3ed..5775d55 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/min-max-incompatible-types/min-max-incompatible-types.2.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/min-max-incompatible-types/min-max-incompatible-types.2.query.sqlpp
@@ -17,4 +17,6 @@
  * under the License.
  */
 
+SET `compiler.runtime.warnings` "1000";
+
 SELECT VALUE MIN(ds.InternalDetails) FROM Metadata.`Dataset` ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.01.ddl.sqlpp
new file mode 100644
index 0000000..0b3ca1e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.01.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create type t1 AS {id:int};
+create dataset ds(t1) primary key id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.02.update.sqlpp
new file mode 100644
index 0000000..92d4992
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.02.update.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// 10 runtime warnings
+
+use test;
+
+insert into ds([
+{"id": 1,  "f": 1},
+{"id": 2,  "f": "str1"},
+{"id": 3,  "f": 1},
+{"id": 4,  "f": "str2"},
+{"id": 5,  "f": 1},
+{"id": 6,  "f": "str3"},
+{"id": 7,  "f": "str4"},
+{"id": 8,  "f": "str5"},
+{"id": 9,  "f": "str6"},
+{"id": 10, "f": "str7"},
+{"id": 11, "f": 1},
+{"id": 12, "f": "str8"},
+{"id": 13, "f": "str9"},
+{"id": 14, "f": "str10"}
+]);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.03.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.03.post.http
new file mode 100644
index 0000000..804c80d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.03.post.http
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description     : test limiting number of reported runtime warnings
+ * Expected Result : success with 10 runtime warnings count
+ */
+-- requesttype=application/json
+/query/service
+--body={"statement":"USE test; SET `compiler.runtime.warnings` \"3\"; SELECT {\"a\":1, \"a\":2} as F1, isbitset(6, ds.f) AS F2 \/*+ hint*\/ FROM ds order by ds.id;"}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.04.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.04.post.http
new file mode 100644
index 0000000..3c308e1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.04.post.http
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description     : test limiting number of reported runtime warnings
+ * Expected Result : success with 10 runtime warnings count
+ */
+-- requesttype=application/json
+/query/service
+--body={"statement":"USE test; SET `compiler.runtime.warnings` \"0\"; SELECT {\"a\":1 } as F1, isbitset(6, ds.f) AS F2 FROM ds order by ds.id;"}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.05.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.05.post.http
new file mode 100644
index 0000000..b2ed679
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.05.post.http
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description     : test limiting number of reported runtime warnings (DEFAULT)
+ * Expected Result : success with 10 runtime warnings count
+ */
+-- requesttype=application/json
+/query/service
+--body={"statement":"USE test; SELECT {\"a\":1 , \"a\":2} as F1, isbitset(6, ds.f) AS F2 FROM ds order by ds.id;"}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.06.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.06.ddl.sqlpp
new file mode 100644
index 0000000..548e632
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/warnings/warnings_limit/warnings_limit.06.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index a23829f..31eeba0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -12,6 +12,7 @@
     "compiler\.groupmemory" : 163840,
     "compiler\.joinmemory" : 262144,
     "compiler\.parallelism" : 0,
+    "compiler\.runtime\.warnings" : 0,
     "compiler\.sort\.parallel" : false,
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 48e92b0..b913a81 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -12,6 +12,7 @@
     "compiler\.groupmemory" : 163840,
     "compiler\.joinmemory" : 262144,
     "compiler\.parallelism" : -1,
+    "compiler\.runtime\.warnings" : 0,
     "compiler\.sort\.parallel" : true,
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index b631b8c..f4ab69a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -12,6 +12,7 @@
     "compiler\.groupmemory" : 163840,
     "compiler\.joinmemory" : 262144,
     "compiler\.parallelism" : 3,
+    "compiler\.runtime\.warnings" : 0,
     "compiler\.sort\.parallel" : true,
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/warnings/warnings_limit/warnings_limit.03.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/warnings/warnings_limit/warnings_limit.03.regexadm
new file mode 100644
index 0000000..ae67f93
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/warnings/warnings_limit/warnings_limit.03.regexadm
@@ -0,0 +1,39 @@
+\Q{\E
+\s*\Q"requestID": "\E[a-zA-Z0-9-]+\Q",\E
+\s*\Q"signature": {\E
+\s*\Q"*": "*"\E
+\s*\Q},\E
+\s*\Q"results": [ { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q]\E
+\s*\Q,\E
+\s*\Q"plans":{},\E
+\s*\Q"warnings": [{\E\s*
+\s*\Q"code": 1,\E\s*\Q"msg": "\E[^}]+\Q}\E\s*
+\s*\Q,{\E\s*
+\s*\Q"code": 1,\E\s*\Q"msg": "\E[^}]+\Q}\E\s*
+\s*\Q,{\E\s*
+\s*\Q"code": 1,\E\s*\Q"msg": "\E[^}]+\Q}\E\s*
+\s*\Q],\E
+\s*\Q"status": "success",\E
+\s*\Q"metrics": {\E
+\s*\Q"elapsedTime": "\E[^"]+\Q",\E
+\s*\Q"executionTime": "\E[^"]+\Q",\E
+\s*\Q"resultCount": \E[0-9]+\Q,\E
+\s*\Q"resultSize": \E[0-9]+\Q,\E
+\s*\Q"processedObjects": \E[0-9]+\Q,\E
+\s*\Q"warningCount": 10\E
+\s*\Q}\E
+\s*\Q}\E\s*
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/warnings/warnings_limit/warnings_limit.04.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/warnings/warnings_limit/warnings_limit.04.regexadm
new file mode 100644
index 0000000..8eb28ea
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/warnings/warnings_limit/warnings_limit.04.regexadm
@@ -0,0 +1,32 @@
+\Q{\E
+\s*\Q"requestID": "\E[a-zA-Z0-9-]+\Q",\E
+\s*\Q"signature": {\E
+\s*\Q"*": "*"\E
+\s*\Q},\E
+\s*\Q"results": [ { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q]\E
+\s*\Q,\E
+\s*\Q"plans":{},\E
+\s*\Q"status": "success",\E
+\s*\Q"metrics": {\E
+\s*\Q"elapsedTime": "\E[^"]+\Q",\E
+\s*\Q"executionTime": "\E[^"]+\Q",\E
+\s*\Q"resultCount": \E[0-9]+\Q,\E
+\s*\Q"resultSize": \E[0-9]+\Q,\E
+\s*\Q"processedObjects": \E[0-9]+\Q,\E
+\s*\Q"warningCount": 10\E
+\s*\Q}\E
+\s*\Q}\E\s*
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/warnings/warnings_limit/warnings_limit.05.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/warnings/warnings_limit/warnings_limit.05.regexadm
new file mode 100644
index 0000000..3637bde
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/warnings/warnings_limit/warnings_limit.05.regexadm
@@ -0,0 +1,35 @@
+\Q{\E
+\s*\Q"requestID": "\E[a-zA-Z0-9-]+\Q",\E
+\s*\Q"signature": {\E
+\s*\Q"*": "*"\E
+\s*\Q},\E
+\s*\Q"results": [ { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": false }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q, { "F1": { "a": 1 }, "F2": null }\E
+\s*\Q]\E
+\s*\Q,\E
+\s*\Q"plans":{},\E
+\s*\Q"warnings": [{\E\s*
+\s*\Q"code": 1,\E\s*\Q"msg": "\E[^}]+\Q}\E\s*
+\s*\Q],\E
+\s*\Q"status": "success",\E
+\s*\Q"metrics": {\E
+\s*\Q"elapsedTime": "\E[^"]+\Q",\E
+\s*\Q"executionTime": "\E[^"]+\Q",\E
+\s*\Q"resultCount": \E[0-9]+\Q,\E
+\s*\Q"resultSize": \E[0-9]+\Q,\E
+\s*\Q"processedObjects": \E[0-9]+\Q,\E
+\s*\Q"warningCount": 10\E
+\s*\Q}\E
+\s*\Q}\E\s*
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 6227109..30ac1e3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -12497,5 +12497,10 @@
         <expected-warn>ASX1107: Unexpected hint: unknown_hint_elsewhere. None expected at this location</expected-warn>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="warnings">
+      <compilation-unit name="warnings_limit">
+        <output-dir compare="Clean-JSON">warnings_limit</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index b8d73be..f30956e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -18,7 +18,13 @@
  */
 package org.apache.asterix.common.config;
 
-import static org.apache.hyracks.control.common.config.OptionTypes.*;
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
+import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
+import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.UNSIGNED_INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.UNSIGNED_LONG;
 import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE;
 import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
 
@@ -68,7 +74,8 @@
         COMPILER_SORT_SAMPLES(
                 POSITIVE_INTEGER,
                 AlgebricksConfig.SORT_SAMPLES,
-                "The number of samples which parallel sorting should take from each partition");
+                "The number of samples which parallel sorting should take from each partition"),
+        COMPILER_RUNTIME_WARNINGS(UNSIGNED_LONG, 0L, "The maximum number of runtime warnings to be reported");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -122,6 +129,8 @@
 
     public static final String COMPILER_SORT_SAMPLES_KEY = Option.COMPILER_SORT_SAMPLES.ini();
 
+    public static final String COMPILER_RUNTIME_WARNINGS_KEY = Option.COMPILER_RUNTIME_WARNINGS.ini();
+
     public static final int COMPILER_PARALLELISM_AS_STORAGE = 0;
 
     public CompilerProperties(PropertiesAccessor accessor) {
@@ -166,7 +175,10 @@
     }
 
     public int getSortSamples() {
-        int numSamples = accessor.getInt(Option.COMPILER_SORT_SAMPLES);
-        return numSamples > 0 ? numSamples : AlgebricksConfig.SORT_SAMPLES;
+        return accessor.getInt(Option.COMPILER_SORT_SAMPLES);
+    }
+
+    public long getNumRuntimeWarnings() {
+        return accessor.getLong(Option.COMPILER_RUNTIME_WARNINGS);
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index b29cb61..93ab5db 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -59,6 +59,7 @@
         int textSearchFrameLimit = getTextSearchNumFrames(compilerProperties, querySpecificConfig, sourceLoc);
         int sortNumSamples = getSortSamples(compilerProperties, querySpecificConfig, sourceLoc);
         boolean fullParallelSort = getSortParallel(compilerProperties, querySpecificConfig);
+        long runtimeWarningsLimit = getRuntimeWarningsLimit(compilerProperties, querySpecificConfig, sourceLoc);
 
         PhysicalOptimizationConfig physOptConf = new PhysicalOptimizationConfig();
         physOptConf.setFrameSize(frameSize);
@@ -69,6 +70,7 @@
         physOptConf.setMaxFramesForTextSearch(textSearchFrameLimit);
         physOptConf.setSortParallel(fullParallelSort);
         physOptConf.setSortSamples(sortNumSamples);
+        physOptConf.setRuntimeWarningsLimit(runtimeWarningsLimit);
 
         return physOptConf;
     }
@@ -102,7 +104,7 @@
         int frameLimit = (int) (memBudget / frameSize);
         if (frameLimit < minFrameLimit) {
             throw AsterixException.create(ErrorCode.COMPILATION_BAD_QUERY_PARAMETER_VALUE, sourceLoc, parameterName,
-                    frameSize * minFrameLimit);
+                    frameSize * minFrameLimit, "bytes");
         }
         // sets the frame limit to the minimum frame limit if the calculated frame limit is too small.
         return Math.max(frameLimit, minFrameLimit);
@@ -126,7 +128,20 @@
                     : OptionTypes.POSITIVE_INTEGER.parse(valueInQuery);
         } catch (IllegalArgumentException e) {
             throw AsterixException.create(ErrorCode.COMPILATION_BAD_QUERY_PARAMETER_VALUE, sourceLoc,
-                    CompilerProperties.COMPILER_SORT_SAMPLES_KEY, 1);
+                    CompilerProperties.COMPILER_SORT_SAMPLES_KEY, 1, "samples");
+        }
+    }
+
+    @SuppressWarnings("squid:S1166") // Either log or rethrow this exception
+    private static long getRuntimeWarningsLimit(CompilerProperties compilerProperties,
+            Map<String, Object> querySpecificConfig, SourceLocation sourceLoc) throws AsterixException {
+        String valueInQuery = (String) querySpecificConfig.get(CompilerProperties.COMPILER_RUNTIME_WARNINGS_KEY);
+        try {
+            return valueInQuery == null ? compilerProperties.getNumRuntimeWarnings()
+                    : OptionTypes.UNSIGNED_LONG.parse(valueInQuery);
+        } catch (IllegalArgumentException e) {
+            throw AsterixException.create(ErrorCode.COMPILATION_BAD_QUERY_PARAMETER_VALUE, sourceLoc,
+                    CompilerProperties.COMPILER_RUNTIME_WARNINGS_KEY, 0, "warnings");
         }
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/WarningCollector.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/WarningCollector.java
index a624e71..53fa760 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/WarningCollector.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/WarningCollector.java
@@ -39,6 +39,17 @@
         this.warnings.add(warning);
     }
 
+    @Override
+    public boolean shouldWarn() {
+        // this warning collector currently always collects warnings
+        return true;
+    }
+
+    @Override
+    public long getTotalWarningsCount() {
+        return warnings.size();
+    }
+
     public void warn(Collection<Warning> warnings) {
         this.warnings.addAll(warnings);
     }
@@ -49,7 +60,9 @@
 
     public void getWarnings(IWarningCollector outWarningCollector) {
         for (Warning warning : warnings) {
-            outWarningCollector.warn(warning);
+            if (outWarningCollector.shouldWarn()) {
+                outWarningCollector.warn(warning);
+            }
         }
     }
 }
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 73f87df..cb48e19 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -115,7 +115,7 @@
 1034 = Tokenizer is not applicable to the given index kind %1$s
 1035 = Incompatible search modifier %1$s for index type %2$s
 1036 = Unknown search modifier type %1$s
-1037 = Invalid query parameter %1$s -- value has to be greater than or equal to %2$s bytes
+1037 = Invalid query parameter %1$s -- value has to be greater than or equal to %2$s %3$s
 1038 = Illegal state. %1$s
 1039 = Two-phase locking violation -- locks can not be acquired after unlocking
 1040 = Dataset id space is exhausted
diff --git a/asterixdb/asterix-doc/src/site/markdown/ncservice.md b/asterixdb/asterix-doc/src/site/markdown/ncservice.md
index 8d1a8cd..940c2d9 100644
--- a/asterixdb/asterix-doc/src/site/markdown/ncservice.md
+++ b/asterixdb/asterix-doc/src/site/markdown/ncservice.md
@@ -349,6 +349,7 @@
 | common  | compiler.sort.samples                     | The number of samples taken from each partition to guide the sort operation when full parallel sort is enabled | 100 |
 | common  | compiler.textsearchmemory                 | The memory budget (in bytes) for an inverted-index-search operator instance in a partition | 33554432 (32 MB) |
 | common  | compiler.windowmemory                     | The memory budget (in bytes) for a window operator instance in a partition | 33554432 (32 MB) |
+| common  | compiler.runtime.warnings                 | The maximum number of runtime warnings to be reported | 0 |
 | common  | log.level                                 | The logging level for master and slave processes | WARNING |
 | common  | max.wait.active.cluster                   | The max pending time (in seconds) for cluster startup. After the threshold, if the cluster still is not up and running, it is considered unavailable | 60 |
 | common  | messaging.frame.count                     | Number of reusable frames for NC to NC messaging | 512 |
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLVariableSubstitutionUtil.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLVariableSubstitutionUtil.java
index 16e081f..eb087b8 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLVariableSubstitutionUtil.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLVariableSubstitutionUtil.java
@@ -27,13 +27,32 @@
 import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.rewrites.LangRewritingContext;
 import org.apache.asterix.lang.common.rewrites.VariableSubstitutionEnvironment;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
 
 public class AQLVariableSubstitutionUtil {
 
+    private AQLVariableSubstitutionUtil() {
+    }
+
     public static ILangExpression substituteVariable(ILangExpression expression,
             Map<VariableExpr, Expression> varExprMap) throws CompilationException {
         AQLCloneAndSubstituteVariablesVisitor visitor =
-                new AQLCloneAndSubstituteVariablesVisitor(new LangRewritingContext(0, w -> {
+                new AQLCloneAndSubstituteVariablesVisitor(new LangRewritingContext(0, new IWarningCollector() {
+                    @Override
+                    public void warn(Warning warning) {
+                        // no-op
+                    }
+
+                    @Override
+                    public boolean shouldWarn() {
+                        return false;
+                    }
+
+                    @Override
+                    public long getTotalWarningsCount() {
+                        return 0;
+                    }
                 }));
         VariableSubstitutionEnvironment env = new VariableSubstitutionEnvironment(varExprMap);
         return expression.accept(visitor, env).first;
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 256e96d..1dc30fd 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -467,7 +467,9 @@
     }
 
     private void warnUnexpectedHint(String actualHint, SourceLocation sourceLoc, String expectedHints) {
-        warningCollector.warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.UNEXPECTED_HINT, actualHint, expectedHints));
+        if (warningCollector.shouldWarn()) {
+            warningCollector.warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.UNEXPECTED_HINT, actualHint, expectedHints));
+        }
     }
 }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
index 2c49f71..ce7e3c8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
@@ -161,14 +162,19 @@
     }
 
     private void handleIncompatibleInput(ATypeTag typeTag) {
-        context.getWarningCollector()
-                .warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_INCOMPATIBLE, "min/max", aggType, typeTag));
+        IWarningCollector warningCollector = context.getWarningCollector();
+        if (warningCollector.shouldWarn()) {
+            warningCollector
+                    .warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_INCOMPATIBLE, "min/max", aggType, typeTag));
+        }
         this.aggType = ATypeTag.NULL;
     }
 
     private void handleUnsupportedInput(ATypeTag typeTag) {
-        context.getWarningCollector()
-                .warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_UNSUPPORTED, "min/max", typeTag));
+        IWarningCollector warningCollector = context.getWarningCollector();
+        if (warningCollector.shouldWarn()) {
+            warningCollector.warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_UNSUPPORTED, "min/max", typeTag));
+        }
         this.aggType = ATypeTag.NULL;
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractScalarEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractScalarEval.java
index 6cb3666..72d4bdc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractScalarEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractScalarEval.java
@@ -19,8 +19,15 @@
 
 package org.apache.asterix.runtime.evaluators.functions;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningUtil;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.runtime.exceptions.ExceptionUtil;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public abstract class AbstractScalarEval implements IScalarEvaluator {
@@ -31,4 +38,25 @@
         this.sourceLoc = sourceLoc;
         this.functionIdentifier = functionIdentifier;
     }
+
+    protected void handleTypeMismatchInput(IEvaluatorContext context, int inputPosition, ATypeTag expected,
+            byte[] actualTypeBytes, int startOffset) {
+        IWarningCollector warningCollector = context.getWarningCollector();
+        if (warningCollector.shouldWarn()) {
+            ATypeTag actual = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(actualTypeBytes[startOffset]);
+            warningCollector.warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_MISMATCH_FUNCTION,
+                    functionIdentifier, ExceptionUtil.indexToPosition(inputPosition), expected, actual));
+        }
+    }
+
+    protected void handleTypeMismatchInput(IEvaluatorContext context, int inputPosition, byte[] expected,
+            byte[] actualTypeBytes, int startOffset) {
+        IWarningCollector warningCollector = context.getWarningCollector();
+        if (warningCollector.shouldWarn()) {
+            ATypeTag actual = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(actualTypeBytes[startOffset]);
+            warningCollector.warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_MISMATCH_FUNCTION,
+                    functionIdentifier, ExceptionUtil.indexToPosition(inputPosition),
+                    ExceptionUtil.toExpectedTypeString(expected), actual));
+        }
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitMultipleValuesEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitMultipleValuesEvaluator.java
index a9f9ea8..a72de56 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitMultipleValuesEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitMultipleValuesEvaluator.java
@@ -19,17 +19,13 @@
 
 package org.apache.asterix.runtime.evaluators.functions.bitwise;
 
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.WarningUtil;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableInt64;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.asterix.runtime.evaluators.functions.AbstractScalarEval;
 import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
-import org.apache.asterix.runtime.exceptions.ExceptionUtil;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
@@ -119,7 +115,7 @@
 
         // Type and value validity check
         if (!PointableHelper.isValidLongValue(bytes, startOffset, true)) {
-            handleTypeMismatchInput(0, ATypeTag.BIGINT, bytes, startOffset);
+            handleTypeMismatchInput(context, 0, ATypeTag.BIGINT, bytes, startOffset);
             PointableHelper.setNull(result);
             return;
         }
@@ -134,7 +130,7 @@
 
             // Type and value validity check
             if (!PointableHelper.isValidLongValue(bytes, startOffset, true)) {
-                handleTypeMismatchInput(i, ATypeTag.BIGINT, bytes, startOffset);
+                handleTypeMismatchInput(context, i, ATypeTag.BIGINT, bytes, startOffset);
                 PointableHelper.setNull(result);
                 return;
             }
@@ -149,10 +145,4 @@
         aInt64Serde.serialize(resultMutableInt64, resultStorage.getDataOutput());
         result.set(resultStorage);
     }
-
-    private void handleTypeMismatchInput(int inputPosition, ATypeTag expected, byte[] bytes, int startOffset) {
-        ATypeTag actual = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[startOffset]);
-        context.getWarningCollector().warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_MISMATCH_FUNCTION,
-                functionIdentifier, ExceptionUtil.indexToPosition(inputPosition), expected, actual));
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitSingleValueEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitSingleValueEvaluator.java
index d37d726..c21bbe0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitSingleValueEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitSingleValueEvaluator.java
@@ -19,14 +19,10 @@
 
 package org.apache.asterix.runtime.evaluators.functions.bitwise;
 
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.WarningUtil;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.asterix.runtime.evaluators.functions.AbstractScalarEval;
 import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
-import org.apache.asterix.runtime.exceptions.ExceptionUtil;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
@@ -88,7 +84,7 @@
 
         // Validity check
         if (!PointableHelper.isValidLongValue(bytes, startOffset, true)) {
-            handleTypeMismatchInput(0, ATypeTag.BIGINT, bytes, startOffset);
+            handleTypeMismatchInput(context, 0, ATypeTag.BIGINT, bytes, startOffset);
             PointableHelper.setNull(result);
             return;
         }
@@ -98,10 +94,4 @@
 
         writeResult(result);
     }
-
-    private void handleTypeMismatchInput(int inputPosition, ATypeTag expected, byte[] bytes, int startOffset) {
-        ATypeTag actual = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[startOffset]);
-        context.getWarningCollector().warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_MISMATCH_FUNCTION,
-                functionIdentifier, ExceptionUtil.indexToPosition(inputPosition), expected, actual));
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitValuePositionEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitValuePositionEvaluator.java
index d1d3c93..d9d5382 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitValuePositionEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/AbstractBitValuePositionEvaluator.java
@@ -39,6 +39,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
@@ -125,7 +126,7 @@
 
         // Type and value validity check
         if (!PointableHelper.isValidLongValue(valueBytes, valueStartOffset, true)) {
-            handleTypeMismatchInput(0, ATypeTag.BIGINT, valueBytes, valueStartOffset);
+            handleTypeMismatchInput(context, 0, ATypeTag.BIGINT, valueBytes, valueStartOffset);
             PointableHelper.setNull(result);
             return;
         }
@@ -138,7 +139,7 @@
 
         // Type validity check (for position argument, array is a valid type as well)
         if (!ATypeHierarchy.canPromote(positionTypeTag, ATypeTag.DOUBLE) && positionTypeTag != ATypeTag.ARRAY) {
-            handleTypeMismatchInput(1, secondArgumentExpectedTypes, positionBytes, positionStartOffset);
+            handleTypeMismatchInput(context, 1, secondArgumentExpectedTypes, positionBytes, positionStartOffset);
             PointableHelper.setNull(result);
             return;
         }
@@ -219,7 +220,7 @@
 
         // Value validity check
         if (!PointableHelper.isValidLongValue(bytes, startOffset, true)) {
-            handleTypeMismatchInput(1, ATypeTag.BIGINT, bytes, startOffset);
+            handleTypeMismatchInput(context, 1, ATypeTag.BIGINT, bytes, startOffset);
             return false;
         }
 
@@ -236,23 +237,11 @@
         return true;
     }
 
-    private void handleTypeMismatchInput(int inputPosition, ATypeTag expected, byte[] actualBytes,
-            int actualStartOffset) {
-        ATypeTag actual = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(actualBytes[actualStartOffset]);
-        context.getWarningCollector().warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_MISMATCH_FUNCTION,
-                functionIdentifier, ExceptionUtil.indexToPosition(inputPosition), expected, actual));
-    }
-
-    private void handleTypeMismatchInput(int inputPosition, byte[] expected, byte[] bytes, int actualStartOffset) {
-        ATypeTag actual = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[actualStartOffset]);
-        context.getWarningCollector()
-                .warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_MISMATCH_FUNCTION, functionIdentifier,
-                        ExceptionUtil.indexToPosition(inputPosition), ExceptionUtil.toExpectedTypeString(expected),
-                        actual));
-    }
-
     private void handleOutOfRangeInput(int inputPosition, int startLimit, int endLimit, long actual) {
-        context.getWarningCollector().warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.VALUE_OUT_OF_RANGE,
-                functionIdentifier, ExceptionUtil.indexToPosition(inputPosition), startLimit, endLimit, actual));
+        IWarningCollector warningCollector = context.getWarningCollector();
+        if (warningCollector.shouldWarn()) {
+            warningCollector.warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.VALUE_OUT_OF_RANGE, functionIdentifier,
+                    ExceptionUtil.indexToPosition(inputPosition), startLimit, endLimit, actual));
+        }
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/BitValueCountFlagEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/BitValueCountFlagEvaluator.java
index cc3033f..fccefe8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/BitValueCountFlagEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/BitValueCountFlagEvaluator.java
@@ -19,8 +19,6 @@
 
 package org.apache.asterix.runtime.evaluators.functions.bitwise;
 
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.WarningUtil;
 import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableInt64;
@@ -30,7 +28,6 @@
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.asterix.runtime.evaluators.functions.AbstractScalarEval;
 import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
-import org.apache.asterix.runtime.exceptions.ExceptionUtil;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
@@ -118,7 +115,7 @@
 
         // Type and value validity check
         if (!PointableHelper.isValidLongValue(valueBytes, valueStartOffset, true)) {
-            handleTypeMismatchInput(0, ATypeTag.BIGINT, valueBytes, valueStartOffset);
+            handleTypeMismatchInput(context, 0, ATypeTag.BIGINT, valueBytes, valueStartOffset);
             PointableHelper.setNull(result);
             return;
         }
@@ -129,7 +126,7 @@
 
         // Type and Value validity check
         if (!PointableHelper.isValidLongValue(countBytes, countStartOffset, true)) {
-            handleTypeMismatchInput(1, ATypeTag.BIGINT, countBytes, countStartOffset);
+            handleTypeMismatchInput(context, 1, ATypeTag.BIGINT, countBytes, countStartOffset);
             PointableHelper.setNull(result);
             return;
         }
@@ -142,7 +139,7 @@
             ATypeTag flagTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(flagBytes[flagStartOffset]);
 
             if (flagTypeTag != ATypeTag.BOOLEAN) {
-                handleTypeMismatchInput(2, ATypeTag.BOOLEAN, flagBytes, flagStartOffset);
+                handleTypeMismatchInput(context, 2, ATypeTag.BOOLEAN, flagBytes, flagStartOffset);
                 PointableHelper.setNull(result);
                 return;
             }
@@ -178,10 +175,4 @@
         aInt64Serde.serialize(resultMutableInt64, resultStorage.getDataOutput());
         result.set(resultStorage);
     }
-
-    private void handleTypeMismatchInput(int inputPosition, ATypeTag expected, byte[] bytes, int startOffset) {
-        ATypeTag actual = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[startOffset]);
-        context.getWarningCollector().warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_MISMATCH_FUNCTION,
-                functionIdentifier, ExceptionUtil.indexToPosition(inputPosition), expected, actual));
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/BitValuePositionFlagEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/BitValuePositionFlagEvaluator.java
index aee037d..7e5045e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/BitValuePositionFlagEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/bitwise/BitValuePositionFlagEvaluator.java
@@ -40,6 +40,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
@@ -147,7 +148,7 @@
 
         // Type and value validity check
         if (!PointableHelper.isValidLongValue(valueBytes, valueStartOffset, true)) {
-            handleTypeMismatchInput(0, ATypeTag.BIGINT, valueBytes, valueStartOffset);
+            handleTypeMismatchInput(context, 0, ATypeTag.BIGINT, valueBytes, valueStartOffset);
             PointableHelper.setNull(result);
             return;
         }
@@ -160,7 +161,7 @@
 
         // Type validity check (for position argument, array is a valid type as well)
         if (!ATypeHierarchy.canPromote(positionTypeTag, ATypeTag.DOUBLE) && positionTypeTag != ATypeTag.ARRAY) {
-            handleTypeMismatchInput(1, secondArgumentExpectedTypes, positionBytes, positionStartOffset);
+            handleTypeMismatchInput(context, 1, secondArgumentExpectedTypes, positionBytes, positionStartOffset);
             PointableHelper.setNull(result);
             return;
         }
@@ -174,7 +175,7 @@
             ATypeTag flagTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(flagBytes[flagStartOffset]);
 
             if (flagTypeTag != ATypeTag.BOOLEAN) {
-                handleTypeMismatchInput(2, ATypeTag.BOOLEAN, flagBytes, flagStartOffset);
+                handleTypeMismatchInput(context, 2, ATypeTag.BOOLEAN, flagBytes, flagStartOffset);
                 PointableHelper.setNull(result);
                 return;
             }
@@ -267,7 +268,7 @@
 
         // Value validity check
         if (!PointableHelper.isValidLongValue(bytes, startOffset, true)) {
-            handleTypeMismatchInput(1, ATypeTag.BIGINT, bytes, startOffset);
+            handleTypeMismatchInput(context, 1, ATypeTag.BIGINT, bytes, startOffset);
             return false;
         }
 
@@ -287,23 +288,11 @@
         return true;
     }
 
-    private void handleTypeMismatchInput(int inputPosition, ATypeTag expected, byte[] actualBytes,
-            int actualStartOffset) {
-        ATypeTag actual = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(actualBytes[actualStartOffset]);
-        context.getWarningCollector().warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_MISMATCH_FUNCTION,
-                functionIdentifier, ExceptionUtil.indexToPosition(inputPosition), expected, actual));
-    }
-
-    private void handleTypeMismatchInput(int inputPosition, byte[] expected, byte[] bytes, int actualStartOffset) {
-        ATypeTag actual = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[actualStartOffset]);
-        context.getWarningCollector()
-                .warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.TYPE_MISMATCH_FUNCTION, functionIdentifier,
-                        ExceptionUtil.indexToPosition(inputPosition), ExceptionUtil.toExpectedTypeString(expected),
-                        actual));
-    }
-
     private void handleOutOfRangeInput(int inputPosition, int startLimit, int endLimit, long actual) {
-        context.getWarningCollector().warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.VALUE_OUT_OF_RANGE,
-                functionIdentifier, ExceptionUtil.indexToPosition(inputPosition), startLimit, endLimit, actual));
+        IWarningCollector warningCollector = context.getWarningCollector();
+        if (warningCollector.shouldWarn()) {
+            warningCollector.warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.VALUE_OUT_OF_RANGE, functionIdentifier,
+                    ExceptionUtil.indexToPosition(inputPosition), startLimit, endLimit, actual));
+        }
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 6aed2d6..5c5f9a9 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -104,7 +104,7 @@
                                 normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer,
                                 oc, expressionEvalSizeComputer, partialAggregationTypeComputer,
                                 predEvaluatorFactoryProvider, physicalOptimizationConfig.getFrameSize(),
-                                clusterLocations);
+                                clusterLocations, physicalOptimizationConfig.getRuntimeWarningsLimit());
 
                         PlanCompiler pc = new PlanCompiler(context);
                         return pc.compilePlan(plan, jobEventListenerFactory);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index f46fcaa..d2d5ae1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -70,6 +70,7 @@
     private AlgebricksAbsolutePartitionConstraint clusterLocations;
     private int varCounter;
     private final ITypingContext typingContext;
+    private final long runtimeWarningsLimit;
 
     public JobGenContext(IOperatorSchema outerFlowSchema, IMetadataProvider<?, ?> metadataProvider, Object appContext,
             ISerializerDeserializerProvider serializerDeserializerProvider,
@@ -84,7 +85,7 @@
             ITypingContext typingContext, IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IPartialAggregationTypeComputer partialAggregationTypeComputer,
             IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider, int frameSize,
-            AlgebricksAbsolutePartitionConstraint clusterLocations) {
+            AlgebricksAbsolutePartitionConstraint clusterLocations, long runtimeWarningsLimit) {
         this.outerFlowSchema = outerFlowSchema;
         this.metadataProvider = metadataProvider;
         this.appContext = appContext;
@@ -107,6 +108,7 @@
         this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
         this.frameSize = frameSize;
         this.varCounter = 0;
+        this.runtimeWarningsLimit = runtimeWarningsLimit;
     }
 
     public IOperatorSchema getOuterFlowSchema() {
@@ -207,4 +209,7 @@
         return typingContext.getOutputTypeEnvironment(op);
     }
 
+    public long getRuntimeWarningsLimit() {
+        return runtimeWarningsLimit;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index ddda258..2b2428c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -37,8 +37,7 @@
 
 public class PlanCompiler {
     private JobGenContext context;
-    private Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> operatorVisitedToParents =
-            new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
+    private Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> operatorVisitedToParents = new HashMap<>();
 
     public PlanCompiler(JobGenContext context) {
         this.context = context;
@@ -61,6 +60,7 @@
     private JobSpecification compilePlanImpl(ILogicalPlan plan, boolean isNestedPlan, IOperatorSchema outerPlanSchema,
             IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
         JobSpecification spec = new JobSpecification(context.getFrameSize());
+        spec.setRuntimeWarningsLimit(context.getRuntimeWarningsLimit());
         if (jobEventListenerFactory != null) {
             spec.setJobletEventListenerFactory(jobEventListenerFactory);
         }
@@ -97,8 +97,9 @@
                 compileOpRef(opChild, spec, builder, outerPlanSchema);
                 schemas[i++] = context.getSchema(opChild.getValue());
             } else {
-                if (!parents.contains(opRef))
+                if (!parents.contains(opRef)) {
                     parents.add(opRef);
+                }
                 schemas[i++] = context.getSchema(opChild.getValue());
                 continue;
             }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index f9ea0c4..f72da84 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -39,6 +39,7 @@
     private static final String DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE = "DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE";
     private static final String SORT_PARALLEL = "SORT_PARALLEL";
     private static final String SORT_SAMPLES = "SORT_SAMPLES";
+    private static final String RUNTIME_WARNINGS_LIMIT = "RUNTIME_WARNINGS_LIMIT";
 
     private Properties properties = new Properties();
 
@@ -172,16 +173,30 @@
         setInt(SORT_SAMPLES, sortSamples);
     }
 
+    public long getRuntimeWarningsLimit() {
+        return getLong(RUNTIME_WARNINGS_LIMIT, 0);
+    }
+
+    public void setRuntimeWarningsLimit(long runtimeWarningsLimit) {
+        setLong(RUNTIME_WARNINGS_LIMIT, runtimeWarningsLimit);
+    }
+
     private void setInt(String property, int value) {
         properties.setProperty(property, Integer.toString(value));
     }
 
     private int getInt(String property, int defaultValue) {
         String value = properties.getProperty(property);
-        if (value == null)
-            return defaultValue;
-        else
-            return Integer.parseInt(value);
+        return value == null ? defaultValue : Integer.parseInt(value);
+    }
+
+    private void setLong(String property, long value) {
+        properties.setProperty(property, Long.toString(value));
+    }
+
+    private long getLong(String property, long defaultValue) {
+        String value = properties.getProperty(property);
+        return value == null ? defaultValue : Long.parseLong(value);
     }
 
     private void setDouble(String property, double value) {
@@ -190,10 +205,7 @@
 
     private double getDouble(String property, double defaultValue) {
         String value = properties.getProperty(property);
-        if (value == null)
-            return defaultValue;
-        else
-            return Double.parseDouble(value);
+        return value == null ? defaultValue : Double.parseDouble(value);
     }
 
     private void setBoolean(String property, boolean value) {
@@ -202,10 +214,6 @@
 
     private boolean getBoolean(String property, boolean defaultValue) {
         String value = properties.getProperty(property);
-        if (value == null) {
-            return defaultValue;
-        } else {
-            return Boolean.parseBoolean(value);
-        }
+        return value == null ? defaultValue : Boolean.parseBoolean(value);
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 4795233..42f9aba 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -122,6 +122,7 @@
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
 import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.Warning;
 
 public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule {
@@ -220,8 +221,11 @@
                     if (extGby != null) {
                         return extGby;
                     } else if (gby.getSourceLocation() != null) {
-                        context.getWarningCollector().warn(Warning.forHyracks(gby.getSourceLocation(),
-                                ErrorCode.INAPPLICABLE_HINT, "Group By", "hash"));
+                        IWarningCollector warningCollector = context.getWarningCollector();
+                        if (warningCollector.shouldWarn()) {
+                            warningCollector.warn(Warning.forHyracks(gby.getSourceLocation(),
+                                    ErrorCode.INAPPLICABLE_HINT, "Group By", "hash"));
+                        }
                     }
                 }
             }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 34ce4f2..7dfeeb5 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -48,6 +48,7 @@
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
 
@@ -243,7 +244,10 @@
     private static void warnIfCrossProduct(ILogicalExpression conditionExpr, SourceLocation sourceLoc,
             IOptimizationContext context) {
         if (OperatorPropertiesUtil.isAlwaysTrueCond(conditionExpr) && sourceLoc != null) {
-            context.getWarningCollector().warn(Warning.forHyracks(sourceLoc, ErrorCode.CROSS_PRODUCT_JOIN));
+            IWarningCollector warningCollector = context.getWarningCollector();
+            if (warningCollector.shouldWarn()) {
+                warningCollector.warn(Warning.forHyracks(sourceLoc, ErrorCode.CROSS_PRODUCT_JOIN));
+            }
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index ddf0ce8..d71ac57 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -72,6 +72,7 @@
         final ActivityClusterGraph acg = acgb.inferActivityClusters(jag);
         acg.setFrameSize(spec.getFrameSize());
         acg.setMaxReattempts(spec.getMaxReattempts());
+        acg.setRuntimeWarningsLimit(spec.getRuntimeWarningsLimit());
         acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
         acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
         acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IWarningCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IWarningCollector.java
index 4c42dd2..467406e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IWarningCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IWarningCollector.java
@@ -19,11 +19,20 @@
 
 package org.apache.hyracks.api.exceptions;
 
-@FunctionalInterface
 public interface IWarningCollector {
     /**
      * Adds a warning
      * @param warning
      */
     void warn(Warning warning);
+
+    /**
+     * @return {@code true} to indicate that the user can issue a warning through {@link #warn(Warning)} method.
+     */
+    boolean shouldWarn();
+
+    /**
+     * @return Total warnings count (reported and unreported ones).
+     */
+    long getTotalWarningsCount();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
index 9f23bf0..e4e4ff7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
@@ -46,6 +46,8 @@
 
     private int frameSize;
 
+    private long runtimeWarningsLimit;
+
     private int maxReattempts;
 
     private IJobletEventListenerFactory jobletEventListenerFactory;
@@ -104,6 +106,14 @@
         return frameSize;
     }
 
+    public void setRuntimeWarningsLimit(long runtimeWarningsLimit) {
+        this.runtimeWarningsLimit = runtimeWarningsLimit;
+    }
+
+    public long getRuntimeWarningsLimit() {
+        return runtimeWarningsLimit;
+    }
+
     public void setMaxReattempts(int maxReattempts) {
         this.maxReattempts = maxReattempts;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 3b4bcb8..cd8277a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -77,6 +77,8 @@
 
     private int maxReattempts;
 
+    private long runtimeWarningsLimit;
+
     private IJobletEventListenerFactory jobletEventListenerFactory;
 
     private IGlobalJobDataFactory globalJobDataFactory;
@@ -260,6 +262,14 @@
         return frameSize;
     }
 
+    public void setRuntimeWarningsLimit(long runtimeWarningsLimit) {
+        this.runtimeWarningsLimit = runtimeWarningsLimit;
+    }
+
+    public long getRuntimeWarningsLimit() {
+        return runtimeWarningsLimit;
+    }
+
     public void setMaxReattempts(int maxReattempts) {
         this.maxReattempts = maxReattempts;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
index 7088e08..0a378b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -362,6 +362,28 @@
         }
     };
 
+    public static final IOptionType<Long> UNSIGNED_LONG = new IOptionType<Long>() {
+        @Override
+        public Long parse(String s) {
+            return Long.parseUnsignedLong(s);
+        }
+
+        @Override
+        public Long parse(JsonNode node) {
+            return node.isNull() ? null : parse(node.asText());
+        }
+
+        @Override
+        public Class<Long> targetType() {
+            return Long.class;
+        }
+
+        @Override
+        public void serializeJSONField(String fieldName, Object value, ObjectNode node) {
+            node.put(fieldName, (long) value);
+        }
+    };
+
     public static final IOptionType<Integer> POSITIVE_INTEGER = new IOptionType<Integer>() {
         @Override
         public Integer parse(String s) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 2a20624..6dfb71d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -49,6 +49,8 @@
 
     private Set<Warning> warnings;
 
+    private long totalWarningsCount;
+
     public static TaskProfile create(DataInput dis) throws IOException {
         TaskProfile taskProfile = new TaskProfile();
         taskProfile.readFields(dis);
@@ -60,11 +62,12 @@
     }
 
     public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile,
-            IStatsCollector statsCollector, Set<Warning> warnings) {
+            IStatsCollector statsCollector, Set<Warning> warnings, long totalWarningsCount) {
         this.taskAttemptId = taskAttemptId;
         this.partitionSendProfile = new HashMap<>(partitionSendProfile);
         this.statsCollector = statsCollector;
         this.warnings = warnings;
+        this.totalWarningsCount = totalWarningsCount;
     }
 
     public TaskAttemptId getTaskId() {
@@ -125,6 +128,10 @@
         return warnings;
     }
 
+    public long getTotalWarningsCount() {
+        return totalWarningsCount;
+    }
+
     @Override
     public void readFields(DataInput input) throws IOException {
         super.readFields(input);
@@ -139,6 +146,7 @@
         statsCollector = StatsCollector.create(input);
         warnings = new HashSet<>();
         deserializeWarnings(input, warnings);
+        totalWarningsCount = input.readLong();
     }
 
     @Override
@@ -152,6 +160,7 @@
         }
         statsCollector.writeFields(output);
         serializeWarnings(output);
+        output.writeLong(totalWarningsCount);
     }
 
     private void serializeWarnings(DataOutput output) throws IOException {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 8f02f2d..8afdc9e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -105,6 +105,8 @@
 
     private final long jobStartTime;
 
+    private final long runtimeWarningsLimit;
+
     public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId,
             INCServiceContext serviceCtx, ActivityClusterGraph acg,
             IJobletEventListenerFactory jobletEventListenerFactory, long jobStartTime) {
@@ -134,6 +136,7 @@
         IGlobalJobDataFactory gjdf = acg.getGlobalJobDataFactory();
         globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
         this.jobStartTime = jobStartTime;
+        this.runtimeWarningsLimit = acg.getRuntimeWarningsLimit();
     }
 
     @Override
@@ -206,7 +209,8 @@
         counterMap.forEach((key, value) -> counters.put(key, value.get()));
         for (Task task : taskMap.values()) {
             TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(),
-                    new Hashtable<>(task.getPartitionSendProfile()), new StatsCollector(), task.getWarnings());
+                    new Hashtable<>(task.getPartitionSendProfile()), new StatsCollector(), task.getWarnings(),
+                    task.getWarningCollector().getTotalWarningsCount());
             task.dumpProfile(taskProfile);
             jProfile.getTaskProfiles().put(task.getTaskAttemptId(), taskProfile);
         }
@@ -263,6 +267,10 @@
         return frameManager.getInitialFrameSize();
     }
 
+    public final long getRuntimeWarningsLimit() {
+        return runtimeWarningsLimit;
+    }
+
     public IIOManager getIOManager() {
         return serviceCtx.getIoManager();
     }
@@ -320,6 +328,7 @@
         }
     }
 
+    @SuppressWarnings("squid:S1166") // Either log or rethrow this exception
     private void performCleanup() {
         nodeController.getJobletMap().remove(jobId);
         IJobletEventListener listener = getJobletEventListener();
@@ -331,7 +340,7 @@
         try {
             nodeController.getClusterController(jobId.getCcId()).notifyJobletCleanup(jobId, nodeController.getId());
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.info(e);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 06f3aa9..81b9d5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hyracks.api.comm.IFrameReader;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -149,7 +150,7 @@
         this.inputChannelsFromConnectors = inputChannelsFromConnectors;
         statsCollector = new StatsCollector();
         warnings = ConcurrentHashMap.newKeySet();
-        warningCollector = warnings::add;
+        warningCollector = createWarningCollector(joblet.getRuntimeWarningsLimit());
     }
 
     public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -530,6 +531,29 @@
         return warnings;
     }
 
+    private IWarningCollector createWarningCollector(long warningsLimit) {
+        return new IWarningCollector() {
+
+            private final AtomicLong warningsCount = new AtomicLong();
+
+            @Override
+            public void warn(Warning warning) {
+                warnings.add(warning);
+            }
+
+            @Override
+            public boolean shouldWarn() {
+                long currentCount = warningsCount.getAndUpdate(count -> count < Long.MAX_VALUE ? count + 1 : count);
+                return currentCount < warningsLimit;
+            }
+
+            @Override
+            public long getTotalWarningsCount() {
+                return warningsCount.get();
+            }
+        };
+    }
+
     @Override
     public String toString() {
         return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"node\" : \"" + ncs.getId() + "\" \"jobId\" : \""
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index b8c5030..0545139 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -39,7 +39,7 @@
     @Override
     public void run() {
         TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile(),
-                task.getStatsCollector(), task.getWarnings());
+                task.getStatsCollector(), task.getWarnings(), task.getWarningCollector().getTotalWarningsCount());
         try {
             ncs.getClusterController(task.getJobletContext().getJobId().getCcId()).notifyTaskComplete(
                     task.getJobletContext().getJobId(), task.getTaskAttemptId(), ncs.getId(), taskProfile);
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index 420f7e6..e44480c 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -38,6 +38,7 @@
 import org.apache.hyracks.api.dataflow.TaskId;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.util.CleanupUtils;
@@ -50,7 +51,21 @@
 
 public class TestUtils {
 
-    public static final IWarningCollector NOOP_WARNING_COLLECTOR = w -> {
+    public static final IWarningCollector NOOP_WARNING_COLLECTOR = new IWarningCollector() {
+        @Override
+        public void warn(Warning warning) {
+            // no-op
+        }
+
+        @Override
+        public boolean shouldWarn() {
+            return false;
+        }
+
+        @Override
+        public long getTotalWarningsCount() {
+            return 0;
+        }
     };
 
     public static IHyracksTaskContext create(int frameSize) {