[ASTERIXDB-3391][OTH] Make DML statements cancellable

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
With this patch INSERT/UPSERT/DELETE and COPY statements are marked
cancellable. For atomic statements, the statements are cancelled as
long as the ingestion job is running i.e., ACKs from all nodes/partitions
are not received and the CC has not decided to commit the statement yet.

Change-Id: I16410ab9353c24597f77ab38ce06996fc5dfacd0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18257
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 0436ea8..84a6488 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -54,9 +54,9 @@
         if (complete) {
             return;
         }
-        complete();
-        state = State.CANCELLED;
         if (cancellable) {
+            complete();
+            state = State.CANCELLED;
             doCancel(appCtx);
         }
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index 0786895..9d1e108 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -215,6 +215,7 @@
     @Override
     public void abortTransaction(JobId jobId) throws Exception {
         IGlobalTransactionContext context = getTransactionContext(jobId);
+        context.resetAcksReceived();
         if (context.getTxnStatus() == TransactionStatus.PREPARED) {
             sendJobRollbackMessages(context);
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 34648dd..3035496 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4009,10 +4009,12 @@
                 final ClientRequest clientRequest =
                         (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
                 clientRequest.setJobId(jobId);
+                clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
                     Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
                     hcc.waitForCompletion(jobId);
+                    ensureNotCancelled(clientRequest);
                 } finally {
                     Thread.currentThread().setName(nameBefore);
                 }
@@ -4151,7 +4153,7 @@
         ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid());
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                    reqParams, false, stmt, clientRequest);
+                    reqParams, true, stmt, clientRequest);
         } else {
             locker.lock();
             JobId jobId = null;
@@ -4176,10 +4178,12 @@
                 }
                 jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
                 clientRequest.setJobId(jobId);
+                clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
                     Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
                     hcc.waitForCompletion(jobId);
+                    ensureNotCancelled(clientRequest);
                 } finally {
                     Thread.currentThread().setName(nameBefore);
                 }
@@ -4246,10 +4250,12 @@
                 final ClientRequest clientRequest =
                         (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
                 clientRequest.setJobId(jobId);
+                clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
                     Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
                     hcc.waitForCompletion(jobId);
+                    ensureNotCancelled(clientRequest);
                 } finally {
                     Thread.currentThread().setName(nameBefore);
                 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java
new file mode 100644
index 0000000..1cb87d8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.atomic_statements;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.asterix.api.http.server.QueryServiceRequestParameters.Parameter.CLIENT_ID;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.api.common.LocalCloudUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.ParameterTypeEnum;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.http.HttpResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AtomicStatementsCancellationTest {
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+    private static final String TEST_CONFIG_FILE_NAME = "cc-cloud-storage.conf";
+    private static final TestExecutor TEST_EXECUTOR = new TestExecutor();
+
+    private static final String TEST_CONFIG_PATH =
+            joinPath(System.getProperty("user.dir"), "src", "test", "resources");;
+    private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+    private static final String DATASET_NAME = "ds_0";
+    private static final int BATCH_SIZE = 20000;
+    private static final int NUM_UPSERTS = 100;
+
+    @Before
+    public void setUp() throws Exception {
+        boolean cleanStart = true;
+        LocalCloudUtil.startS3CloudEnvironment(cleanStart);
+        integrationUtil.setGracefulShutdown(true);
+        integrationUtil.init(true, TEST_CONFIG_FILE_PATH);
+        createDatasets();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    private void createDatasets() throws Exception {
+        TestDataUtil.createDatasetWithoutType(DATASET_NAME, Map.of("id", "uuid"), true);
+        TestDataUtil.createSecondaryBTreeIndex(DATASET_NAME, DATASET_NAME + "_sidx", "name:string");
+    }
+
+    public String generateInsertStatement(String dataset, long count) throws Exception {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < count; i++) {
+            stringBuilder.append("{\"name\": \"name_" + i + "\"},");
+        }
+        stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+        return "INSERT INTO " + dataset + "([" + stringBuilder + "]);";
+    }
+
+    private int cancelQuery(URI uri, List<TestCase.CompilationUnit.Parameter> params) throws Exception {
+        HttpResponse response = TEST_EXECUTOR.executeHttpRequest(TestExecutor.constructDeleteMethod(uri, params));
+        return response.getStatusLine().getStatusCode();
+    }
+
+    @Test
+    public void testAtomicityWithCancellation() throws Exception {
+        Random rnd = new Random();
+        for (int j = 0; j < NUM_UPSERTS; j++) {
+            String clientContextId = UUID.randomUUID().toString();
+            final List<TestCase.CompilationUnit.Parameter> params = new ArrayList<>();
+            TestCase.CompilationUnit.Parameter newParam = new TestCase.CompilationUnit.Parameter();
+            newParam.setName(CLIENT_ID.str());
+            newParam.setType(ParameterTypeEnum.STRING);
+            newParam.setValue(clientContextId);
+            params.add(newParam);
+            String statement = generateInsertStatement(DATASET_NAME, BATCH_SIZE);
+            Callable<InputStream> upsert = () -> {
+                try {
+                    return TEST_EXECUTOR.executeQueryService(statement, TestCaseContext.OutputFormat.CLEAN_JSON,
+                            TEST_EXECUTOR.getEndpoint(Servlets.QUERY_SERVICE), params, false, UTF_8);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    throw e;
+                }
+            };
+            Future<InputStream> future = executor.submit(upsert);
+            if (!future.isDone()) {
+                Thread.sleep(rnd.nextInt(900) + 800);
+                // Cancels the query request while the query is executing.
+                int rc = cancelQuery(TEST_EXECUTOR.getEndpoint(Servlets.RUNNING_REQUESTS), params);
+                Assert.assertTrue(rc == 200 || rc == 404 || rc == 403);
+            }
+            while (!future.isDone()) {
+                Thread.sleep(100);
+            }
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index e6b4662..f95111e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -692,7 +692,7 @@
         return checkResponse(executeBasicAuthHttpRequest(method, credentials), responseCodeValidator);
     }
 
-    protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
+    public HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
         // https://issues.apache.org/jira/browse/ASTERIXDB-2315
         ExecutorService executor = Executors.newSingleThreadExecutor();
         CloseableHttpClient client = HttpClients.custom().addInterceptorFirst(new PreemptiveAuthInterceptor())
@@ -933,7 +933,7 @@
         return false;
     }
 
-    protected List<Parameter> upsertParam(List<Parameter> params, String name, ParameterTypeEnum type, String value) {
+    public List<Parameter> upsertParam(List<Parameter> params, String name, ParameterTypeEnum type, String value) {
         boolean replaced = false;
         List<Parameter> result = new ArrayList<>();
         for (Parameter param : params) {
@@ -2983,7 +2983,7 @@
     }
 
     // adapted from https://stackoverflow.com/questions/2014700/preemptive-basic-authentication-with-apache-httpclient-4
-    static class PreemptiveAuthInterceptor implements HttpRequestInterceptor {
+    public static class PreemptiveAuthInterceptor implements HttpRequestInterceptor {
 
         public void process(final HttpRequest request, final HttpContext context) throws HttpException, IOException {
             AuthState authState = (AuthState) context.getAttribute(HttpClientContext.TARGET_AUTH_STATE);