[NO ISSUE][TEST] Introduce dynamic expected result poll test type

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- This change allows for a new type of tests. The new test can
  poll for both expected results and a query and compare the two
  values.
- A test case is added.

Change-Id: Ifc132b2d2286eea1d1e119984c33ca5eef9be92a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2438
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IExpectedResultPoller.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IExpectedResultPoller.java
new file mode 100644
index 0000000..ce2760d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IExpectedResultPoller.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.common;
+
+@FunctionalInterface
+public interface IExpectedResultPoller {
+    /**
+     * @return the expected result as a string
+     */
+    String poll() throws Exception;
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java
new file mode 100644
index 0000000..6d32518
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.common;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.context.TestFileContext;
+import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+public interface IPollTask {
+
+    /**
+     * Execute the poll task
+     *
+     * @param testCaseCtx
+     * @param ctx
+     * @param variableCtx
+     * @param statement
+     * @param isDmlRecoveryTest
+     * @param pb
+     * @param cUnit
+     * @param queryCount
+     * @param expectedResultFileCtxs
+     * @param testFile
+     * @param actualPath
+     */
+    void execute(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx, String statement,
+            boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
+            List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath) throws Exception;
+
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index e39a9f3..48603a5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -50,6 +50,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
@@ -129,6 +130,12 @@
     public static final Set<String> NON_CANCELLABLE =
             Collections.unmodifiableSet(new HashSet<>(Arrays.asList("store", "validate")));
 
+    private final IPollTask plainExecutor = (testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
+            queryCount, expectedResultFileCtxs, testFile, actualPath) -> {
+        executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
+                expectedResultFileCtxs, testFile, actualPath);
+    };
+
     public static final String DELIVERY_ASYNC = "async";
     public static final String DELIVERY_DEFERRED = "deferred";
     public static final String DELIVERY_IMMEDIATE = "immediate";
@@ -886,6 +893,11 @@
             case "pollget":
             case "pollquery":
                 poll(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
+                        expectedResultFileCtxs, testFile, actualPath, ctx.getType().substring("poll".length()),
+                        plainExecutor);
+                break;
+            case "polldynamic":
+                polldynamic(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
                         expectedResultFileCtxs, testFile, actualPath);
                 break;
             case "query":
@@ -923,16 +935,8 @@
                 break;
             case "validate":
                 // This is a query that validates the output against a previously executed query
-                key = getKey(statement);
-                expectedResultFile = (File) variableCtx.remove(key);
-                if (expectedResultFile == null) {
-                    throw new IllegalStateException("There is no stored result with the key: " + key);
-                }
-                actualResultFile = new File(actualPath, testCaseCtx.getTestCase().getFilePath() + File.separatorChar
-                        + cUnit.getName() + '.' + ctx.getSeqNum() + ".adm");
-                executeQuery(OutputFormat.forCompilationUnit(cUnit), statement, variableCtx, ctx.getType(), testFile,
-                        expectedResultFile, actualResultFile, queryCount, expectedResultFileCtxs.size(),
-                        cUnit.getParameter(), ComparisonEnum.TEXT);
+                validate(actualPath, testCaseCtx, cUnit, statement, variableCtx, testFile, ctx, queryCount,
+                        expectedResultFileCtxs);
                 break;
             case "txnqbc": // qbc represents query before crash
                 InputStream resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
@@ -1158,6 +1162,21 @@
         }
     }
 
+    private void validate(String actualPath, TestCaseContext testCaseCtx, CompilationUnit cUnit, String statement,
+            Map<String, Object> variableCtx, File testFile, TestFileContext ctx, MutableInt queryCount,
+            List<TestFileContext> expectedResultFileCtxs) throws Exception {
+        String key = getKey(statement);
+        File expectedResultFile = (File) variableCtx.remove(key);
+        if (expectedResultFile == null) {
+            throw new IllegalStateException("There is no stored result with the key: " + key);
+        }
+        File actualResultFile = new File(actualPath, testCaseCtx.getTestCase().getFilePath() + File.separatorChar
+                + cUnit.getName() + '.' + ctx.getSeqNum() + ".adm");
+        executeQuery(OutputFormat.forCompilationUnit(cUnit), statement, variableCtx, "validate", testFile,
+                expectedResultFile, actualResultFile, queryCount, expectedResultFileCtxs.size(), cUnit.getParameter(),
+                ComparisonEnum.TEXT);
+    }
+
     protected void executeHttpRequest(OutputFormat fmt, String statement, Map<String, Object> variableCtx,
             String reqType, File testFile, File expectedResultFile, File actualResultFile, MutableInt queryCount,
             int numResultFiles, String extension, ComparisonEnum compare) throws Exception {
@@ -1259,29 +1278,88 @@
         actualResultFile.getParentFile().delete();
     }
 
-    private void poll(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
+    private void polldynamic(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
             String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit,
             MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath)
             throws Exception {
+        IExpectedResultPoller poller = getExpectedResultPoller(statement);
+        final String key = getKey(statement);
+        poll(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, expectedResultFileCtxs,
+                testFile, actualPath, "validate", new IPollTask() {
+                    @Override
+                    public void execute(TestCaseContext testCaseCtx, TestFileContext ctx,
+                            Map<String, Object> variableCtx, String statement, boolean isDmlRecoveryTest,
+                            ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
+                            List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath)
+                            throws Exception {
+                        File actualResultFile = new File(actualPath, testCaseCtx.getTestCase().getFilePath()
+                                + File.separatorChar + cUnit.getName() + '.' + ctx.getSeqNum() + ".polled.adm");
+                        if (actualResultFile.exists() && !actualResultFile.delete()) {
+                            throw new Exception(
+                                    "Failed to delete an existing result file: " + actualResultFile.getAbsolutePath());
+                        }
+                        writeOutputToFile(actualResultFile,
+                                new ByteArrayInputStream(poller.poll().getBytes(StandardCharsets.UTF_8)));
+                        variableCtx.put(key, actualResultFile);
+                        validate(actualPath, testCaseCtx, cUnit, statement, variableCtx, testFile, ctx, queryCount,
+                                expectedResultFileCtxs);
+                    }
+                });
+    }
+
+    protected IExpectedResultPoller getExpectedResultPoller(String statement) {
+        String key = "poller=";
+        String value = null;
+        String[] lines = statement.split("\n");
+        for (String line : lines) {
+            if (line.contains(key)) {
+                value = line.substring(line.indexOf(key) + key.length()).trim();
+            }
+        }
+        if (value == null) {
+            throw new IllegalArgumentException("ERROR: poller=<...> must be present in poll-dynamic file");
+        }
+        String staticPoller = "static:";
+        if (value.startsWith(staticPoller)) {
+            String polled = value.substring(staticPoller.length());
+            return () -> polled;
+        }
+        throw new IllegalArgumentException("ERROR: unknown poller: " + value);
+    }
+
+    private void poll(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
+            String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit,
+            MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
+            String newType, IPollTask pollTask) throws Exception {
         // polltimeoutsecs=nnn, polldelaysecs=nnn
         int timeoutSecs = getTimeoutSecs(statement);
         int retryDelaySecs = getRetryDelaySecs(statement);
         long startTime = System.currentTimeMillis();
         long limitTime = startTime + TimeUnit.SECONDS.toMillis(timeoutSecs);
-        ctx.setType(ctx.getType().substring("poll".length()));
+        Semaphore endSemaphore = new Semaphore(1);
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+        String originalType = ctx.getType();
+        ctx.setType(newType);
         try {
             boolean expectedException = false;
             Exception finalException = null;
             LOGGER.debug("polling for up to " + timeoutSecs + " seconds w/ " + retryDelaySecs + " second(s) delay");
             int responsesReceived = 0;
-            final ExecutorService executorService = Executors.newSingleThreadExecutor();
             while (true) {
                 try {
+                    endSemaphore.acquire();
+                    Semaphore startSemaphore = new Semaphore(0);
                     Future<Void> execution = executorService.submit(() -> {
-                        executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
-                                queryCount, expectedResultFileCtxs, testFile, actualPath);
+                        try {
+                            startSemaphore.release();
+                            pollTask.execute(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
+                                    queryCount, expectedResultFileCtxs, testFile, actualPath);
+                        } finally {
+                            endSemaphore.release();
+                        }
                         return null;
                     });
+                    startSemaphore.acquire();
                     execution.get(limitTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                     responsesReceived++;
                     finalException = null;
@@ -1334,11 +1412,13 @@
                 throw new Exception("Poll limit (" + timeoutSecs + "s) exceeded without obtaining expected result",
                         finalException);
             }
-
         } finally {
-            ctx.setType("poll" + ctx.getType());
+            executorService.shutdownNow();
+            // ensure no leftover task is running. This avoids re-polling due to
+            // resetting the ctx type to poll while the last attempt is being made
+            endSemaphore.acquire();
+            ctx.setType(originalType);
         }
-
     }
 
     public InputStream executeSqlppUpdateOrDdl(String statement, OutputFormat outputFormat) throws Exception {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/poll-dynamic/poll-dynamic.1.polldynamic.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/poll-dynamic/poll-dynamic.1.polldynamic.sqlpp
new file mode 100644
index 0000000..e8dc9e8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/poll-dynamic/poll-dynamic.1.polldynamic.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+-- key=aKey
+-- poller=static:{ "one": 1 }
+-- polltimeoutsecs=120
+select 1 as one;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 09fc832..dd1ae0c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -3538,6 +3538,11 @@
   </test-group>
   <test-group name="misc">
     <test-case FilePath="misc">
+      <compilation-unit name="poll-dynamic">
+        <output-dir compare="Text">poll-dynamic</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="misc">
       <compilation-unit name="validate-expected">
         <output-dir compare="Text">validate-expected</output-dir>
       </compilation-unit>