[ASTERIXDB-1954][STO][RT] Add Index Drop Options

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add options to allow drop index to ignore
  index does not exist and retry on index in-use.
- Add test case for new index drop options.

Change-Id: Id6f8fa52489bbe64d2f48c5c3d0a07be60f30b1b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2121
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index ceaf4cf..9e34d70 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -19,8 +19,10 @@
 package org.apache.asterix.utils;
 
 import static org.apache.asterix.app.translator.QueryTranslator.abort;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -338,7 +340,8 @@
         List<JobSpecification> jobs = new ArrayList<>();
         List<Index> indexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
         for (Index index : indexes) {
-            jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, true));
+            jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset,
+                    EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE)));
         }
         for (JobSpecification jobSpec : jobs) {
             JobUtils.runJob(hcc, jobSpec, true);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
new file mode 100644
index 0000000..2bac49e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IndexDropOperatorNodePushableTest {
+
+    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    private static final ARecordType META_TYPE = null;
+    private static final int[] KEY_INDEXES = { 0 };
+    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+    private static final int DATASET_ID = 101;
+    private static final String DATAVERSE_NAME = "TestDV";
+    private static final String DATASET_NAME = "TestDS";
+    private static final String DATA_TYPE_NAME = "DUMMY";
+    private static final String NODE_GROUP_NAME = "DEFAULT";
+    private final AtomicBoolean dropFailed = new AtomicBoolean(false);
+
+    @Before
+    public void setUp() throws Exception {
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        System.out.println("TearDown");
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @Test
+    public void dropOptionsTest() throws Exception {
+        TestNodeController nc = new TestNodeController(null, false);
+        try {
+            nc.init();
+            StorageComponentProvider storageManager = new StorageComponentProvider();
+            List<List<String>> partitioningKeys = new ArrayList<>();
+            partitioningKeys.add(Collections.singletonList("key"));
+            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                    NoMergePolicyFactory.NAME, null,
+                    new InternalDatasetDetails(null, InternalDatasetDetails.PartitioningStrategy.HASH, partitioningKeys,
+                            null, null, null, false, null, false), null, DatasetConfig.DatasetType.INTERNAL, DATASET_ID,
+                    0);
+            // create dataset
+            TestNodeController.PrimaryIndexInfo indexInfo =
+                    nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
+                            KEY_INDICATORS_LIST);
+            IndexDataflowHelperFactory helperFactory =
+                    new IndexDataflowHelperFactory(nc.getStorageManager(), indexInfo.getFileSplitProvider());
+            IHyracksTaskContext ctx = nc.createTestContext(true);
+            IIndexDataflowHelper dataflowHelper = helperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
+            dropInUse(ctx, helperFactory, dataflowHelper);
+            dropInUseWithWait(ctx, helperFactory, dataflowHelper);
+            dropNonExisting(ctx, helperFactory);
+            dropNonExistingWithIfExists(ctx, helperFactory);
+        } finally {
+            nc.deInit();
+        }
+    }
+
+    private void dropInUse(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory,
+            IIndexDataflowHelper dataflowHelper) throws Exception {
+        dropFailed.set(false);
+        // open the index to make it in-use
+        dataflowHelper.open();
+        // try to drop in-use index (should fail)
+        IndexDropOperatorNodePushable dropInUseOp =
+                new IndexDropOperatorNodePushable(helperFactory, EnumSet.noneOf(DropOption.class), ctx, 0);
+        try {
+            dropInUseOp.initialize();
+        } catch (HyracksDataException e) {
+            e.printStackTrace();
+            Assert.assertEquals(ErrorCode.CANNOT_DROP_IN_USE_INDEX, e.getErrorCode());
+            dropFailed.set(true);
+        }
+        Assert.assertTrue(dropFailed.get());
+    }
+
+    private void dropInUseWithWait(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory,
+            IIndexDataflowHelper dataflowHelper) throws Exception {
+        dropFailed.set(false);
+        // drop with option wait for in-use should be successful once the index is closed
+        final IndexDropOperatorNodePushable dropWithWaitOp = new IndexDropOperatorNodePushable(helperFactory,
+                EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0);
+        Thread dropThread = new Thread(() -> {
+            try {
+                dropWithWaitOp.initialize();
+            } catch (HyracksDataException e) {
+                dropFailed.set(true);
+                e.printStackTrace();
+            }
+        });
+        dropThread.start();
+        // wait for the drop thread to start
+        while (dropThread.getState() == Thread.State.NEW) {
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+        // close the index to allow the drop to complete
+        dataflowHelper.close();
+        dropThread.join();
+        Assert.assertFalse(dropFailed.get());
+    }
+
+    private void dropNonExisting(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory) throws Exception {
+        dropFailed.set(false);
+        // Dropping non-existing index
+        IndexDropOperatorNodePushable dropNonExistingOp =
+                new IndexDropOperatorNodePushable(helperFactory, EnumSet.noneOf(DropOption.class), ctx, 0);
+        try {
+            dropNonExistingOp.initialize();
+        } catch (HyracksDataException e) {
+            e.printStackTrace();
+            Assert.assertEquals(ErrorCode.INDEX_DOES_NOT_EXIST, e.getErrorCode());
+            dropFailed.set(true);
+        }
+        Assert.assertTrue(dropFailed.get());
+    }
+
+    private void dropNonExistingWithIfExists(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory)
+            throws Exception {
+        // Dropping non-existing index with if exists option should be successful
+        dropFailed.set(false);
+        IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp =
+                new IndexDropOperatorNodePushable(helperFactory, EnumSet.of(DropOption.IF_EXISTS), ctx, 0);
+        try {
+            dropNonExistingWithIfExistsOp.initialize();
+        } catch (HyracksDataException e) {
+            e.printStackTrace();
+            dropFailed.set(true);
+        }
+        Assert.assertFalse(dropFailed.get());
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index e79f002..c53af3e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -129,14 +130,19 @@
         IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
 
         if (dsr == null || iInfo == null) {
-            throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
+            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
         }
 
         PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
         if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
-            throw new HyracksDataException("Cannot remove index while it is open. (Dataset reference count = "
-                    + iInfo.getReferenceCount() + ", Operation tracker number of active operations = "
-                    + opTracker.getNumActiveOperations() + ")");
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                final String logMsg = String.format(
+                        "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+                        resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+                LOGGER.severe(logMsg);
+            }
+            throw HyracksDataException
+                    .create(ErrorCode.CANNOT_DROP_IN_USE_INDEX, StoragePathUtil.getIndexNameFromPath(resourcePath));
         }
 
         // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 824e30b..0b9b94b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -129,4 +129,15 @@
             throw HyracksDataException.create(e);
         }
     }
+
+    /**
+     * Gets the index name part in the index relative path.
+     *
+     * @param path
+     * @return The index name
+     */
+    public static String getIndexNameFromPath(String path) {
+        int idx = path.lastIndexOf(DATASET_INDEX_NAME_SEPARATOR);
+        return idx != -1 ? path.substring(idx + DATASET_INDEX_NAME_SEPARATOR.length()) : path;
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 411f866..cc6923e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -18,7 +18,11 @@
  */
 package org.apache.asterix.metadata.utils;
 
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.*;
+
+import java.util.EnumSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.config.OptimizationConfUtil;
@@ -37,6 +41,7 @@
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
 
 public class IndexUtil {
 
@@ -103,14 +108,14 @@
             Dataset dataset) throws AlgebricksException {
         SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
                 .createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig);
-        return secondaryIndexHelper.buildDropJobSpec(false);
+        return secondaryIndexHelper.buildDropJobSpec(EnumSet.noneOf(DropOption.class));
     }
 
     public static JobSpecification buildDropIndexJobSpec(Index index, MetadataProvider metadataProvider,
-            Dataset dataset, boolean failSilently) throws AlgebricksException {
+            Dataset dataset, Set<DropOption> options) throws AlgebricksException {
         SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
                 .createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig);
-        return secondaryIndexHelper.buildDropJobSpec(failSilently);
+        return secondaryIndexHelper.buildDropJobSpec(options);
     }
 
     public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset dataset, Index index,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 8fc9ed7..98c47a2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,9 +19,13 @@
 
 package org.apache.asterix.metadata.utils;
 
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -170,7 +174,7 @@
 
     public abstract JobSpecification buildCompactJobSpec() throws AlgebricksException;
 
-    public abstract JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException;
+    public abstract JobSpecification buildDropJobSpec(Set<DropOption> options) throws AlgebricksException;
 
     protected void init() throws AlgebricksException {
         payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 2dcab4f..b63ea16 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -19,6 +19,10 @@
 
 package org.apache.asterix.metadata.utils;
 
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+
+import java.util.Set;
+
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -66,15 +70,14 @@
     }
 
     @Override
-    public JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException {
+    public JobSpecification buildDropJobSpec(Set<DropOption> options) throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                 metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
         IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
                 metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
         // The index drop operation should be persistent regardless of temp datasets or permanent dataset.
-        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory,
-                failSilently);
+        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, options);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
                 splitsAndConstraint.second);
         spec.addRoot(btreeDrop);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 42da1d7..7926469 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -137,6 +137,8 @@
     public static final int PAGE_DOES_NOT_EXIST_IN_FILE = 101;
     public static final int VBC_ALREADY_OPEN = 102;
     public static final int VBC_ALREADY_CLOSED = 103;
+    public static final int INDEX_DOES_NOT_EXIST = 104;
+    public static final int CANNOT_DROP_IN_USE_INDEX = 105;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index bc83f8c..330cf1f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -120,5 +120,7 @@
 101 = Page %1$s does not exist in file %2$s
 102 = Failed to open virtual buffer cache since it is already open
 103 = Failed to close virtual buffer cache since it is already closed
+104 = Index does not exist
+105 = Cannot drop in-use index (%1$s)
 
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index af843d2..abfe0bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -23,6 +23,7 @@
 import java.util.logging.Logger;
 
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
@@ -72,8 +73,7 @@
         // Get local resource
         LocalResource lr = getResource();
         if (lr == null) {
-            throw new HyracksDataException(
-                    "Index resource couldn't be found. Has it been created yet? Was it deleted?");
+            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
         }
         IResource resource = lr.getResource();
         index = resource.createInstance(ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
index 18c7107..032e758 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
@@ -19,6 +19,9 @@
 
 package org.apache.hyracks.storage.am.common.dataflow;
 
+import java.util.EnumSet;
+import java.util.Set;
+
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -28,25 +31,30 @@
 
 public class IndexDropOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
+    public enum DropOption {
+        IF_EXISTS,
+        WAIT_ON_IN_USE
+    }
+
     private static final long serialVersionUID = 1L;
     private final IIndexDataflowHelperFactory dataflowHelperFactory;
-    private final boolean failSilently;
+    private final Set<DropOption> options;
 
     public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
             IIndexDataflowHelperFactory dataflowHelperFactory) {
-        this(spec, dataflowHelperFactory, false);
+        this(spec, dataflowHelperFactory, EnumSet.noneOf(DropOption.class));
     }
 
     public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            IIndexDataflowHelperFactory dataflowHelperFactory, boolean failSilently) {
+            IIndexDataflowHelperFactory dataflowHelperFactory, Set<DropOption> options) {
         super(spec, 0, 0);
         this.dataflowHelperFactory = dataflowHelperFactory;
-        this.failSilently = failSilently;
+        this.options = options;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new IndexDropOperatorNodePushable(dataflowHelperFactory, failSilently, ctx, partition);
+        return new IndexDropOperatorNodePushable(dataflowHelperFactory, options, ctx, partition);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index 7c2021b..ea1635a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -19,6 +19,17 @@
 
 package org.apache.hyracks.storage.am.common.dataflow;
 
+import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_DROP_IN_USE_INDEX;
+import static org.apache.hyracks.api.exceptions.ErrorCode.INDEX_DOES_NOT_EXIST;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption.IF_EXISTS;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption.WAIT_ON_IN_USE;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -27,17 +38,22 @@
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 
 public class IndexDropOperatorNodePushable extends AbstractOperatorNodePushable {
-    private final IIndexDataflowHelper indexHelper;
-    private final boolean failSliently;
 
-    public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, boolean failSilently,
+    private static final Logger LOGGER = Logger.getLogger(IndexDropOperatorNodePushable.class.getName());
+    private static final long DROP_ATTEMPT_WAIT_TIME_MILLIS = TimeUnit.SECONDS.toMillis(1);
+    private final IIndexDataflowHelper indexHelper;
+    private final Set<DropOption> options;
+    private long maxWaitTimeMillis = TimeUnit.SECONDS.toMillis(30);
+
+    public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, Set<DropOption> options,
             IHyracksTaskContext ctx, int partition) throws HyracksDataException {
         this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
-        this.failSliently = failSilently;
+        this.options = options;
     }
 
     @Override
     public void deinitialize() throws HyracksDataException {
+        // no op
     }
 
     @Override
@@ -52,16 +68,51 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        try {
-            indexHelper.destroy();
-        } catch (HyracksDataException e) {
-            if (!failSliently) {
+        dropIndex();
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        // no op
+    }
+
+    private void dropIndex() throws HyracksDataException {
+        while (true) {
+            try {
+                indexHelper.destroy();
+                return;
+            } catch (HyracksDataException e) {
+                if (isIgnorable(e)) {
+                    LOGGER.log(Level.INFO, e, () -> "Ignoring exception on drop");
+                    return;
+                }
+                if (canRetry(e)) {
+                    LOGGER.log(Level.INFO, e, () -> "Retrying drop on exception");
+                    continue;
+                }
                 throw e;
             }
         }
     }
 
-    @Override
-    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    private boolean isIgnorable(HyracksDataException e) {
+        return e.getErrorCode() == INDEX_DOES_NOT_EXIST && options.contains(IF_EXISTS);
+    }
+
+    private boolean canRetry(HyracksDataException e) throws HyracksDataException {
+        if (e.getErrorCode() == CANNOT_DROP_IN_USE_INDEX && options.contains(WAIT_ON_IN_USE)) {
+            if (maxWaitTimeMillis <= 0) {
+                return false;
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(DROP_ATTEMPT_WAIT_TIME_MILLIS);
+                maxWaitTimeMillis -= DROP_ATTEMPT_WAIT_TIME_MILLIS;
+                return true;
+            } catch (InterruptedException e1) {
+                Thread.currentThread().interrupt();
+                throw HyracksDataException.create(e1);
+            }
+        }
+        return false;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index de2f742..fe64e9f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.common.IIndex;
@@ -223,12 +224,12 @@
     public void unregister(String resourcePath) throws HyracksDataException {
         IndexInfo info = indexInfos.get(resourcePath);
         if (info == null) {
-            throw new HyracksDataException("Index with resource name " + resourcePath + " does not exist.");
+            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
         }
 
         if (info.referenceCount != 0) {
             indexInfos.put(resourcePath, info);
-            throw new HyracksDataException("Cannot remove index while it is open.");
+            throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX, resourcePath);
         }
 
         if (info.isOpen) {