[ASTERIXDB-1954][STO][RT] Add Index Drop Options
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add options to allow drop index to ignore
index does not exist and retry on index in-use.
- Add test case for new index drop options.
Change-Id: Id6f8fa52489bbe64d2f48c5c3d0a07be60f30b1b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2121
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index ceaf4cf..9e34d70 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -19,8 +19,10 @@
package org.apache.asterix.utils;
import static org.apache.asterix.app.translator.QueryTranslator.abort;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -338,7 +340,8 @@
List<JobSpecification> jobs = new ArrayList<>();
List<Index> indexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
for (Index index : indexes) {
- jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, true));
+ jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset,
+ EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE)));
}
for (JobSpecification jobSpec : jobs) {
JobUtils.runJob(hcc, jobSpec, true);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
new file mode 100644
index 0000000..2bac49e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IndexDropOperatorNodePushableTest {
+
+ private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+ private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+ new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+ private static final ARecordType META_TYPE = null;
+ private static final int[] KEY_INDEXES = { 0 };
+ private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+ private static final int DATASET_ID = 101;
+ private static final String DATAVERSE_NAME = "TestDV";
+ private static final String DATASET_NAME = "TestDS";
+ private static final String DATA_TYPE_NAME = "DUMMY";
+ private static final String NODE_GROUP_NAME = "DEFAULT";
+ private final AtomicBoolean dropFailed = new AtomicBoolean(false);
+
+ @Before
+ public void setUp() throws Exception {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ System.out.println("TearDown");
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @Test
+ public void dropOptionsTest() throws Exception {
+ TestNodeController nc = new TestNodeController(null, false);
+ try {
+ nc.init();
+ StorageComponentProvider storageManager = new StorageComponentProvider();
+ List<List<String>> partitioningKeys = new ArrayList<>();
+ partitioningKeys.add(Collections.singletonList("key"));
+ Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null,
+ new InternalDatasetDetails(null, InternalDatasetDetails.PartitioningStrategy.HASH, partitioningKeys,
+ null, null, null, false, null, false), null, DatasetConfig.DatasetType.INTERNAL, DATASET_ID,
+ 0);
+ // create dataset
+ TestNodeController.PrimaryIndexInfo indexInfo =
+ nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
+ KEY_INDICATORS_LIST);
+ IndexDataflowHelperFactory helperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), indexInfo.getFileSplitProvider());
+ IHyracksTaskContext ctx = nc.createTestContext(true);
+ IIndexDataflowHelper dataflowHelper = helperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
+ dropInUse(ctx, helperFactory, dataflowHelper);
+ dropInUseWithWait(ctx, helperFactory, dataflowHelper);
+ dropNonExisting(ctx, helperFactory);
+ dropNonExistingWithIfExists(ctx, helperFactory);
+ } finally {
+ nc.deInit();
+ }
+ }
+
+ private void dropInUse(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory,
+ IIndexDataflowHelper dataflowHelper) throws Exception {
+ dropFailed.set(false);
+ // open the index to make it in-use
+ dataflowHelper.open();
+ // try to drop in-use index (should fail)
+ IndexDropOperatorNodePushable dropInUseOp =
+ new IndexDropOperatorNodePushable(helperFactory, EnumSet.noneOf(DropOption.class), ctx, 0);
+ try {
+ dropInUseOp.initialize();
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ Assert.assertEquals(ErrorCode.CANNOT_DROP_IN_USE_INDEX, e.getErrorCode());
+ dropFailed.set(true);
+ }
+ Assert.assertTrue(dropFailed.get());
+ }
+
+ private void dropInUseWithWait(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory,
+ IIndexDataflowHelper dataflowHelper) throws Exception {
+ dropFailed.set(false);
+ // drop with option wait for in-use should be successful once the index is closed
+ final IndexDropOperatorNodePushable dropWithWaitOp = new IndexDropOperatorNodePushable(helperFactory,
+ EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0);
+ Thread dropThread = new Thread(() -> {
+ try {
+ dropWithWaitOp.initialize();
+ } catch (HyracksDataException e) {
+ dropFailed.set(true);
+ e.printStackTrace();
+ }
+ });
+ dropThread.start();
+ // wait for the drop thread to start
+ while (dropThread.getState() == Thread.State.NEW) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ // close the index to allow the drop to complete
+ dataflowHelper.close();
+ dropThread.join();
+ Assert.assertFalse(dropFailed.get());
+ }
+
+ private void dropNonExisting(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory) throws Exception {
+ dropFailed.set(false);
+ // Dropping non-existing index
+ IndexDropOperatorNodePushable dropNonExistingOp =
+ new IndexDropOperatorNodePushable(helperFactory, EnumSet.noneOf(DropOption.class), ctx, 0);
+ try {
+ dropNonExistingOp.initialize();
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ Assert.assertEquals(ErrorCode.INDEX_DOES_NOT_EXIST, e.getErrorCode());
+ dropFailed.set(true);
+ }
+ Assert.assertTrue(dropFailed.get());
+ }
+
+ private void dropNonExistingWithIfExists(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory)
+ throws Exception {
+ // Dropping non-existing index with if exists option should be successful
+ dropFailed.set(false);
+ IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp =
+ new IndexDropOperatorNodePushable(helperFactory, EnumSet.of(DropOption.IF_EXISTS), ctx, 0);
+ try {
+ dropNonExistingWithIfExistsOp.initialize();
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ dropFailed.set(true);
+ }
+ Assert.assertFalse(dropFailed.get());
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index e79f002..c53af3e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -37,6 +37,7 @@
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -129,14 +130,19 @@
IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
if (dsr == null || iInfo == null) {
- throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
}
PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
- throw new HyracksDataException("Cannot remove index while it is open. (Dataset reference count = "
- + iInfo.getReferenceCount() + ", Operation tracker number of active operations = "
- + opTracker.getNumActiveOperations() + ")");
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ final String logMsg = String.format(
+ "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+ resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+ LOGGER.severe(logMsg);
+ }
+ throw HyracksDataException
+ .create(ErrorCode.CANNOT_DROP_IN_USE_INDEX, StoragePathUtil.getIndexNameFromPath(resourcePath));
}
// TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 824e30b..0b9b94b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -129,4 +129,15 @@
throw HyracksDataException.create(e);
}
}
+
+ /**
+ * Gets the index name part in the index relative path.
+ *
+ * @param path
+ * @return The index name
+ */
+ public static String getIndexNameFromPath(String path) {
+ int idx = path.lastIndexOf(DATASET_INDEX_NAME_SEPARATOR);
+ return idx != -1 ? path.substring(idx + DATASET_INDEX_NAME_SEPARATOR.length()) : path;
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 411f866..cc6923e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -18,7 +18,11 @@
*/
package org.apache.asterix.metadata.utils;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.*;
+
+import java.util.EnumSet;
import java.util.List;
+import java.util.Set;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.config.OptimizationConfUtil;
@@ -37,6 +41,7 @@
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.job.IJobletEventListenerFactory;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
public class IndexUtil {
@@ -103,14 +108,14 @@
Dataset dataset) throws AlgebricksException {
SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
.createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig);
- return secondaryIndexHelper.buildDropJobSpec(false);
+ return secondaryIndexHelper.buildDropJobSpec(EnumSet.noneOf(DropOption.class));
}
public static JobSpecification buildDropIndexJobSpec(Index index, MetadataProvider metadataProvider,
- Dataset dataset, boolean failSilently) throws AlgebricksException {
+ Dataset dataset, Set<DropOption> options) throws AlgebricksException {
SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
.createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig);
- return secondaryIndexHelper.buildDropJobSpec(failSilently);
+ return secondaryIndexHelper.buildDropJobSpec(options);
}
public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset dataset, Index index,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 8fc9ed7..98c47a2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,9 +19,13 @@
package org.apache.asterix.metadata.utils;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -170,7 +174,7 @@
public abstract JobSpecification buildCompactJobSpec() throws AlgebricksException;
- public abstract JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException;
+ public abstract JobSpecification buildDropJobSpec(Set<DropOption> options) throws AlgebricksException;
protected void init() throws AlgebricksException {
payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 2dcab4f..b63ea16 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -19,6 +19,10 @@
package org.apache.asterix.metadata.utils;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+
+import java.util.Set;
+
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -66,15 +70,14 @@
}
@Override
- public JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException {
+ public JobSpecification buildDropJobSpec(Set<DropOption> options) throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
// The index drop operation should be persistent regardless of temp datasets or permanent dataset.
- IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory,
- failSilently);
+ IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, options);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
splitsAndConstraint.second);
spec.addRoot(btreeDrop);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 42da1d7..7926469 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -137,6 +137,8 @@
public static final int PAGE_DOES_NOT_EXIST_IN_FILE = 101;
public static final int VBC_ALREADY_OPEN = 102;
public static final int VBC_ALREADY_CLOSED = 103;
+ public static final int INDEX_DOES_NOT_EXIST = 104;
+ public static final int CANNOT_DROP_IN_USE_INDEX = 105;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index bc83f8c..330cf1f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -120,5 +120,7 @@
101 = Page %1$s does not exist in file %2$s
102 = Failed to open virtual buffer cache since it is already open
103 = Failed to close virtual buffer cache since it is already closed
+104 = Index does not exist
+105 = Cannot drop in-use index (%1$s)
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index af843d2..abfe0bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -23,6 +23,7 @@
import java.util.logging.Logger;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
@@ -72,8 +73,7 @@
// Get local resource
LocalResource lr = getResource();
if (lr == null) {
- throw new HyracksDataException(
- "Index resource couldn't be found. Has it been created yet? Was it deleted?");
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
}
IResource resource = lr.getResource();
index = resource.createInstance(ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
index 18c7107..032e758 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
@@ -19,6 +19,9 @@
package org.apache.hyracks.storage.am.common.dataflow;
+import java.util.EnumSet;
+import java.util.Set;
+
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -28,25 +31,30 @@
public class IndexDropOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ public enum DropOption {
+ IF_EXISTS,
+ WAIT_ON_IN_USE
+ }
+
private static final long serialVersionUID = 1L;
private final IIndexDataflowHelperFactory dataflowHelperFactory;
- private final boolean failSilently;
+ private final Set<DropOption> options;
public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
IIndexDataflowHelperFactory dataflowHelperFactory) {
- this(spec, dataflowHelperFactory, false);
+ this(spec, dataflowHelperFactory, EnumSet.noneOf(DropOption.class));
}
public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
- IIndexDataflowHelperFactory dataflowHelperFactory, boolean failSilently) {
+ IIndexDataflowHelperFactory dataflowHelperFactory, Set<DropOption> options) {
super(spec, 0, 0);
this.dataflowHelperFactory = dataflowHelperFactory;
- this.failSilently = failSilently;
+ this.options = options;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new IndexDropOperatorNodePushable(dataflowHelperFactory, failSilently, ctx, partition);
+ return new IndexDropOperatorNodePushable(dataflowHelperFactory, options, ctx, partition);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index 7c2021b..ea1635a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -19,6 +19,17 @@
package org.apache.hyracks.storage.am.common.dataflow;
+import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_DROP_IN_USE_INDEX;
+import static org.apache.hyracks.api.exceptions.ErrorCode.INDEX_DOES_NOT_EXIST;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption.IF_EXISTS;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption.WAIT_ON_IN_USE;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -27,17 +38,22 @@
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
public class IndexDropOperatorNodePushable extends AbstractOperatorNodePushable {
- private final IIndexDataflowHelper indexHelper;
- private final boolean failSliently;
- public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, boolean failSilently,
+ private static final Logger LOGGER = Logger.getLogger(IndexDropOperatorNodePushable.class.getName());
+ private static final long DROP_ATTEMPT_WAIT_TIME_MILLIS = TimeUnit.SECONDS.toMillis(1);
+ private final IIndexDataflowHelper indexHelper;
+ private final Set<DropOption> options;
+ private long maxWaitTimeMillis = TimeUnit.SECONDS.toMillis(30);
+
+ public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, Set<DropOption> options,
IHyracksTaskContext ctx, int partition) throws HyracksDataException {
this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
- this.failSliently = failSilently;
+ this.options = options;
}
@Override
public void deinitialize() throws HyracksDataException {
+ // no op
}
@Override
@@ -52,16 +68,51 @@
@Override
public void initialize() throws HyracksDataException {
- try {
- indexHelper.destroy();
- } catch (HyracksDataException e) {
- if (!failSliently) {
+ dropIndex();
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ // no op
+ }
+
+ private void dropIndex() throws HyracksDataException {
+ while (true) {
+ try {
+ indexHelper.destroy();
+ return;
+ } catch (HyracksDataException e) {
+ if (isIgnorable(e)) {
+ LOGGER.log(Level.INFO, e, () -> "Ignoring exception on drop");
+ return;
+ }
+ if (canRetry(e)) {
+ LOGGER.log(Level.INFO, e, () -> "Retrying drop on exception");
+ continue;
+ }
throw e;
}
}
}
- @Override
- public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ private boolean isIgnorable(HyracksDataException e) {
+ return e.getErrorCode() == INDEX_DOES_NOT_EXIST && options.contains(IF_EXISTS);
+ }
+
+ private boolean canRetry(HyracksDataException e) throws HyracksDataException {
+ if (e.getErrorCode() == CANNOT_DROP_IN_USE_INDEX && options.contains(WAIT_ON_IN_USE)) {
+ if (maxWaitTimeMillis <= 0) {
+ return false;
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(DROP_ATTEMPT_WAIT_TIME_MILLIS);
+ maxWaitTimeMillis -= DROP_ATTEMPT_WAIT_TIME_MILLIS;
+ return true;
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e1);
+ }
+ }
+ return false;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index de2f742..fe64e9f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.common.IIndex;
@@ -223,12 +224,12 @@
public void unregister(String resourcePath) throws HyracksDataException {
IndexInfo info = indexInfos.get(resourcePath);
if (info == null) {
- throw new HyracksDataException("Index with resource name " + resourcePath + " does not exist.");
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
}
if (info.referenceCount != 0) {
indexInfos.put(resourcePath, info);
- throw new HyracksDataException("Cannot remove index while it is open.");
+ throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX, resourcePath);
}
if (info.isOpen) {