[ASTERIXDB-3391][OTH] Make DML statements cancellable
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
With this patch INSERT/UPSERT/DELETE and COPY statements are marked
cancellable. For atomic statements, the statements are cancelled as
long as the ingestion job is running i.e., ACKs from all nodes/partitions
are not received and the CC has not decided to commit the statement yet.
Change-Id: I16410ab9353c24597f77ab38ce06996fc5dfacd0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18257
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 0436ea8..84a6488 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -54,9 +54,9 @@
if (complete) {
return;
}
- complete();
- state = State.CANCELLED;
if (cancellable) {
+ complete();
+ state = State.CANCELLED;
doCancel(appCtx);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index 0786895..9d1e108 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -215,6 +215,7 @@
@Override
public void abortTransaction(JobId jobId) throws Exception {
IGlobalTransactionContext context = getTransactionContext(jobId);
+ context.resetAcksReceived();
if (context.getTxnStatus() == TransactionStatus.PREPARED) {
sendJobRollbackMessages(context);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 34648dd..3035496 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4009,10 +4009,12 @@
final ClientRequest clientRequest =
(ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
clientRequest.setJobId(jobId);
+ clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
hcc.waitForCompletion(jobId);
+ ensureNotCancelled(clientRequest);
} finally {
Thread.currentThread().setName(nameBefore);
}
@@ -4151,7 +4153,7 @@
ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid());
if (stmtInsertUpsert.getReturnExpression() != null) {
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- reqParams, false, stmt, clientRequest);
+ reqParams, true, stmt, clientRequest);
} else {
locker.lock();
JobId jobId = null;
@@ -4176,10 +4178,12 @@
}
jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
clientRequest.setJobId(jobId);
+ clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
hcc.waitForCompletion(jobId);
+ ensureNotCancelled(clientRequest);
} finally {
Thread.currentThread().setName(nameBefore);
}
@@ -4246,10 +4250,12 @@
final ClientRequest clientRequest =
(ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
clientRequest.setJobId(jobId);
+ clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
hcc.waitForCompletion(jobId);
+ ensureNotCancelled(clientRequest);
} finally {
Thread.currentThread().setName(nameBefore);
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java
new file mode 100644
index 0000000..1cb87d8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.atomic_statements;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.asterix.api.http.server.QueryServiceRequestParameters.Parameter.CLIENT_ID;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.api.common.LocalCloudUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.ParameterTypeEnum;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.http.HttpResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AtomicStatementsCancellationTest {
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ private static final String TEST_CONFIG_FILE_NAME = "cc-cloud-storage.conf";
+ private static final TestExecutor TEST_EXECUTOR = new TestExecutor();
+
+ private static final String TEST_CONFIG_PATH =
+ joinPath(System.getProperty("user.dir"), "src", "test", "resources");;
+ private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
+ private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+ private static final String DATASET_NAME = "ds_0";
+ private static final int BATCH_SIZE = 20000;
+ private static final int NUM_UPSERTS = 100;
+
+ @Before
+ public void setUp() throws Exception {
+ boolean cleanStart = true;
+ LocalCloudUtil.startS3CloudEnvironment(cleanStart);
+ integrationUtil.setGracefulShutdown(true);
+ integrationUtil.init(true, TEST_CONFIG_FILE_PATH);
+ createDatasets();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ integrationUtil.deinit(true);
+ }
+
+ private void createDatasets() throws Exception {
+ TestDataUtil.createDatasetWithoutType(DATASET_NAME, Map.of("id", "uuid"), true);
+ TestDataUtil.createSecondaryBTreeIndex(DATASET_NAME, DATASET_NAME + "_sidx", "name:string");
+ }
+
+ public String generateInsertStatement(String dataset, long count) throws Exception {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int i = 0; i < count; i++) {
+ stringBuilder.append("{\"name\": \"name_" + i + "\"},");
+ }
+ stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+ return "INSERT INTO " + dataset + "([" + stringBuilder + "]);";
+ }
+
+ private int cancelQuery(URI uri, List<TestCase.CompilationUnit.Parameter> params) throws Exception {
+ HttpResponse response = TEST_EXECUTOR.executeHttpRequest(TestExecutor.constructDeleteMethod(uri, params));
+ return response.getStatusLine().getStatusCode();
+ }
+
+ @Test
+ public void testAtomicityWithCancellation() throws Exception {
+ Random rnd = new Random();
+ for (int j = 0; j < NUM_UPSERTS; j++) {
+ String clientContextId = UUID.randomUUID().toString();
+ final List<TestCase.CompilationUnit.Parameter> params = new ArrayList<>();
+ TestCase.CompilationUnit.Parameter newParam = new TestCase.CompilationUnit.Parameter();
+ newParam.setName(CLIENT_ID.str());
+ newParam.setType(ParameterTypeEnum.STRING);
+ newParam.setValue(clientContextId);
+ params.add(newParam);
+ String statement = generateInsertStatement(DATASET_NAME, BATCH_SIZE);
+ Callable<InputStream> upsert = () -> {
+ try {
+ return TEST_EXECUTOR.executeQueryService(statement, TestCaseContext.OutputFormat.CLEAN_JSON,
+ TEST_EXECUTOR.getEndpoint(Servlets.QUERY_SERVICE), params, false, UTF_8);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ };
+ Future<InputStream> future = executor.submit(upsert);
+ if (!future.isDone()) {
+ Thread.sleep(rnd.nextInt(900) + 800);
+ // Cancels the query request while the query is executing.
+ int rc = cancelQuery(TEST_EXECUTOR.getEndpoint(Servlets.RUNNING_REQUESTS), params);
+ Assert.assertTrue(rc == 200 || rc == 404 || rc == 403);
+ }
+ while (!future.isDone()) {
+ Thread.sleep(100);
+ }
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index e6b4662..f95111e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -692,7 +692,7 @@
return checkResponse(executeBasicAuthHttpRequest(method, credentials), responseCodeValidator);
}
- protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
+ public HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
// https://issues.apache.org/jira/browse/ASTERIXDB-2315
ExecutorService executor = Executors.newSingleThreadExecutor();
CloseableHttpClient client = HttpClients.custom().addInterceptorFirst(new PreemptiveAuthInterceptor())
@@ -933,7 +933,7 @@
return false;
}
- protected List<Parameter> upsertParam(List<Parameter> params, String name, ParameterTypeEnum type, String value) {
+ public List<Parameter> upsertParam(List<Parameter> params, String name, ParameterTypeEnum type, String value) {
boolean replaced = false;
List<Parameter> result = new ArrayList<>();
for (Parameter param : params) {
@@ -2983,7 +2983,7 @@
}
// adapted from https://stackoverflow.com/questions/2014700/preemptive-basic-authentication-with-apache-httpclient-4
- static class PreemptiveAuthInterceptor implements HttpRequestInterceptor {
+ public static class PreemptiveAuthInterceptor implements HttpRequestInterceptor {
public void process(final HttpRequest request, final HttpContext context) throws HttpException, IOException {
AuthState authState = (AuthState) context.getAttribute(HttpClientContext.TARGET_AUTH_STATE);