Merge commit '19a2b58c32d56060a8167a2ad69bb942083ba4c1' from release-0.9.3-pre-rc
Change-Id: I9e0ca4288a38d6311ce3e6d4a71a41086f59a2bb
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 29b38ba..0321ae4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -134,8 +134,9 @@
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
for (Dataset dataset : listener.getDatasets()) {
- MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(),
- dataset.getDataverseName(), DatasetUtil.getFullyQualifiedName(dataset));
+ lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
+ lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+ DatasetUtil.getFullyQualifiedName(dataset));
}
synchronized (listener) {
try {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
index 31f2089..1988f0a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -24,23 +24,31 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.metadata.IMetadataLock;
+import org.apache.asterix.common.utils.InvokeUtil;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
public class DatasetLock implements IMetadataLock {
private final String key;
+ // The lock
private final ReentrantReadWriteLock lock;
- private final ReentrantReadWriteLock dsReadLock;
- private final ReentrantReadWriteLock dsModifyLock;
+ // Used for lock upgrade operation
+ private final ReentrantReadWriteLock upgradeLock;
+ // Used for exclusive modification
+ private final ReentrantReadWriteLock modifyLock;
+ // The two counters below are used to ensure mutual exclusivity between index builds and modifications
+ // order of entry indexBuildCounter -> indexModifyCounter
private final MutableInt indexBuildCounter;
+ private final MutableInt dsModifyCounter;
public DatasetLock(String key) {
this.key = key;
lock = new ReentrantReadWriteLock(true);
- dsReadLock = new ReentrantReadWriteLock(true);
- dsModifyLock = new ReentrantReadWriteLock(true);
+ upgradeLock = new ReentrantReadWriteLock(true);
+ modifyLock = new ReentrantReadWriteLock(true);
indexBuildCounter = new MutableInt(0);
+ dsModifyCounter = new MutableInt(0);
}
private void readLock() {
@@ -71,63 +79,97 @@
lock.writeLock().unlock();
}
- private void readReadLock() {
- dsReadLock.readLock().lock();
+ private void upgradeReadLock() {
+ upgradeLock.readLock().lock();
}
private void modifyReadLock() {
// insert
- dsModifyLock.readLock().lock();
+ modifyLock.readLock().lock();
+ incrementModifyCounter();
+ }
+
+ private void incrementModifyCounter() {
+ InvokeUtil.doUninterruptibly(() -> {
+ synchronized (indexBuildCounter) {
+ while (indexBuildCounter.getValue() > 0) {
+ indexBuildCounter.wait();
+ }
+ synchronized (dsModifyCounter) {
+ dsModifyCounter.increment();
+ }
+ }
+ });
+ }
+
+ private void decrementModifyCounter() {
+ synchronized (indexBuildCounter) {
+ synchronized (dsModifyCounter) {
+ if (dsModifyCounter.decrementAndGet() == 0) {
+ indexBuildCounter.notifyAll();
+ }
+ }
+ }
}
private void modifyReadUnlock() {
// insert
- dsModifyLock.readLock().unlock();
+ decrementModifyCounter();
+ modifyLock.readLock().unlock();
}
- private void readReadUnlock() {
- dsReadLock.readLock().unlock();
+ private void upgradeReadUnlock() {
+ upgradeLock.readLock().unlock();
}
- private void readWriteUnlock() {
- dsReadLock.writeLock().unlock();
+ private void upgradeWriteUnlock() {
+ upgradeLock.writeLock().unlock();
}
- private void modifySharedWriteLock() {
+ private void buildIndexLock() {
// Build index statement
synchronized (indexBuildCounter) {
if (indexBuildCounter.getValue() > 0) {
- indexBuildCounter.setValue(indexBuildCounter.getValue() + 1);
+ indexBuildCounter.increment();
} else {
- dsModifyLock.writeLock().lock();
- indexBuildCounter.setValue(1);
+ InvokeUtil.doUninterruptibly(() -> {
+ while (true) {
+ synchronized (dsModifyCounter) {
+ if (dsModifyCounter.getValue() == 0) {
+ indexBuildCounter.increment();
+ return;
+ }
+ }
+ indexBuildCounter.wait();
+ }
+ });
}
}
}
- private void modifySharedWriteUnlock() {
+ private void buildIndexUnlock() {
// Build index statement
synchronized (indexBuildCounter) {
- if (indexBuildCounter.getValue() == 1) {
- dsModifyLock.writeLock().unlock();
+ if (indexBuildCounter.decrementAndGet() == 0) {
+ indexBuildCounter.notifyAll();
}
- indexBuildCounter.setValue(indexBuildCounter.getValue() - 1);
}
}
- private void modifyExclusiveWriteLock() {
- dsModifyLock.writeLock().lock();
+ private void modifyWriteLock() {
+ modifyLock.writeLock().lock();
+ incrementModifyCounter();
}
private void modifyExclusiveWriteUnlock() {
- dsModifyLock.writeLock().unlock();
+ decrementModifyCounter();
+ modifyLock.writeLock().unlock();
}
@Override
public void upgrade(IMetadataLock.Mode from, IMetadataLock.Mode to) throws AlgebricksException {
if (from == IMetadataLock.Mode.EXCLUSIVE_MODIFY && to == IMetadataLock.Mode.UPGRADED_WRITE) {
- dsReadLock.readLock().unlock();
- dsReadLock.writeLock().lock();
+ upgradeLock.writeLock().lock();
} else {
throw new MetadataException(ErrorCode.ILLEGAL_LOCK_UPGRADE_OPERATION, from, to);
}
@@ -136,8 +178,7 @@
@Override
public void downgrade(IMetadataLock.Mode from, IMetadataLock.Mode to) throws AlgebricksException {
if (from == IMetadataLock.Mode.UPGRADED_WRITE && to == IMetadataLock.Mode.EXCLUSIVE_MODIFY) {
- dsReadLock.writeLock().unlock();
- dsReadLock.readLock().lock();
+ upgradeLock.writeLock().unlock();
} else {
throw new MetadataException(ErrorCode.ILLEGAL_LOCK_DOWNGRADE_OPERATION, from, to);
}
@@ -148,24 +189,22 @@
switch (mode) {
case INDEX_BUILD:
readLock();
- modifySharedWriteLock();
+ buildIndexLock();
break;
case MODIFY:
readLock();
- readReadLock();
modifyReadLock();
break;
case EXCLUSIVE_MODIFY:
readLock();
- readReadLock();
- modifyExclusiveWriteLock();
+ modifyWriteLock();
break;
case WRITE:
writeLock();
break;
case READ:
readLock();
- readReadLock();
+ upgradeReadLock();
break;
default:
throw new IllegalStateException("locking mode " + mode + " is not supported");
@@ -176,28 +215,26 @@
public void unlock(IMetadataLock.Mode mode) {
switch (mode) {
case INDEX_BUILD:
- modifySharedWriteUnlock();
+ buildIndexUnlock();
readUnlock();
break;
case MODIFY:
modifyReadUnlock();
- readReadUnlock();
readUnlock();
break;
case EXCLUSIVE_MODIFY:
modifyExclusiveWriteUnlock();
- readReadUnlock();
readUnlock();
break;
case WRITE:
writeUnlock();
break;
case READ:
- readReadUnlock();
+ upgradeReadUnlock();
readUnlock();
break;
case UPGRADED_WRITE:
- readWriteUnlock();
+ upgradeWriteUnlock();
modifyExclusiveWriteUnlock();
readUnlock();
break;
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java
new file mode 100644
index 0000000..4382860
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+import org.apache.asterix.common.metadata.LockList;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class MetadataLockManagerTest {
+
+ static final int REPREAT_TEST_COUNT = 3;
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[REPREAT_TEST_COUNT][0]);
+ }
+
+ private static class Request {
+ private enum Statement {
+ INDEX,
+ MODIFY,
+ EXCLUSIVE_MODIFY,
+ EXCLUSIVE_MODIFY_UPGRADE_DOWNGRADE,
+ EXCLUSIVE_MODIFY_UPGRADE,
+ }
+
+ private final Statement statement;
+ private final String dataset;
+ private boolean done;
+ private int step = 0;
+
+ public Request(Statement statement, String dataset) {
+ this.statement = statement;
+ this.dataset = dataset;
+ done = false;
+ }
+
+ Statement statement() {
+ return statement;
+ }
+
+ String dataset() {
+ return dataset;
+ }
+
+ synchronized void complete() {
+ done = true;
+ notifyAll();
+ }
+
+ synchronized void await() throws InterruptedException {
+ while (!done) {
+ wait();
+ }
+ }
+
+ synchronized void step() {
+ step++;
+ notifyAll();
+ }
+
+ synchronized int getSteps() {
+ return step;
+ }
+
+ synchronized void await(int step) throws InterruptedException {
+ while (this.step < step) {
+ wait();
+ }
+ }
+ }
+
+ public class User extends SingleThreadEventProcessor<Request> {
+
+ private MetadataLockManager lockManager;
+ private Semaphore step = new Semaphore(0);
+ private final LockList locks = new LockList();
+
+ public User(String username, MetadataLockManager lockManager) {
+ super(username);
+ this.lockManager = lockManager;
+ }
+
+ public void step() {
+ step.release();
+ }
+
+ @Override
+ protected void handle(Request req) throws Exception {
+ try {
+ step.acquire();
+ switch (req.statement()) {
+ case INDEX:
+ lockManager.acquireDatasetCreateIndexLock(locks, req.dataset());
+ break;
+ case MODIFY:
+ lockManager.acquireDatasetModifyLock(locks, req.dataset());
+ break;
+ case EXCLUSIVE_MODIFY:
+ lockManager.acquireDatasetExclusiveModificationLock(locks, req.dataset());
+ break;
+ case EXCLUSIVE_MODIFY_UPGRADE:
+ lockManager.acquireDatasetExclusiveModificationLock(locks, req.dataset());
+ req.step();
+ step.acquire();
+ lockManager.upgradeDatasetLockToWrite(locks, req.dataset());
+ break;
+ case EXCLUSIVE_MODIFY_UPGRADE_DOWNGRADE:
+ lockManager.acquireDatasetExclusiveModificationLock(locks, req.dataset());
+ req.step();
+ step.acquire();
+ lockManager.upgradeDatasetLockToWrite(locks, req.dataset());
+ req.step();
+ step.acquire();
+ lockManager.downgradeDatasetLockToExclusiveModify(locks, req.dataset());
+ break;
+ default:
+ break;
+ }
+ req.step();
+ step.acquire();
+ } finally {
+ locks.reset();
+ req.step();
+ req.complete();
+ }
+ }
+
+ }
+
+ @Test
+ public void testDatasetLockMultipleIndexBuildsSingleModifier() throws Exception {
+ MetadataLockManager lockManager = new MetadataLockManager();
+ String dataset = "Dataset";
+ User till = new User("till", lockManager);
+ Request tReq = new Request(Request.Statement.INDEX, dataset);
+ User dmitry = new User("dmitry", lockManager);
+ Request dReq = new Request(Request.Statement.INDEX, dataset);
+ User mike = new User("mike", lockManager);
+ Request mReq = new Request(Request.Statement.MODIFY, dataset);
+ // Till builds an index
+ till.add(tReq);
+ // Dmitry builds an index
+ dmitry.add(dReq);
+ // Mike modifies
+ mike.add(mReq);
+ // Till starts
+ till.step();
+ // Ensure lock acquired
+ tReq.await(1);
+ // Dmitry starts
+ dmitry.step();
+ // Ensure lock acquired
+ dReq.await(1);
+ // Mike starts and is allowed to go all the way
+ mike.step();
+ mike.step();
+ // Ensure that Mike still could not acquire locks
+ Assert.assertEquals(0, mReq.getSteps());
+ // Till finishes first
+ till.step();
+ // Ensure the request has been completed and lock has been released
+ tReq.await();
+ // Ensure that Mike still could not acquire locks
+ Assert.assertEquals(0, mReq.getSteps());
+ // Dmitry finishes second
+ dmitry.step();
+ // Ensure the request has been completed and lock has been released
+ dReq.await();
+ // Ensure that Mike could proceed and request has been completed
+ mReq.await();
+ // Stop users
+ till.stop();
+ dmitry.stop();
+ mike.stop();
+ }
+
+ @Test
+ public void testDatasetLockMultipleModifiersSingleIndexBuilder() throws Exception {
+ MetadataLockManager lockManager = new MetadataLockManager();
+ String dataset = "Dataset";
+ User till = new User("till", lockManager);
+ Request tReq = new Request(Request.Statement.MODIFY, dataset);
+ User dmitry = new User("dmitry", lockManager);
+ Request dReq = new Request(Request.Statement.MODIFY, dataset);
+ User mike = new User("mike", lockManager);
+ Request mReq = new Request(Request.Statement.INDEX, dataset);
+ // Till modifies
+ till.add(tReq);
+ // Dmitry modifies
+ dmitry.add(dReq);
+ // Mike builds an index
+ mike.add(mReq);
+ // Till starts
+ till.step();
+ // Ensure lock acquired
+ tReq.await(1);
+ // Dmitry starts
+ dmitry.step();
+ // Ensure lock acquired
+ dReq.await(1);
+ // Mike starts and is allowed to go all the way
+ mike.step();
+ mike.step();
+ // Ensure that Mike still could not acquire locks
+ Assert.assertEquals(0, mReq.getSteps());
+ // Till finishes first
+ till.step();
+ // Ensure the request has been completed and lock has been released
+ tReq.await();
+ // Ensure that Mike still could not acquire locks
+ Assert.assertEquals(0, mReq.getSteps());
+ // Dmitry finishes second
+ dmitry.step();
+ // Ensure the request has been completed and lock has been released
+ dReq.await();
+ // Ensure that Mike could proceed and request has been completed
+ mReq.await();
+ // Stop users
+ till.stop();
+ dmitry.stop();
+ mike.stop();
+ }
+
+ @Test
+ public void testDatasetLockMultipleModifiersSingleExclusiveModifier() throws Exception {
+ MetadataLockManager lockManager = new MetadataLockManager();
+ String dataset = "Dataset";
+ User till = new User("till", lockManager);
+ Request tReq = new Request(Request.Statement.MODIFY, dataset);
+ User dmitry = new User("dmitry", lockManager);
+ Request dReq = new Request(Request.Statement.MODIFY, dataset);
+ User mike = new User("mike", lockManager);
+ Request mReq = new Request(Request.Statement.EXCLUSIVE_MODIFY, dataset);
+ // Till starts
+ till.add(tReq);
+ till.step();
+ // Ensure lock is acquired
+ tReq.await(1);
+ // Mike starts
+ mike.add(mReq);
+ mike.step();
+ // Sleep for 1s for now as there is no way to find out user has submitted the exclusive lock request
+ Thread.sleep(1000);
+ // Ensure that Mike didn't get the lock
+ Assert.assertEquals(0, mReq.getSteps());
+ // Dmitry starts
+ dmitry.add(dReq);
+ dmitry.step();
+ // Ensure that Dmitry didn't get the lock
+ Assert.assertEquals(0, dReq.getSteps());
+ // Till proceeds
+ till.step();
+ // Ensure the request has been completed and lock has been released
+ tReq.await();
+ // Ensure that Mike got the lock
+ mReq.await(1);
+ // Till submits another request
+ tReq = new Request(Request.Statement.MODIFY, dataset);
+ till.add(tReq);
+ till.step();
+ // Ensure that Till didn't get the lock
+ Assert.assertEquals(0, tReq.getSteps());
+ // Ensure that Dmitry didn't get the lock
+ Assert.assertEquals(0, dReq.getSteps());
+ // Mike completes
+ mike.step();
+ mReq.await();
+ // Ensure that both Till and Dmitry got the lock
+ tReq.await(1);
+ dReq.await(1);
+ till.step();
+ dmitry.step();
+ // Ensure that both Till and Dmitry complete
+ tReq.await();
+ dReq.await();
+ // Stop users
+ till.stop();
+ dmitry.stop();
+ mike.stop();
+ }
+
+}