Merge branch 'gerrit/stabilization-3b6982ce7f'
Change-Id: I2ea64b20d39dc9cb4e14f350424f0e4b2e038e2c
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 7396dcb..4b6f01d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1703,7 +1703,7 @@
validateDatasetState(metadataProvider, ds, sourceLoc);
ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc, dropCorrespondingNodeGroup,
- sourceLoc, Collections.emptySet(), requestParameters.isForceDropDataset());
+ sourceLoc, EnumSet.of(DropOption.IF_EXISTS), requestParameters.isForceDropDataset());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
return true;
@@ -1719,6 +1719,10 @@
try {
if (ds != null) {
jobsToExecute.clear();
+ // start another txn for the compensating operations
+ mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
+ bActiveTxn.setValue(true);
+ metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc,
dropCorrespondingNodeGroup, sourceLoc, EnumSet.of(DropOption.IF_EXISTS),
requestParameters.isForceDropDataset());
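Note: the second hunk above opens a fresh metadata transaction before the compensating drop, since the original transaction has already been aborted by the time this path runs. A condensed sketch of the overall compensation pattern follows; the doDropDataset/doDropDatasetIfExists helpers are hypothetical stand-ins for the full drop call shown in the diff, and the abort/commit error handling is simplified.

    // Condensed compensation pattern (sketch, not the full QueryTranslator flow):
    // once the first metadata transaction aborts, its context cannot be reused, so
    // the compensating drop runs inside a brand-new transaction.
    try {
        doDropDataset(mdTxnCtx);                                         // primary attempt (hypothetical helper)
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
    } catch (Exception e) {
        MetadataManager.INSTANCE.abortTransaction(mdTxnCtx.getValue());  // roll back the failed txn (simplified)
        // start another txn for the compensating operations
        mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
        bActiveTxn.setValue(true);
        metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
        doDropDatasetIfExists(mdTxnCtx);                                 // idempotent via DropOption.IF_EXISTS (hypothetical helper)
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
        throw e;
    }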
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 508eb76..29dedf7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -230,20 +230,21 @@
@Override
public synchronized void delete(String relativePath) throws HyracksDataException {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
- if (resourceFile.getFile().exists()) {
- if (isReplicationEnabled) {
- createReplicationJob(ReplicationOperation.DELETE, resourceFile);
+ try {
+ if (resourceFile.getFile().exists()) {
+ if (isReplicationEnabled) {
+ createReplicationJob(ReplicationOperation.DELETE, resourceFile);
+ }
+ final LocalResource localResource = readLocalResource(resourceFile.getFile());
+ IoUtil.delete(resourceFile);
+ // delete all checkpoints
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
+ } else {
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST,
+ relativePath);
}
- final LocalResource localResource = readLocalResource(resourceFile.getFile());
- // Invalidate before deleting the file just in case file deletion throws some exception.
- // Since it's just a cache invalidation, it should not affect correctness.
+ } finally {
resourceCache.invalidate(relativePath);
- IoUtil.delete(resourceFile);
- // delete all checkpoints
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
- } else {
- throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST,
- relativePath);
}
}
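Note: the rewrite above moves resourceCache.invalidate(relativePath) into a finally block, so a failed file deletion (or checkpoint cleanup) can no longer leave a stale entry in the cache. A minimal self-contained sketch of the same try/finally invalidation pattern, using a plain Guava cache and java.nio deletes rather than the repository's own types:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    // Sketch only: invalidate the cache entry in a finally block so an exception thrown
    // while deleting the resource cannot leave stale cached state behind.
    public class CachedResourceStore {
        private final Cache<String, Object> resourceCache = CacheBuilder.newBuilder().maximumSize(1024).build();
        private final Path root;

        public CachedResourceStore(Path root) {
            this.root = root;
        }

        public synchronized void delete(String relativePath) throws IOException {
            Path resourceFile = root.resolve(relativePath);
            try {
                if (!Files.exists(resourceFile)) {
                    throw new IOException("resource does not exist: " + relativePath);
                }
                Files.delete(resourceFile); // may throw; the finally below still runs
            } finally {
                // invalidation is just cache bookkeeping, so it is safe even when the delete failed
                resourceCache.invalidate(relativePath);
            }
        }
    }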
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index c61e0d2..835cd54 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -45,6 +45,7 @@
import org.apache.hyracks.http.server.FormUrlEncodedRequest;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.util.ThrowingConsumer;
+import org.apache.hyracks.util.ThrowingFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -191,16 +192,23 @@
return i < 0 ? uri : uri.substring(0, i);
}
- public static void handleStreamInterruptibly(CloseableHttpResponse response,
+ public static void consumeStreamInterruptibly(CloseableHttpResponse response,
ThrowingConsumer<Reader> streamProcessor, ExecutorService executor, Supplier<String> descriptionSupplier)
+ throws InterruptedException, ExecutionException, IOException {
+ processStreamInterruptibly(response, ThrowingConsumer.asFunction(streamProcessor), executor,
+ descriptionSupplier);
+ }
+
+ public static <T> T processStreamInterruptibly(CloseableHttpResponse response,
+ ThrowingFunction<Reader, T> streamProcessor, ExecutorService executor, Supplier<String> descriptionSupplier)
throws IOException, InterruptedException, ExecutionException {
// we have to consume the stream in a separate thread, as it does not stop on interrupt; we need to
// instead close the connection to achieve the interrupt
String description = descriptionSupplier.get();
- Future<Void> readFuture = executor.submit(() -> {
+ Future<T> readFuture = executor.submit(() -> {
Thread.currentThread().setName(description);
InputStreamReader reader = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8);
- streamProcessor.process(new Reader() {
+ return streamProcessor.process(new Reader() {
@Override
public int read(char[] cbuf, int off, int len) throws IOException {
return reader.read(cbuf, off, len);
@@ -213,10 +221,9 @@
LOGGER.debug("ignoring close on {}", reader);
}
});
- return null;
});
try {
- readFuture.get();
+ return readFuture.get();
} catch (InterruptedException ex) { // NOSONAR -- interrupt or rethrow
response.close();
try {
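Note: the new generic processStreamInterruptibly lets callers get a value back from the worker thread instead of being limited to the void consumer form. A hedged usage sketch; the response, executor, and uri variables are assumed to be set up by the caller and are not part of this change.

    // Hypothetical caller: drain the response body on the executor and return it as a String.
    // If the waiting thread is interrupted, the method closes the connection to unblock the read.
    String body = HttpUtil.processStreamInterruptibly(response, reader -> {
        StringBuilder sb = new StringBuilder();
        char[] buf = new char[8192];
        int n;
        while ((n = reader.read(buf)) != -1) {
            sb.append(buf, 0, n);
        }
        return sb.toString();
    }, executor, () -> "read " + uri);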
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index e48db2b..3b6669e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -21,6 +21,7 @@
import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_DROP_IN_USE_INDEX;
import static org.apache.hyracks.api.exceptions.ErrorCode.INDEX_DOES_NOT_EXIST;
+import static org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST;
import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption.IF_EXISTS;
import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption.WAIT_ON_IN_USE;
@@ -96,7 +97,7 @@
}
private boolean isIgnorable(HyracksDataException e) {
- return e.matches(INDEX_DOES_NOT_EXIST) && options.contains(IF_EXISTS);
+ return (e.matches(INDEX_DOES_NOT_EXIST) || e.matches(RESOURCE_DOES_NOT_EXIST)) && options.contains(IF_EXISTS);
}
private boolean canRetry(HyracksDataException e) throws HyracksDataException {
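Note: this change pairs with the PersistentLocalResourceRepository change above. delete() now throws RESOURCE_DOES_NOT_EXIST when the on-disk resource is already gone, so an IF_EXISTS drop treats that code the same as INDEX_DOES_NOT_EXIST. A simplified sketch of how the guard is applied; dropIndex() is a hypothetical stand-in for the pushable's actual drop logic.

    // Simplified guard (sketch): with IF_EXISTS requested, a missing index or missing
    // on-disk resource is treated as "nothing left to drop" rather than an error.
    try {
        dropIndex(); // hypothetical stand-in for the node pushable's drop call
    } catch (HyracksDataException e) {
        boolean missing = e.matches(INDEX_DOES_NOT_EXIST) || e.matches(RESOURCE_DOES_NOT_EXIST);
        if (!(missing && options.contains(IF_EXISTS))) {
            throw e;
        }
        // otherwise swallow the exception: the drop is a no-op
    }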