[ASTERIXDB-3253][STO] Fix deleting uncached files

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
When deleting uncached files, we should account
(correctly) what files were deleted from the
cloud storage and were not cached locally.

Change-Id: Ie72121593acc6f53f9cfb87e54c17577e8633df7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17749
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 0611688..6973b7b 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -31,6 +31,7 @@
 import java.util.Set;
 
 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
+import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
 import org.apache.asterix.cloud.clients.CloudClientProvider;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.util.CloudFileUtil;
@@ -199,7 +200,7 @@
 
     @Override
     public IIOBulkOperation createDeleteBulkOperation() {
-        return new DeleteBulkCloudOperation(localIoManager, bucket, cloudClient);
+        return new DeleteBulkCloudOperation(localIoManager, bucket, cloudClient, NoOpDeleteBulkCallBack.INSTANCE);
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 94fd5f8..01f684b 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -25,6 +25,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
 import org.apache.asterix.cloud.lazy.accessor.ILazyAccessor;
 import org.apache.asterix.cloud.lazy.accessor.ILazyAccessorReplacer;
 import org.apache.asterix.cloud.lazy.accessor.InitialCloudAccessor;
@@ -34,6 +35,7 @@
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
@@ -112,6 +114,12 @@
      * IIOManager functions
      * ******************************************************************
      */
+
+    @Override
+    public IIOBulkOperation createDeleteBulkOperation() {
+        return new DeleteBulkCloudOperation(localIoManager, bucket, cloudClient, accessor.getBulkOperationCallBack());
+    }
+
     @Override
     public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
         return accessor.doList(dir, filter);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
index f36d594..3135624 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -24,7 +24,7 @@
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -33,26 +33,30 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private final String bucket;
     private final ICloudClient cloudClient;
+    private final IBulkOperationCallBack callBack;
 
-    public DeleteBulkCloudOperation(IIOManager ioManager, String bucket, ICloudClient cloudClient) {
+    public DeleteBulkCloudOperation(IOManager ioManager, String bucket, ICloudClient cloudClient,
+            IBulkOperationCallBack callBack) {
         super(ioManager);
         this.bucket = bucket;
         this.cloudClient = cloudClient;
+        this.callBack = callBack;
     }
 
     @Override
-    public void performOperation() throws HyracksDataException {
+    public int performOperation() throws HyracksDataException {
         /*
          * TODO What about deleting multiple directories?
          *      Actually, is there a case where we delete multiple directories from the cloud?
          */
         List<String> paths = fileReferences.stream().map(FileReference::getRelativePath).collect(Collectors.toList());
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Bulk deleting: {}", paths);
+            LOGGER.debug("Bulk deleting: local: {}, cloud: {}", fileReferences, paths);
         }
         cloudClient.deleteObjects(bucket, paths);
-
         // Bulk delete locally as well
-        super.performOperation();
+        int localDeletes = super.performOperation();
+        callBack.call(localDeletes, paths);
+        return paths.size();
     }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
new file mode 100644
index 0000000..14a0c4e
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.bulk;
+
+import java.util.Collection;
+
+public interface IBulkOperationCallBack {
+    void call(int numberOfAffectedLocalFiles, Collection<String> paths);
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
new file mode 100644
index 0000000..c877be2
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.bulk;
+
+import java.util.Collection;
+
+public class NoOpDeleteBulkCallBack implements IBulkOperationCallBack {
+    public static final IBulkOperationCallBack INSTANCE = new NoOpDeleteBulkCallBack();
+
+    private NoOpDeleteBulkCallBack() {
+    }
+
+    @Override
+    public void call(int numberOfAffectedLocalFiles, Collection<String> paths) {
+        // NoOp
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
index efedd64..8f803a0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -22,6 +22,7 @@
 import java.util.Set;
 
 import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
@@ -29,6 +30,8 @@
 public interface ILazyAccessor {
     boolean isLocalAccessor();
 
+    IBulkOperationCallBack getBulkOperationCallBack();
+
     void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode)
             throws HyracksDataException;
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
index 5715f43..378cf03 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
@@ -22,6 +22,8 @@
 import java.util.Set;
 
 import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
+import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -43,6 +45,11 @@
     }
 
     @Override
+    public IBulkOperationCallBack getBulkOperationCallBack() {
+        return NoOpDeleteBulkCallBack.INSTANCE;
+    }
+
+    @Override
     public void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode,
             IIOManager.FileSyncMode syncMode) throws HyracksDataException {
         // NoOp
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index f30bf82..19873e8 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -23,9 +23,11 @@
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.CloudFileHandle;
 import org.apache.asterix.cloud.WriteBufferProvider;
+import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.util.CloudFileUtil;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -46,6 +48,7 @@
     private final AtomicInteger numberOfUncachedFiles;
     private final WriteBufferProvider writeBufferProvider;
     private final ILazyAccessorReplacer replacer;
+    private final IBulkOperationCallBack callBack;
 
     public ReplaceableCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager,
             Set<Integer> partitions, int numberOfUncachedFiles, WriteBufferProvider writeBufferProvider,
@@ -55,6 +58,10 @@
         this.numberOfUncachedFiles = new AtomicInteger(numberOfUncachedFiles);
         this.writeBufferProvider = writeBufferProvider;
         this.replacer = replacer;
+        this.callBack = (numberOfAffectedLocalFiles, paths) -> {
+            int totalUncached = paths.size() - numberOfAffectedLocalFiles;
+            replaceAccessor(this.numberOfUncachedFiles.addAndGet(-totalUncached));
+        };
     }
 
     @Override
@@ -63,6 +70,11 @@
     }
 
     @Override
+    public IBulkOperationCallBack getBulkOperationCallBack() {
+        return callBack;
+    }
+
+    @Override
     public void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode,
             IIOManager.FileSyncMode syncMode) throws HyracksDataException {
         FileReference fileRef = fileHandle.getFileReference();
@@ -148,8 +160,17 @@
         // Never delete the storage dir in cloud storage
         int numberOfCloudDeletes = doCloudDelete(fileReference);
         // check local
-        if (numberOfCloudDeletes > 0 && localIoManager.exists(fileReference)) {
-            int numberOfLocalDeletes = fileReference.getFile().isFile() ? 1 : localIoManager.list(fileReference).size();
+        if (numberOfCloudDeletes > 0) {
+            int numberOfLocalDeletes;
+            if (numberOfCloudDeletes == 1) {
+                // file delete
+                numberOfLocalDeletes = localIoManager.exists(fileReference) ? 1 : 0;
+            } else {
+                // directory delete
+                Set<String> localToBeDeleted = localIoManager.list(fileReference).stream()
+                        .map(FileReference::getRelativePath).collect(Collectors.toSet());
+                numberOfLocalDeletes = localToBeDeleted.size();
+            }
             // Decrement by number of cloud deletes that have no counterparts locally
             decrementNumberOfUncachedFiles(numberOfCloudDeletes - numberOfLocalDeletes);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
index 4ca7b9c..26c2789 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
@@ -40,5 +40,5 @@
         fileReferences.add(fileReference);
     }
 
-    public abstract void performOperation() throws HyracksDataException;
+    public abstract int performOperation() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
index 5ccfdd6..f1911de 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
@@ -28,9 +28,12 @@
     }
 
     @Override
-    public void performOperation() throws HyracksDataException {
+    public int performOperation() throws HyracksDataException {
+        int count = 0;
         for (FileReference fileReference : fileReferences) {
+            count += ioManager.exists(fileReference) ? 1 : 0;
             ioManager.delete(fileReference);
         }
+        return count;
     }
 }