[ASTERIXDB-3253][STO] Fix deleting uncached files
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
When deleting uncached files, we should account
(correctly) what files were deleted from the
cloud storage and were not cached locally.
Change-Id: Ie72121593acc6f53f9cfb87e54c17577e8633df7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17749
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 0611688..6973b7b 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -31,6 +31,7 @@
import java.util.Set;
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
+import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
import org.apache.asterix.cloud.clients.CloudClientProvider;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.util.CloudFileUtil;
@@ -199,7 +200,7 @@
@Override
public IIOBulkOperation createDeleteBulkOperation() {
- return new DeleteBulkCloudOperation(localIoManager, bucket, cloudClient);
+ return new DeleteBulkCloudOperation(localIoManager, bucket, cloudClient, NoOpDeleteBulkCallBack.INSTANCE);
}
@Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 94fd5f8..01f684b 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -25,6 +25,7 @@
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
import org.apache.asterix.cloud.lazy.accessor.ILazyAccessor;
import org.apache.asterix.cloud.lazy.accessor.ILazyAccessorReplacer;
import org.apache.asterix.cloud.lazy.accessor.InitialCloudAccessor;
@@ -34,6 +35,7 @@
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IOManager;
@@ -112,6 +114,12 @@
* IIOManager functions
* ******************************************************************
*/
+
+ @Override
+ public IIOBulkOperation createDeleteBulkOperation() {
+ return new DeleteBulkCloudOperation(localIoManager, bucket, cloudClient, accessor.getBulkOperationCallBack());
+ }
+
@Override
public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
return accessor.doList(dir, filter);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
index f36d594..3135624 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -24,7 +24,7 @@
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -33,26 +33,30 @@
private static final Logger LOGGER = LogManager.getLogger();
private final String bucket;
private final ICloudClient cloudClient;
+ private final IBulkOperationCallBack callBack;
- public DeleteBulkCloudOperation(IIOManager ioManager, String bucket, ICloudClient cloudClient) {
+ public DeleteBulkCloudOperation(IOManager ioManager, String bucket, ICloudClient cloudClient,
+ IBulkOperationCallBack callBack) {
super(ioManager);
this.bucket = bucket;
this.cloudClient = cloudClient;
+ this.callBack = callBack;
}
@Override
- public void performOperation() throws HyracksDataException {
+ public int performOperation() throws HyracksDataException {
/*
* TODO What about deleting multiple directories?
* Actually, is there a case where we delete multiple directories from the cloud?
*/
List<String> paths = fileReferences.stream().map(FileReference::getRelativePath).collect(Collectors.toList());
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Bulk deleting: {}", paths);
+ LOGGER.debug("Bulk deleting: local: {}, cloud: {}", fileReferences, paths);
}
cloudClient.deleteObjects(bucket, paths);
-
// Bulk delete locally as well
- super.performOperation();
+ int localDeletes = super.performOperation();
+ callBack.call(localDeletes, paths);
+ return paths.size();
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
new file mode 100644
index 0000000..14a0c4e
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.bulk;
+
+import java.util.Collection;
+
+public interface IBulkOperationCallBack {
+ void call(int numberOfAffectedLocalFiles, Collection<String> paths);
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
new file mode 100644
index 0000000..c877be2
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.bulk;
+
+import java.util.Collection;
+
+public class NoOpDeleteBulkCallBack implements IBulkOperationCallBack {
+ public static final IBulkOperationCallBack INSTANCE = new NoOpDeleteBulkCallBack();
+
+ private NoOpDeleteBulkCallBack() {
+ }
+
+ @Override
+ public void call(int numberOfAffectedLocalFiles, Collection<String> paths) {
+ // NoOp
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
index efedd64..8f803a0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -22,6 +22,7 @@
import java.util.Set;
import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
@@ -29,6 +30,8 @@
public interface ILazyAccessor {
boolean isLocalAccessor();
+ IBulkOperationCallBack getBulkOperationCallBack();
+
void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode)
throws HyracksDataException;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
index 5715f43..378cf03 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
@@ -22,6 +22,8 @@
import java.util.Set;
import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
+import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -43,6 +45,11 @@
}
@Override
+ public IBulkOperationCallBack getBulkOperationCallBack() {
+ return NoOpDeleteBulkCallBack.INSTANCE;
+ }
+
+ @Override
public void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode,
IIOManager.FileSyncMode syncMode) throws HyracksDataException {
// NoOp
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index f30bf82..19873e8 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -23,9 +23,11 @@
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.asterix.cloud.CloudFileHandle;
import org.apache.asterix.cloud.WriteBufferProvider;
+import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.util.CloudFileUtil;
import org.apache.asterix.common.utils.StoragePathUtil;
@@ -46,6 +48,7 @@
private final AtomicInteger numberOfUncachedFiles;
private final WriteBufferProvider writeBufferProvider;
private final ILazyAccessorReplacer replacer;
+ private final IBulkOperationCallBack callBack;
public ReplaceableCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager,
Set<Integer> partitions, int numberOfUncachedFiles, WriteBufferProvider writeBufferProvider,
@@ -55,6 +58,10 @@
this.numberOfUncachedFiles = new AtomicInteger(numberOfUncachedFiles);
this.writeBufferProvider = writeBufferProvider;
this.replacer = replacer;
+ this.callBack = (numberOfAffectedLocalFiles, paths) -> {
+ int totalUncached = paths.size() - numberOfAffectedLocalFiles;
+ replaceAccessor(this.numberOfUncachedFiles.addAndGet(-totalUncached));
+ };
}
@Override
@@ -63,6 +70,11 @@
}
@Override
+ public IBulkOperationCallBack getBulkOperationCallBack() {
+ return callBack;
+ }
+
+ @Override
public void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode,
IIOManager.FileSyncMode syncMode) throws HyracksDataException {
FileReference fileRef = fileHandle.getFileReference();
@@ -148,8 +160,17 @@
// Never delete the storage dir in cloud storage
int numberOfCloudDeletes = doCloudDelete(fileReference);
// check local
- if (numberOfCloudDeletes > 0 && localIoManager.exists(fileReference)) {
- int numberOfLocalDeletes = fileReference.getFile().isFile() ? 1 : localIoManager.list(fileReference).size();
+ if (numberOfCloudDeletes > 0) {
+ int numberOfLocalDeletes;
+ if (numberOfCloudDeletes == 1) {
+ // file delete
+ numberOfLocalDeletes = localIoManager.exists(fileReference) ? 1 : 0;
+ } else {
+ // directory delete
+ Set<String> localToBeDeleted = localIoManager.list(fileReference).stream()
+ .map(FileReference::getRelativePath).collect(Collectors.toSet());
+ numberOfLocalDeletes = localToBeDeleted.size();
+ }
// Decrement by number of cloud deletes that have no counterparts locally
decrementNumberOfUncachedFiles(numberOfCloudDeletes - numberOfLocalDeletes);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
index 4ca7b9c..26c2789 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
@@ -40,5 +40,5 @@
fileReferences.add(fileReference);
}
- public abstract void performOperation() throws HyracksDataException;
+ public abstract int performOperation() throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
index 5ccfdd6..f1911de 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
@@ -28,9 +28,12 @@
}
@Override
- public void performOperation() throws HyracksDataException {
+ public int performOperation() throws HyracksDataException {
+ int count = 0;
for (FileReference fileReference : fileReferences) {
+ count += ioManager.exists(fileReference) ? 1 : 0;
ioManager.delete(fileReference);
}
+ return count;
}
}