Merge commit '19a2b58c32d56060a8167a2ad69bb942083ba4c1' from release-0.9.3-pre-rc

Change-Id: I9e0ca4288a38d6311ce3e6d4a71a41086f59a2bb
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 29b38ba..0321ae4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -134,8 +134,9 @@
             lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
                     listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
             for (Dataset dataset : listener.getDatasets()) {
-                MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(),
-                        dataset.getDataverseName(), DatasetUtil.getFullyQualifiedName(dataset));
+                lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
+                lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+                        DatasetUtil.getFullyQualifiedName(dataset));
             }
             synchronized (listener) {
                 try {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
index 31f2089..1988f0a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -24,23 +24,31 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.metadata.IMetadataLock;
+import org.apache.asterix.common.utils.InvokeUtil;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
 public class DatasetLock implements IMetadataLock {
 
     private final String key;
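+    // Primary lock: taken via writeLock() for WRITE, via readLock() for READ, MODIFY, EXCLUSIVE_MODIFY, INDEX_BUILD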
     private final ReentrantReadWriteLock lock;
-    private final ReentrantReadWriteLock dsReadLock;
-    private final ReentrantReadWriteLock dsModifyLock;
+    // Upgrade lock: read side held in READ mode; write side taken when upgrading to UPGRADED_WRITE
+    private final ReentrantReadWriteLock upgradeLock;
+    // Modify lock: read side held in MODIFY mode, write side held in EXCLUSIVE_MODIFY mode
+    private final ReentrantReadWriteLock modifyLock;
+    // The two counters below ensure mutual exclusion between index builds and modifications.
+    // When both monitors are needed they are entered in the order indexBuildCounter -> dsModifyCounter
     private final MutableInt indexBuildCounter;
+    private final MutableInt dsModifyCounter;
 
     public DatasetLock(String key) {
         this.key = key;
         lock = new ReentrantReadWriteLock(true);
-        dsReadLock = new ReentrantReadWriteLock(true);
-        dsModifyLock = new ReentrantReadWriteLock(true);
+        upgradeLock = new ReentrantReadWriteLock(true);
+        modifyLock = new ReentrantReadWriteLock(true);
         indexBuildCounter = new MutableInt(0);
+        dsModifyCounter = new MutableInt(0);
     }
 
     private void readLock() {
@@ -71,63 +79,97 @@
         lock.writeLock().unlock();
     }
 
-    private void readReadLock() {
-        dsReadLock.readLock().lock();
+    private void upgradeReadLock() {
+        upgradeLock.readLock().lock();
     }
 
     private void modifyReadLock() {
         // insert
-        dsModifyLock.readLock().lock();
+        modifyLock.readLock().lock();
+        incrementModifyCounter();
+    }
+
+    private void incrementModifyCounter() {
+        InvokeUtil.doUninterruptibly(() -> {
+            synchronized (indexBuildCounter) {
+                while (indexBuildCounter.getValue() > 0) {
+                    indexBuildCounter.wait();
+                }
+                synchronized (dsModifyCounter) {
+                    dsModifyCounter.increment();
+                }
+            }
+        });
+    }
+
+    private void decrementModifyCounter() {
+        synchronized (indexBuildCounter) {
+            synchronized (dsModifyCounter) {
+                if (dsModifyCounter.decrementAndGet() == 0) {
+                    indexBuildCounter.notifyAll();
+                }
+            }
+        }
     }
 
     private void modifyReadUnlock() {
         // insert
-        dsModifyLock.readLock().unlock();
+        decrementModifyCounter();
+        modifyLock.readLock().unlock();
     }
 
-    private void readReadUnlock() {
-        dsReadLock.readLock().unlock();
+    private void upgradeReadUnlock() {
+        upgradeLock.readLock().unlock();
     }
 
-    private void readWriteUnlock() {
-        dsReadLock.writeLock().unlock();
+    private void upgradeWriteUnlock() {
+        upgradeLock.writeLock().unlock();
     }
 
-    private void modifySharedWriteLock() {
+    private void buildIndexLock() {
         // Build index statement
         synchronized (indexBuildCounter) {
             if (indexBuildCounter.getValue() > 0) {
-                indexBuildCounter.setValue(indexBuildCounter.getValue() + 1);
+                indexBuildCounter.increment();
             } else {
-                dsModifyLock.writeLock().lock();
-                indexBuildCounter.setValue(1);
+                InvokeUtil.doUninterruptibly(() -> {
+                    while (true) {
+                        synchronized (dsModifyCounter) {
+                            if (dsModifyCounter.getValue() == 0) {
+                                indexBuildCounter.increment();
+                                return;
+                            }
+                        }
+                        indexBuildCounter.wait();
+                    }
+                });
             }
         }
     }
 
-    private void modifySharedWriteUnlock() {
+    private void buildIndexUnlock() {
         // Build index statement
         synchronized (indexBuildCounter) {
-            if (indexBuildCounter.getValue() == 1) {
-                dsModifyLock.writeLock().unlock();
+            if (indexBuildCounter.decrementAndGet() == 0) {
+                indexBuildCounter.notifyAll();
             }
-            indexBuildCounter.setValue(indexBuildCounter.getValue() - 1);
         }
     }
 
-    private void modifyExclusiveWriteLock() {
-        dsModifyLock.writeLock().lock();
+    private void modifyWriteLock() {
+        modifyLock.writeLock().lock();
+        incrementModifyCounter();
     }
 
     private void modifyExclusiveWriteUnlock() {
-        dsModifyLock.writeLock().unlock();
+        decrementModifyCounter();
+        modifyLock.writeLock().unlock();
     }
 
     @Override
     public void upgrade(IMetadataLock.Mode from, IMetadataLock.Mode to) throws AlgebricksException {
         if (from == IMetadataLock.Mode.EXCLUSIVE_MODIFY && to == IMetadataLock.Mode.UPGRADED_WRITE) {
-            dsReadLock.readLock().unlock();
-            dsReadLock.writeLock().lock();
+            upgradeLock.writeLock().lock();
         } else {
             throw new MetadataException(ErrorCode.ILLEGAL_LOCK_UPGRADE_OPERATION, from, to);
         }
@@ -136,8 +178,7 @@
     @Override
     public void downgrade(IMetadataLock.Mode from, IMetadataLock.Mode to) throws AlgebricksException {
         if (from == IMetadataLock.Mode.UPGRADED_WRITE && to == IMetadataLock.Mode.EXCLUSIVE_MODIFY) {
-            dsReadLock.writeLock().unlock();
-            dsReadLock.readLock().lock();
+            upgradeLock.writeLock().unlock();
         } else {
             throw new MetadataException(ErrorCode.ILLEGAL_LOCK_DOWNGRADE_OPERATION, from, to);
         }
@@ -148,24 +189,22 @@
         switch (mode) {
             case INDEX_BUILD:
                 readLock();
-                modifySharedWriteLock();
+                buildIndexLock();
                 break;
             case MODIFY:
                 readLock();
-                readReadLock();
                 modifyReadLock();
                 break;
             case EXCLUSIVE_MODIFY:
                 readLock();
-                readReadLock();
-                modifyExclusiveWriteLock();
+                modifyWriteLock();
                 break;
             case WRITE:
                 writeLock();
                 break;
             case READ:
                 readLock();
-                readReadLock();
+                upgradeReadLock();
                 break;
             default:
                 throw new IllegalStateException("locking mode " + mode + " is not supported");
@@ -176,28 +215,26 @@
     public void unlock(IMetadataLock.Mode mode) {
         switch (mode) {
             case INDEX_BUILD:
-                modifySharedWriteUnlock();
+                buildIndexUnlock();
                 readUnlock();
                 break;
             case MODIFY:
                 modifyReadUnlock();
-                readReadUnlock();
                 readUnlock();
                 break;
             case EXCLUSIVE_MODIFY:
                 modifyExclusiveWriteUnlock();
-                readReadUnlock();
                 readUnlock();
                 break;
             case WRITE:
                 writeUnlock();
                 break;
             case READ:
-                readReadUnlock();
+                upgradeReadUnlock();
                 readUnlock();
                 break;
             case UPGRADED_WRITE:
-                readWriteUnlock();
+                upgradeWriteUnlock();
                 modifyExclusiveWriteUnlock();
                 readUnlock();
                 break;
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java
new file mode 100644
index 0000000..4382860
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+import org.apache.asterix.common.metadata.LockList;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class MetadataLockManagerTest {
+
+    /** Number of times every test in this class is run (see {@link #data()}). */
+    static final int REPEAT_TEST_COUNT = 3;
+
+    /** Supplies one empty parameter set per repetition; the Parameterized runner runs each test once per set. */
+    @Parameterized.Parameters
+    public static List<Object[]> data() {
+        return Arrays.asList(new Object[REPEAT_TEST_COUNT][0]);
+    }
+
+    /** A scripted lock request whose progress (steps reached, completion) the test thread can wait on. */
+    private static class Request {
+        private enum Statement {
+            INDEX,
+            MODIFY,
+            EXCLUSIVE_MODIFY,
+            EXCLUSIVE_MODIFY_UPGRADE_DOWNGRADE,
+            EXCLUSIVE_MODIFY_UPGRADE,
+        }
+
+        private final Statement statement;
+        private final String dataset;
+        private boolean done;
+        private int step = 0;
+
+        public Request(Statement statement, String dataset) {
+            this.statement = statement;
+            this.dataset = dataset;
+            done = false;
+        }
+
+        Statement statement() {
+            return statement;
+        }
+
+        String dataset() {
+            return dataset;
+        }
+
+        synchronized void complete() {
+            done = true;
+            notifyAll();
+        }
+
+        synchronized void await() throws InterruptedException {
+            while (!done) {
+                wait();
+            }
+        }
+
+        synchronized void step() {
+            step++;
+            notifyAll();
+        }
+
+        synchronized int getSteps() {
+            return step;
+        }
+
+        synchronized void await(int step) throws InterruptedException {
+            while (this.step < step) {
+                wait();
+            }
+        }
+    }
+
+    /** Runs {@code Request}s, blocking on a permit released by {@link #step()} before each checkpoint. */
+    public static class User extends SingleThreadEventProcessor<Request> {
+
+        private final MetadataLockManager lockManager;
+        private final Semaphore step = new Semaphore(0);
+        private final LockList locks = new LockList();
+
+        public User(String username, MetadataLockManager lockManager) {
+            super(username);
+            this.lockManager = lockManager;
+        }
+
+        public void step() {
+            step.release();
+        }
+
+        @Override
+        protected void handle(Request req) throws Exception {
+            try {
+                // Wait for the test's go-ahead before requesting any lock.
+                step.acquire();
+                switch (req.statement()) {
+                    case INDEX:
+                        lockManager.acquireDatasetCreateIndexLock(locks, req.dataset());
+                        break;
+                    case MODIFY:
+                        lockManager.acquireDatasetModifyLock(locks, req.dataset());
+                        break;
+                    case EXCLUSIVE_MODIFY:
+                        lockManager.acquireDatasetExclusiveModificationLock(locks, req.dataset());
+                        break;
+                    case EXCLUSIVE_MODIFY_UPGRADE:
+                        lockManager.acquireDatasetExclusiveModificationLock(locks, req.dataset());
+                        req.step();
+                        step.acquire();
+                        lockManager.upgradeDatasetLockToWrite(locks, req.dataset());
+                        break;
+                    case EXCLUSIVE_MODIFY_UPGRADE_DOWNGRADE:
+                        lockManager.acquireDatasetExclusiveModificationLock(locks, req.dataset());
+                        req.step();
+                        step.acquire();
+                        lockManager.upgradeDatasetLockToWrite(locks, req.dataset());
+                        req.step();
+                        step.acquire();
+                        lockManager.downgradeDatasetLockToExclusiveModify(locks, req.dataset());
+                        break;
+                    default:
+                        break;
+                }
+                // Lock operations done: report progress, then wait for the go-ahead to finish.
+                req.step();
+                step.acquire();
+            } finally {
+                locks.reset();
+                req.step();
+                req.complete();
+            }
+        }
+
+    }
+
+    /** Concurrent index builds share the dataset lock; a modifier waits until every builder has finished. */
+    @Test
+    public void testDatasetLockMultipleIndexBuildsSingleModifier() throws Exception {
+        MetadataLockManager lockManager = new MetadataLockManager();
+        String dataset = "Dataset";
+        User till = new User("till", lockManager);
+        Request tReq = new Request(Request.Statement.INDEX, dataset);
+        User dmitry = new User("dmitry", lockManager);
+        Request dReq = new Request(Request.Statement.INDEX, dataset);
+        User mike = new User("mike", lockManager);
+        Request mReq = new Request(Request.Statement.MODIFY, dataset);
+        // Till builds an index
+        till.add(tReq);
+        // Dmitry builds an index
+        dmitry.add(dReq);
+        // Mike modifies
+        mike.add(mReq);
+        // Till starts
+        till.step();
+        // Ensure lock acquired
+        tReq.await(1);
+        // Dmitry starts
+        dmitry.step();
+        // Ensure lock acquired
+        dReq.await(1);
+        // Mike starts and is allowed to go all the way
+        mike.step();
+        mike.step();
+        // Ensure that Mike still could not acquire locks
+        Assert.assertEquals(0, mReq.getSteps());
+        // Till finishes first
+        till.step();
+        // Ensure the request has been completed and lock has been released
+        tReq.await();
+        // Ensure that Mike still could not acquire locks
+        Assert.assertEquals(0, mReq.getSteps());
+        // Dmitry finishes second
+        dmitry.step();
+        // Ensure the request has been completed and lock has been released
+        dReq.await();
+        // Ensure that Mike could proceed and request has been completed
+        mReq.await();
+        // Stop users
+        till.stop();
+        dmitry.stop();
+        mike.stop();
+    }
+
+    /** Concurrent modifiers share the dataset lock; an index builder waits until every modifier has finished. */
+    @Test
+    public void testDatasetLockMultipleModifiersSingleIndexBuilder() throws Exception {
+        MetadataLockManager lockManager = new MetadataLockManager();
+        String dataset = "Dataset";
+        User till = new User("till", lockManager);
+        Request tReq = new Request(Request.Statement.MODIFY, dataset);
+        User dmitry = new User("dmitry", lockManager);
+        Request dReq = new Request(Request.Statement.MODIFY, dataset);
+        User mike = new User("mike", lockManager);
+        Request mReq = new Request(Request.Statement.INDEX, dataset);
+        // Till modifies
+        till.add(tReq);
+        // Dmitry modifies
+        dmitry.add(dReq);
+        // Mike builds an index
+        mike.add(mReq);
+        // Till starts
+        till.step();
+        // Ensure lock acquired
+        tReq.await(1);
+        // Dmitry starts
+        dmitry.step();
+        // Ensure lock acquired
+        dReq.await(1);
+        // Mike starts and is allowed to go all the way
+        mike.step();
+        mike.step();
+        // Ensure that Mike still could not acquire locks
+        Assert.assertEquals(0, mReq.getSteps());
+        // Till finishes first
+        till.step();
+        // Ensure the request has been completed and lock has been released
+        tReq.await();
+        // Ensure that Mike still could not acquire locks
+        Assert.assertEquals(0, mReq.getSteps());
+        // Dmitry finishes second
+        dmitry.step();
+        // Ensure the request has been completed and lock has been released
+        dReq.await();
+        // Ensure that Mike could proceed and request has been completed
+        mReq.await();
+        // Stop users
+        till.stop();
+        dmitry.stop();
+        mike.stop();
+    }
+
+    /** An exclusive modifier waits for running modifiers and blocks modifiers that arrive after it. */
+    @Test
+    public void testDatasetLockMultipleModifiersSingleExclusiveModifier() throws Exception {
+        MetadataLockManager lockManager = new MetadataLockManager();
+        String dataset = "Dataset";
+        User till = new User("till", lockManager);
+        Request tReq = new Request(Request.Statement.MODIFY, dataset);
+        User dmitry = new User("dmitry", lockManager);
+        Request dReq = new Request(Request.Statement.MODIFY, dataset);
+        User mike = new User("mike", lockManager);
+        Request mReq = new Request(Request.Statement.EXCLUSIVE_MODIFY, dataset);
+        // Till starts
+        till.add(tReq);
+        till.step();
+        // Ensure lock is acquired
+        tReq.await(1);
+        // Mike starts
+        mike.add(mReq);
+        mike.step();
+        // Sleep for 1s for now as there is no way to find out user has submitted the exclusive lock request
+        Thread.sleep(1000);
+        // Ensure that Mike didn't get the lock
+        Assert.assertEquals(0, mReq.getSteps());
+        // Dmitry starts
+        dmitry.add(dReq);
+        dmitry.step();
+        // Ensure that Dmitry didn't get the lock
+        Assert.assertEquals(0, dReq.getSteps());
+        // Till proceeds
+        till.step();
+        // Ensure the request has been completed and lock has been released
+        tReq.await();
+        // Ensure that Mike got the lock
+        mReq.await(1);
+        // Till submits another request
+        tReq = new Request(Request.Statement.MODIFY, dataset);
+        till.add(tReq);
+        till.step();
+        // Ensure that Till didn't get the lock
+        Assert.assertEquals(0, tReq.getSteps());
+        // Ensure that Dmitry didn't get the lock
+        Assert.assertEquals(0, dReq.getSteps());
+        // Mike completes
+        mike.step();
+        mReq.await();
+        // Ensure that both Till and Dmitry got the lock
+        tReq.await(1);
+        dReq.await(1);
+        till.step();
+        dmitry.step();
+        // Ensure that both Till and Dmitry complete
+        tReq.await();
+        dReq.await();
+        // Stop users
+        till.stop();
+        dmitry.stop();
+        mike.stop();
+    }
+
+}