Revert "[ASTERIXDB-3417][STO] Block all I/Os while the clean operation is running"

This reverts commit 5aa6c089bd968ea2465596c1c56c898d4c6d4656.

Reason for revert: Caused deadlock

Change-Id: I5df4941481d92eb4805689bd24e2a148ec039190
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18112
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 091712b..070fe65 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -87,8 +87,10 @@
     DatasetInfo getDatasetInfo(int datasetID);
 
     /**
-     * @param datasetId  the dataset id to be flushed.
-     * @param asyncFlush a flag indicating whether to wait for the flush to complete or not.
+     * @param datasetId
+     *            the dataset id to be flushed.
+     * @param asyncFlush
+     *            a flag indicating whether to wait for the flush to complete or not.
      * @throws HyracksDataException
      */
     void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException;
@@ -170,27 +172,12 @@
     void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException;
 
     /**
-     * Waits for all ongoing IO operations on all open datasets and atomically performs the provided {@code operation}
-     * on each opened index before allowing any I/Os to go through.
-     * <p>
-     * <b>NOTE: This maybe a synchronized call</b>
-     *
-     * @param replicationStrategy replication strategy
-     * @param partition           partition to perform the required operation against
-     * @param operation           operation to perform
-     */
-
-    void waitForIOAndPerform(IReplicationStrategy replicationStrategy, int partition, IIOBlockingOperation operation)
-            throws HyracksDataException;
-
-    /**
      * @return the current datasets io stats
      */
     StorageIOStats getDatasetsIOStats();
 
     /**
      * Closes {@code resourcePath} if open
-     *
      * @param resourcePath
      * @throws HyracksDataException
      */
@@ -198,7 +185,6 @@
 
     /**
      * Removes all memory references of {@code partition}
-     *
      * @param partitionId
      */
     void closePartition(int partitionId);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IIOBlockingOperation.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IIOBlockingOperation.java
deleted file mode 100644
index e2f4c91..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IIOBlockingOperation.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.api;
-
-import java.util.Collection;
-
-import org.apache.asterix.common.context.DatasetInfo;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
-import org.apache.asterix.common.context.IndexInfo;
-import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * An {@link DatasetLifecycleManager#waitForIOAndPerform(IReplicationStrategy, int, IIOBlockingOperation)} operation,
- * which can be executed while the I/O is blocked for each open {@link DatasetInfo}
- */
-public interface IIOBlockingOperation {
-
-    /**
-     * Prepares for calling {@link #perform(Collection)} on the provided {@code partition}.
-     */
-    void beforeOperation() throws HyracksDataException;
-
-    /**
-     * Performs the required operations. The operation will be performed in a {@code synchronize} block on
-     * {@link DatasetInfo}, which would block all operations on the dataset
-     *
-     * @param indexes to perform the operation against
-     * @see DatasetInfo#waitForIOAndPerform(int, IIOBlockingOperation)
-     */
-    void perform(Collection<IndexInfo> indexes) throws HyracksDataException;
-
-    /**
-     * After calling {@link #perform(Collection)}, this should be invoked to perform any necessary clean up
-     */
-    void afterOperation() throws HyracksDataException;
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 9064db5..87a3c2f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -24,7 +24,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.asterix.common.api.IIOBlockingOperation;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
@@ -261,7 +260,7 @@
         }
     }
 
-    public void waitForIOAndPerform(int partition, IIOBlockingOperation operation) throws HyracksDataException {
+    public void waitForIO(int partition) throws HyracksDataException {
         logManager.log(waitLog);
         synchronized (this) {
             while (partitionPendingIO.getOrDefault(partition, 0) > 0) {
@@ -272,13 +271,6 @@
                     throw HyracksDataException.create(e);
                 }
             }
-
-            Set<IndexInfo> indexes = partitionIndexes.get(partition);
-            if (indexes != null) {
-                // Perform the required operation
-                operation.perform(indexes);
-            }
-
             if (partitionPendingIO.getOrDefault(partition, 0) < 0) {
                 LOGGER.error("number of IO operations cannot be negative for dataset {}, partition {}", this,
                         partition);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index e8ead2b..07801a9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -31,7 +31,6 @@
 import java.util.function.Predicate;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.api.IIOBlockingOperation;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
@@ -572,39 +571,11 @@
     public void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
             if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
-                // Do a simple wait without any operation
-                dsr.getDatasetInfo().waitForIOAndPerform(partition, NoOpBlockingIOOperation.INSTANCE);
+                dsr.getDatasetInfo().waitForIO(partition);
             }
         }
     }
 
-    /**
-     * Waits for all ongoing IO operations on all open datasets and atomically performs the provided {@code operation}
-     * on each opened index before allowing any I/Os to go through.
-     * <p>
-     * <b>NOTE: this is a synchronized call to prevent activating new indexes (i.e., modifying {@link #datasets})</b>
-     *
-     * @param replicationStrategy replication strategy
-     * @param partition           partition to perform the required operation against
-     * @param operation           operation to perform
-     */
-    @Override
-    public synchronized void waitForIOAndPerform(IReplicationStrategy replicationStrategy, int partition,
-            IIOBlockingOperation operation) throws HyracksDataException {
-        // Signal the operation will be performed
-        operation.beforeOperation();
-
-        for (DatasetResource dsr : datasets.values()) {
-            if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
-                // Wait for all I/Os and then perform the requested operation
-                dsr.getDatasetInfo().waitForIOAndPerform(partition, operation);
-            }
-        }
-
-        // Signal the operation has been performed
-        operation.afterOperation();
-    }
-
     @Override
     public StorageIOStats getDatasetsIOStats() {
         StorageIOStats stats = new StorageIOStats();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/NoOpBlockingIOOperation.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/NoOpBlockingIOOperation.java
deleted file mode 100644
index ad42f7a..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/NoOpBlockingIOOperation.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.context;
-
-import java.util.Collection;
-
-import org.apache.asterix.common.api.IIOBlockingOperation;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public final class NoOpBlockingIOOperation implements IIOBlockingOperation {
-    public static final NoOpBlockingIOOperation INSTANCE = new NoOpBlockingIOOperation();
-
-    private NoOpBlockingIOOperation() {
-    }
-
-    @Override
-    public void beforeOperation() throws HyracksDataException {
-        // NoOp
-    }
-
-    @Override
-    public void perform(Collection<IndexInfo> indexes) throws HyracksDataException {
-        // NoOp
-    }
-
-    @Override
-    public void afterOperation() throws HyracksDataException {
-        // NoOp
-    }
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/CleanupBlockingIOOperation.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/CleanupBlockingIOOperation.java
deleted file mode 100644
index 0790a22..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/CleanupBlockingIOOperation.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.resource;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.asterix.common.api.IIOBlockingOperation;
-import org.apache.asterix.common.context.IndexInfo;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
-
-class CleanupBlockingIOOperation implements IIOBlockingOperation {
-    private final int partition;
-    private final PersistentLocalResourceRepository localRepository;
-    private final IIOManager ioManager;
-    private final Set<FileReference> cleanedIndexes;
-
-    public CleanupBlockingIOOperation(int partition, PersistentLocalResourceRepository localRepository,
-            IIOManager ioManager) {
-        this.partition = partition;
-        this.localRepository = localRepository;
-        this.ioManager = ioManager;
-        cleanedIndexes = new HashSet<>();
-    }
-
-    @Override
-    public void beforeOperation() throws HyracksDataException {
-        // NoOp
-    }
-
-    /**
-     * Clean all active indexes while the DatasetInfo is synchronized
-     *
-     * @param indexes active indexes to clean
-     */
-    @Override
-    public void perform(Collection<IndexInfo> indexes) throws HyracksDataException {
-        for (IndexInfo indexInfo : indexes) {
-            FileReference index = ioManager.resolve(indexInfo.getLocalResource().getPath());
-            localRepository.cleanupIndex(index);
-            cleanedIndexes.add(index);
-        }
-    }
-
-    /**
-     * Clean all inactive indexes while the DatasetLifeCycleManager is synchronized
-     */
-    @Override
-    public void afterOperation() throws HyracksDataException {
-        Set<FileReference> indexes = localRepository.getPartitionIndexes(partition);
-        for (FileReference index : indexes) {
-            if (!cleanedIndexes.contains(index)) {
-                localRepository.cleanupIndex(index);
-            }
-        }
-    }
-
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index d0b4aa1..39a8841 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -561,30 +561,24 @@
     public void cleanup(int partition) throws HyracksDataException {
         beforeReadAccess();
         try {
-            CleanupBlockingIOOperation cleanupOp = new CleanupBlockingIOOperation(partition, this, ioManager);
-            datasetLifecycleManager.waitForIOAndPerform(AllDatasetsReplicationStrategy.INSTANCE, partition, cleanupOp);
+            datasetLifecycleManager.waitForIO(AllDatasetsReplicationStrategy.INSTANCE, partition);
+            final Set<FileReference> partitionIndexes = getPartitionIndexes(partition);
+            try {
+                for (FileReference index : partitionIndexes) {
+                    deleteIndexMaskedFiles(index);
+                    if (isValidIndex(index)) {
+                        deleteIndexInvalidComponents(index);
+                    }
+                }
+            } catch (IOException | ParseException e) {
+                throw HyracksDataException.create(e);
+            }
         } finally {
             clearResourcesCache();
             afterReadAccess();
         }
     }
 
-    /**
-     * This will be invoked by {@link CleanupBlockingIOOperation}
-     *
-     * @param index to clean
-     */
-    void cleanupIndex(FileReference index) throws HyracksDataException {
-        try {
-            deleteIndexMaskedFiles(index);
-            if (isValidIndex(index)) {
-                deleteIndexInvalidComponents(index);
-            }
-        } catch (IOException | ParseException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
     public List<ResourceStorageStats> getStorageStats() throws HyracksDataException {
         beforeReadAccess();
         try {