[ASTERIXDB-1943][API][STO] Make rebalance idempotent.
- user model changes:
added a rebalance cancellation HTTP API.
- storage format changes: no
- interface changes: no
Details:
- add an HTTP API for cancelling a rebalance request;
- clean up leftover states at the beginning of a
rebalance request;
- add tests for rebalance cancellation.
Change-Id: I0d14a07978e106cd497cc35538fafef318b2fcf7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1821
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index 53d8f6b..3bd1be5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -21,12 +21,18 @@
import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
import java.io.PrintWriter;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -61,20 +67,39 @@
private static final String METADATA = "Metadata";
private final ICcApplicationContext appCtx;
+ // One-at-a-time thread executor, for rebalance tasks.
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ // A queue that maintains submitted rebalance requests.
+ private final Queue<Future> rebalanceTasks = new ArrayDeque<>();
+
+ // A queue that tracks the termination of rebalance threads.
+ private final Queue<CountDownLatch> rebalanceFutureTerminated = new ArrayDeque<>();
+
public RebalanceApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
super(ctx, paths);
this.appCtx = appCtx;
}
@Override
- protected void post(IServletRequest request, IServletResponse response) {
- PrintWriter out = response.writer();
- ObjectMapper om = new ObjectMapper();
- ObjectNode jsonResponse = om.createObjectNode();
+ protected void delete(IServletRequest request, IServletResponse response) {
try {
// Sets the content type.
HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
+ // Cancels all rebalance requests.
+ cancelRebalance();
+ // Sends the response back.
+ sendResponse(response, HttpResponseStatus.OK, "rebalance tasks are cancelled");
+ } catch (Exception e) {
+ // Sends back and logs internal error if any exception happens during cancellation.
+ sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected void post(IServletRequest request, IServletResponse response) {
+ try {
// Gets dataverse, dataset, and target nodes for rebalance.
String dataverseName = request.getParameter("dataverseName");
String datasetName = request.getParameter("datasetName");
@@ -82,31 +107,66 @@
// Parses and check target nodes.
if (nodes == null) {
- sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST,
- "nodes are not given");
+ sendResponse(response, HttpResponseStatus.BAD_REQUEST, "nodes are not given");
return;
}
String nodesString = StringUtils.strip(nodes, "\"'").trim();
String[] targetNodes = nodesString.split(",");
if ("".equals(nodesString)) {
- sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST,
- "target nodes should not be empty");
+ sendResponse(response, HttpResponseStatus.BAD_REQUEST, "target nodes should not be empty");
return;
}
// If a user gives parameter datasetName, she should give dataverseName as well.
if (dataverseName == null && datasetName != null) {
- sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST,
+ sendResponse(response, HttpResponseStatus.BAD_REQUEST,
"to rebalance a particular dataset, the parameter dataverseName must be given");
return;
}
// Does not allow rebalancing a metadata dataset.
if (METADATA.equals(dataverseName)) {
- sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST,
- "cannot rebalance a metadata dataset");
+ sendResponse(response, HttpResponseStatus.BAD_REQUEST, "cannot rebalance a metadata dataset");
return;
}
+ // Schedules a rebalance task and waits for its completion.
+ CountDownLatch terminated = scheduleRebalance(dataverseName, datasetName, targetNodes, response);
+ terminated.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, "the rebalance service is interrupted", e);
+ }
+ }
+
+ // Cancels all rebalance tasks.
+ private synchronized void cancelRebalance() throws InterruptedException {
+ for (Future rebalanceTask : rebalanceTasks) {
+ rebalanceTask.cancel(true);
+ }
+ }
+
+ // Removes a terminated task and its termination latch -- the heads.
+ private synchronized void removeTermintedTask() {
+ rebalanceTasks.remove();
+ rebalanceFutureTerminated.remove();
+ }
+
+ // Schedules a rebalance task.
+ private synchronized CountDownLatch scheduleRebalance(String dataverseName, String datasetName,
+ String[] targetNodes, IServletResponse response) {
+ CountDownLatch terminated = new CountDownLatch(1);
+ Future task = executor.submit(() -> doRebalance(dataverseName, datasetName, targetNodes, response, terminated));
+ rebalanceTasks.add(task);
+ rebalanceFutureTerminated.add(terminated);
+ return terminated;
+ }
+
+ // Performs the actual rebalance.
+ private void doRebalance(String dataverseName, String datasetName, String[] targetNodes, IServletResponse response,
+ CountDownLatch terminated) {
+ try {
+ // Sets the content type.
+ HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
if (datasetName == null) {
// Rebalances datasets in a given dataverse or all non-metadata datasets.
@@ -123,10 +183,19 @@
}
// Sends response.
- sendResponse(out, jsonResponse, response, HttpResponseStatus.OK, "successful");
+ sendResponse(response, HttpResponseStatus.OK, "successful");
+ } catch (InterruptedException e) {
+ sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ "the rebalance task is cancelled by a user", e);
} catch (Exception e) {
- sendResponse(out, jsonResponse, response, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
- LOGGER.log(Level.WARNING, e.getMessage(), e);
+ sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.toString(), e);
+ } finally {
+ // Removes the heads of the task queue and the latch queue.
+ // Since the ExecutorService is one-at-a-time, the execution order of rebalance tasks is
+ // the same as the request submission order.
+ removeTermintedTask();
+ // Notify that the rebalance task is terminated.
+ terminated.countDown();
}
}
@@ -177,10 +246,24 @@
}
// Sends HTTP response to the request client.
- private void sendResponse(PrintWriter out, ObjectNode jsonResponse, IServletResponse response,
- HttpResponseStatus status, String message) {
+ private void sendResponse(IServletResponse response, HttpResponseStatus status, String message, Exception e) {
+ if (status != HttpResponseStatus.OK) {
+ if (e != null) {
+ LOGGER.log(Level.WARNING, message, e);
+ } else {
+ LOGGER.log(Level.WARNING, message);
+ }
+ }
+ PrintWriter out = response.writer();
+ ObjectMapper om = new ObjectMapper();
+ ObjectNode jsonResponse = om.createObjectNode();
jsonResponse.put("results", message);
response.setStatus(status);
out.write(jsonResponse.toString());
}
+
+ // Sends HTTP response to the request client.
+ private void sendResponse(IServletResponse response, HttpResponseStatus status, String message) {
+ sendResponse(response, status, message, null);
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 691be50..275b055 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -65,6 +65,7 @@
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -691,7 +692,15 @@
} else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.UPSERT_BYTE) {
// undo, upsert the old value if found, otherwise, physical delete
if (logRecord.getOldValue() == null) {
- indexAccessor.forcePhysicalDelete(logRecord.getNewValue());
+ try {
+ indexAccessor.forcePhysicalDelete(logRecord.getNewValue());
+ } catch (HyracksDataException hde) {
+ // Since we're undoing according to the write-ahead log, the actual upserting tuple
+ // might not have been written to memory yet.
+ if (hde.getErrorCode() != ErrorCode.UPDATE_OR_DELETE_NON_EXISTENT_KEY) {
+ throw hde;
+ }
+ }
} else {
indexAccessor.forceUpsert(logRecord.getOldValue());
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index dae88c2..3b17a94 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -24,6 +24,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import java.util.stream.IntStream;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -61,6 +63,7 @@
* A utility class for the rebalance operation.
*/
public class RebalanceUtil {
+ private static final Logger LOGGER = Logger.getLogger(RebalanceUtil.class.getName());
private RebalanceUtil() {
@@ -83,12 +86,13 @@
*/
public static void rebalance(String dataverseName, String datasetName, Set<String> targetNcNames,
MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception {
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
Dataset sourceDataset;
Dataset targetDataset;
+ // Executes the first Metadata transaction.
// Generates the rebalance target files. While doing that, hold read locks on the dataset so
// that no one can drop the rebalance source dataset.
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
// The source dataset.
sourceDataset = metadataProvider.findDataset(dataverseName, datasetName);
@@ -125,13 +129,57 @@
metadataProvider.getLocks().reset();
}
- // Starts another transaction for switching the metadata entity.
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ // Up to this point, since the bulk part of a rebalance operation is done,
+ // the following two operations will retry after interrupt and finally rethrow InterruptedException,
+ // which means that they will always succeed and could possibly throw InterruptedException as the last step.
+ // TODO(yingyi): ASTERIXDB-1948, in case a crash happens, currently the system will either:
+ // 1. (crash before metadata switch) think the rebalance is not done, and the target data files are leaked until
+ // the next rebalance request.
+ // 2. (crash after metadata switch) think the rebalance is done, and the source data files are leaked;
+ runWithRetryAfterInterrupt(() -> {
+ // Executes the 2nd Metadata transaction for switching the metadata entity.
+ // It detaches the source dataset and attaches the target dataset to metadata's point of view.
+ runMetadataTransaction(metadataProvider,
+ () -> rebalanceSwitch(sourceDataset, targetDataset, metadataProvider, hcc));
+ // Executes the 3rd Metadata transaction to drop the source dataset files and the node group for
+ // the source dataset.
+ runMetadataTransaction(metadataProvider, () -> dropSourceDataset(sourceDataset, metadataProvider, hcc));
+ });
+ }
+
+ @FunctionalInterface
+ private interface Work {
+ void run() throws Exception;
+ }
+
+ // Runs work.run() and lets it sustain interrupts.
+ private static void runWithRetryAfterInterrupt(Work work) throws Exception {
+ int retryCount = 0;
+ InterruptedException interruptedException = null;
+ boolean done = false;
+ do {
+ try {
+ work.run();
+ done = true;
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARNING, "Retry with attempt " + (++retryCount), e);
+ interruptedException = e;
+ }
+ } while (!done);
+
+ // Rethrows the interrupted exception.
+ if (interruptedException != null) {
+ throw interruptedException;
+ }
+ }
+
+ // Executes a metadata transaction.
+ private static void runMetadataTransaction(MetadataProvider metadataProvider, Work work) throws Exception {
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
- // Atomically switches the rebalance target to become the source dataset.
- rebalanceSwitch(sourceDataset, targetDataset, metadataProvider, hcc);
-
+ // Performs the actual work.
+ work.run();
// Complete the metadata transaction.
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
@@ -145,6 +193,9 @@
// Rebalances from the source to the target.
private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
IHyracksClientConnection hcc) throws Exception {
+ // Drops the target dataset files (if any) to make rebalance idempotent.
+ dropDatasetFiles(target, metadataProvider, hcc);
+
// Creates the rebalance target.
createRebalanceTarget(target, metadataProvider, hcc);
@@ -155,6 +206,7 @@
createAndLoadSecondaryIndexesForTarget(source, target, metadataProvider, hcc);
}
+ // Switches the metadata entity from the source dataset to the target dataset.
private static void rebalanceSwitch(Dataset source, Dataset target, MetadataProvider metadataProvider,
IHyracksClientConnection hcc) throws Exception {
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
@@ -164,6 +216,7 @@
Dataset sourceDataset = MetadataManagerUtil.findDataset(mdTxnCtx, source.getDataverseName(),
source.getDatasetName());
+
if (sourceDataset == null) {
// The dataset has already been dropped.
// In this case, we should drop the generated target dataset files.
@@ -171,18 +224,24 @@
return;
}
- // Drops the source dataset files.
- dropDatasetFiles(source, metadataProvider, hcc);
-
// Updates the dataset entry in the metadata storage
MetadataManager.INSTANCE.updateDataset(mdTxnCtx, target);
+ }
+
+ // Drops the source dataset.
+ private static void dropSourceDataset(Dataset source, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc) throws Exception {
+ // Drops the source dataset files. No need to lock the dataset entity here because the source dataset has
+ // been detached at this point.
+ dropDatasetFiles(source, metadataProvider, hcc);
// Drops the metadata entry of source dataset's node group.
String sourceNodeGroup = source.getNodeGroupName();
MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), sourceNodeGroup);
- MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, sourceNodeGroup, true);
+ MetadataManager.INSTANCE.dropNodegroup(metadataProvider.getMetadataTxnContext(), sourceNodeGroup, true);
}
+
// Creates the files for the rebalance target dataset.
private static void createRebalanceTarget(Dataset target, MetadataProvider metadataProvider,
IHyracksClientConnection hcc) throws Exception {
@@ -254,12 +313,13 @@
new RecordDescriptor[] { target.getPrimaryRecordDescriptor(metadataProvider) });
}
+ // Drops dataset files of a given dataset.
private static void dropDatasetFiles(Dataset dataset, MetadataProvider metadataProvider,
IHyracksClientConnection hcc) throws Exception {
List<JobSpecification> jobs = new ArrayList<>();
List<Index> indexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
for (Index index : indexes) {
- jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset));
+ jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, true));
}
for (JobSpecification jobSpec : jobs) {
JobUtils.runJob(hcc, jobSpec, true);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index 50c2986..b4f9ded 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -31,6 +31,7 @@
import java.util.concurrent.Future;
import java.util.function.Predicate;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.test.runtime.SqlppExecutionWithCancellationTest;
import org.apache.asterix.testframework.context.TestCaseContext;
@@ -104,7 +105,7 @@
return false;
}
}
- String errorMsg = getErrorMessage(e);
+ String errorMsg = ExceptionUtils.getErrorMessage(e);
// Expected, "HYR0025" means a user cancelled the query.)
if (errorMsg.startsWith("HYR0025")) {
SqlppExecutionWithCancellationTest.numCancelledQueries++;
@@ -115,21 +116,4 @@
return true;
}
}
-
- public static String getErrorMessage(Throwable th) {
- Throwable cause = getRootCause(th);
- return cause.getMessage();
- }
-
- // Finds the root cause of Throwable.
- private static Throwable getRootCause(Throwable e) {
- Throwable current = e;
- Throwable cause = e.getCause();
- while (cause != null && cause != current) {
- Throwable nextCause = current.getCause();
- current = cause;
- cause = nextCause;
- }
- return current;
- }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java
new file mode 100644
index 0000000..a63cb76
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.common;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.logging.Level;
+
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.ComparisonEnum;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.junit.Assert;
+
+public class RebalanceCancellationTestExecutor extends TestExecutor {
+
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ private long waitTime = 100;
+
+ public void setWaitTime(long waitTime) {
+ this.waitTime = waitTime;
+ }
+
+ @Override
+ protected void executeHttpRequest(TestCaseContext.OutputFormat fmt, String statement,
+ Map<String, Object> variableCtx, String reqType, File testFile, File expectedResultFile,
+ File actualResultFile, MutableInt queryCount, int numResultFiles, String extension, ComparisonEnum compare)
+ throws Exception {
+ // Executes regular tests as usual.
+ if (!(testFile.getAbsolutePath().endsWith("post.http") && statement.contains("rebalance"))) {
+ super.executeHttpRequest(fmt, statement, variableCtx, reqType, testFile, expectedResultFile,
+ actualResultFile, queryCount, numResultFiles, extension, compare);
+ return;
+ }
+
+ // Executes rebalance tests with cancellation.
+ Future<Exception> future = executor.submit(() -> {
+ //boolean failed = false;
+ try {
+ super.executeHttpRequest(fmt, statement, variableCtx, reqType, testFile, expectedResultFile,
+ actualResultFile, queryCount, numResultFiles, extension, compare);
+ } catch (Exception e) {
+ // Since Hyracks job cancellation is not synchronous, re-executing rebalance could
+ // fail, but we keep retrying until it completes.
+ boolean done = false;
+ do {
+ try {
+ // Re-executes rebalance.
+ super.executeHttpRequest(fmt, statement, variableCtx, reqType, testFile, expectedResultFile,
+ actualResultFile, queryCount, numResultFiles, extension, compare);
+ done = true;
+ } catch (Exception e2) {
+ String errorMsg = ExceptionUtils.getErrorMessage(e2);
+ // not expected, but is a false alarm.
+ if (errorMsg == null || !errorMsg.contains("reference count = 1")) {
+ return e2;
+ }
+ LOGGER.log(Level.WARNING, e2.toString(), e2);
+ }
+ } while (!done);
+ }
+ return null;
+ });
+ Thread.sleep(waitTime);
+ // Cancels the rebalance request while the rebalance is executing.
+ int rc = cancelQuery(getEndpoint(Servlets.REBALANCE), Collections.emptyList());
+ Assert.assertTrue(rc == 200 || rc == 404);
+ Exception e = future.get();
+ if (e != null) {
+ throw e;
+ }
+ }
+
+ // Cancels submitted rebalance requests through the rebalance cancellation REST API.
+ private int cancelQuery(URI uri, List<TestCase.CompilationUnit.Parameter> params) throws Exception {
+ HttpUriRequest method = constructDeleteMethodUrl(uri, params);
+ HttpResponse response = executeHttpRequest(method);
+ return response.getStatusLine().getStatusCode();
+ }
+
+ // Constructs a HTTP DELETE request.
+ private HttpUriRequest constructDeleteMethodUrl(URI uri, List<TestCase.CompilationUnit.Parameter> otherParams) {
+ RequestBuilder builder = RequestBuilder.delete(uri);
+ for (TestCase.CompilationUnit.Parameter param : otherParams) {
+ builder.addParameter(param.getName(), param.getValue());
+ }
+ builder.setCharset(StandardCharsets.UTF_8);
+ return builder.build();
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index ed6a77a..8791756 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1058,9 +1058,9 @@
}
}
- private void executeHttpRequest(OutputFormat fmt, String statement, Map<String, Object> variableCtx, String reqType,
- File testFile, File expectedResultFile, File actualResultFile, MutableInt queryCount, int numResultFiles,
- String extension, ComparisonEnum compare) throws Exception {
+ protected void executeHttpRequest(OutputFormat fmt, String statement, Map<String, Object> variableCtx,
+ String reqType, File testFile, File expectedResultFile, File actualResultFile, MutableInt queryCount,
+ int numResultFiles, String extension, ComparisonEnum compare) throws Exception {
String handleVar = getHandleVariable(statement);
final String trimmedPathAndQuery = stripLineComments(stripJavaComments(statement)).trim();
final String variablesReplaced = replaceVarRef(trimmedPathAndQuery, variableCtx);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java
new file mode 100644
index 0000000..1d7bdc3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.RebalanceCancellationTestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs rebalance tests with cancellation.
+ */
+@RunWith(Parameterized.class)
+public class RebalanceWithCancellationIT {
+ protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+ private static RebalanceCancellationTestExecutor executor = new RebalanceCancellationTestExecutor();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, executor);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "RebalanceWithCancellationIT {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests("only_sqlpp.xml", "rebalance.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public RebalanceWithCancellationIT(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ // Runs each single cancellation test multiple times and tests cancellation at various points of time.
+ for (int waitTime = 100; waitTime <= 1000; waitTime += 50) {
+ executor.setWaitTime(waitTime);
+ LangExecutionUtil.test(tcCtx);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
index cce069c..fff0775 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
@@ -20,6 +20,7 @@
import java.util.Collection;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.test.common.CancellationTestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.junit.AfterClass;
@@ -70,7 +71,7 @@
try {
LangExecutionUtil.test(tcCtx);
} catch (Exception e) {
- String errorMsg = CancellationTestExecutor.getErrorMessage(e);
+ String errorMsg = ExceptionUtils.getErrorMessage(e);
if (!errorMsg.contains("reference count = 1") // not expected, but is a false alarm.
&& !errorMsg.contains("pinned and file is being closed") // not expected, but maybe a false alarm.
) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 51a535a..3da58e9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -35,6 +35,7 @@
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -263,19 +264,30 @@
@Override
public synchronized void close(String resourcePath) throws HyracksDataException {
- validateDatasetLifecycleManagerState();
- int did = getDIDfromResourcePath(resourcePath);
- long resourceID = getResourceIDfromResourcePath(resourcePath);
- DatasetResource dsr = datasets.get(did);
- if (dsr == null) {
- throw new HyracksDataException("No index found with resourceID " + resourceID);
+ DatasetResource dsr = null;
+ IndexInfo iInfo = null;
+ try {
+ validateDatasetLifecycleManagerState();
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
+ dsr = datasets.get(did);
+ if (dsr == null) {
+ throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+ }
+ iInfo = dsr.getIndexInfo(resourceID);
+ if (iInfo == null) {
+ throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+ }
+ } finally {
+ // Regardless of what exception is thrown in the try-block (e.g., line 279),
+ // we have to un-touch the index and dataset.
+ if (iInfo != null) {
+ iInfo.untouch();
+ }
+ if (dsr != null) {
+ dsr.untouch();
+ }
}
- IndexInfo iInfo = dsr.getIndexInfo(resourceID);
- if (iInfo == null) {
- throw new HyracksDataException("No index found with resourceID " + resourceID);
- }
- iInfo.untouch();
- dsr.untouch();
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index 649f1f5..3105b3f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -20,7 +20,6 @@
public class ExceptionUtils {
public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
- public static final String MISSING_PARAMETER = "Missing parameter.\n";
public static final String PARAMETER_NAME = "Parameter name: ";
public static final String EXPECTED_VALUE = "Expected value: ";
public static final String PASSED_VALUE = "Passed value: ";
@@ -32,4 +31,22 @@
return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + System.lineSeparator() + EXPECTED_VALUE
+ expectedValue + System.lineSeparator() + PASSED_VALUE + passedValue;
}
+
+ // Returns the message carried by the root cause of the given Throwable instance.
+ public static String getErrorMessage(Throwable th) {
+ // The message is taken from the end of the cause chain, not from th itself.
+ return getRootCause(th).getMessage();
+ }
+
+ // Finds the root cause of a Throwable: follows getCause() until it is null or refers back to itself.
+ public static Throwable getRootCause(Throwable e) {
+ Throwable current = e;
+ Throwable cause = e.getCause();
+ while (cause != null && cause != current) {
+ // Step down from the cause itself; reading current.getCause() before advancing stalls after one level.
+ current = cause;
+ cause = current.getCause();
+ }
+ return current;
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 9e55a97..411f866 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -103,7 +103,14 @@
Dataset dataset) throws AlgebricksException {
SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
.createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig);
- return secondaryIndexHelper.buildDropJobSpec();
+ return secondaryIndexHelper.buildDropJobSpec(false);
+ }
+
+ public static JobSpecification buildDropIndexJobSpec(Index index, MetadataProvider metadataProvider,
+ Dataset dataset, boolean failSilently) throws AlgebricksException {
+ SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
+ .createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig);
+ return secondaryIndexHelper.buildDropJobSpec(failSilently);
}
public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset dataset, Index index,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index d11ba21..8c45c11 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -170,7 +170,7 @@
public abstract JobSpecification buildCompactJobSpec() throws AlgebricksException;
- public abstract JobSpecification buildDropJobSpec() throws AlgebricksException;
+ public abstract JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException;
protected void init() throws AlgebricksException {
payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 907192c..2dcab4f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -66,14 +66,15 @@
}
@Override
- public JobSpecification buildDropJobSpec() throws AlgebricksException {
+ public JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
// The index drop operation should be persistent regardless of temp datasets or permanent dataset.
- IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory);
+ IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory,
+ failSilently);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
splitsAndConstraint.second);
spec.addRoot(btreeDrop);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index cb03ae4..e530bc3 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -213,13 +213,18 @@
public synchronized void delete(String relativePath) throws HyracksDataException {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
if (resourceFile.getFile().exists()) {
- resourceFile.delete();
- resourceCache.invalidate(relativePath);
-
- //if replication enabled, delete resource from remote replicas
- if (isReplicationEnabled
- && !resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
- createReplicationJob(ReplicationOperation.DELETE, resourceFile);
+ try {
+ // Invalidate before deleting the file just in case file deletion throws some exception.
+ // Since it's just a cache invalidation, it should not affect correctness.
+ resourceCache.invalidate(relativePath);
+ resourceFile.delete();
+ } finally {
+ // Whether or not the local deletion succeeded, the operation should be replicated.
+ // If replication is enabled, delete the resource from remote replicas.
+ if (isReplicationEnabled
+ && !resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
+ createReplicationJob(ReplicationOperation.DELETE, resourceFile);
+ }
}
} else {
throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST,
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 8a91547..2e6c8a3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -116,6 +116,7 @@
public static final int CANNOT_CREATE_EXISTING_INDEX = 80;
public static final int FILE_ALREADY_MAPPED = 81;
public static final int FILE_ALREADY_EXISTS = 82;
+ public static final int NO_INDEX_FOUND_WITH_RESOURCE_ID = 83;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 9f983a7..6389ffa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -99,5 +99,6 @@
80 = Cannot create index because it already exists
81 = File %1$s is already mapped
82 = Failed to create the file %1$s because it already exists
+83 = No index found with resourceID %1$s
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
index d162be0..18c7107 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
@@ -30,16 +30,23 @@
private static final long serialVersionUID = 1L;
private final IIndexDataflowHelperFactory dataflowHelperFactory;
+ private final boolean failSilently;
public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
IIndexDataflowHelperFactory dataflowHelperFactory) {
+ this(spec, dataflowHelperFactory, false);
+ }
+
+ public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ IIndexDataflowHelperFactory dataflowHelperFactory, boolean failSilently) {
super(spec, 0, 0);
this.dataflowHelperFactory = dataflowHelperFactory;
+ this.failSilently = failSilently;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new IndexDropOperatorNodePushable(dataflowHelperFactory, ctx, partition);
+ return new IndexDropOperatorNodePushable(dataflowHelperFactory, failSilently, ctx, partition);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index fce31ca..7c2021b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -28,10 +28,12 @@
public class IndexDropOperatorNodePushable extends AbstractOperatorNodePushable {
private final IIndexDataflowHelper indexHelper;
+ private final boolean failSilently; // when true, initialize() drops a failure raised by destroy()
- public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, IHyracksTaskContext ctx,
- int partition) throws HyracksDataException {
+ public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, boolean failSilently,
+ IHyracksTaskContext ctx, int partition) throws HyracksDataException {
this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+ this.failSilently = failSilently;
}
@Override
@@ -50,7 +52,13 @@
@Override
public void initialize() throws HyracksDataException {
- indexHelper.destroy();
+ try {
+ indexHelper.destroy();
+ } catch (HyracksDataException e) { // intentionally not rethrown when failSilently is set
+ if (!failSilently) {
+ throw e;
+ }
+ }
}
@Override