[ASTERIXDB-2246][STO] Ensure Metadata Cache Updated on Rebalance

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ensure metadata cache is updated to the target
  dataset by commiting the transaction before
  downgrading the write lock. This prevents waiting
  readers from reading a stale cache that points
  to the source dataset that will be dropped.

Change-Id: I222be5d551a6dae66bf97c4bccf696f5b916d9b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2282
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 1a14864..16ffd40 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -21,6 +21,7 @@
 import static org.apache.asterix.app.translator.QueryTranslator.abort;
 import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 
+import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -153,7 +154,7 @@
             // Executes the 2nd Metadata transaction for switching the metadata entity.
             // It detaches the source dataset and attaches the target dataset to metadata's point of view.
             runMetadataTransaction(metadataProvider,
-                    () -> rebalanceSwitch(sourceDataset, targetDataset, metadataProvider, hcc));
+                    () -> rebalanceSwitch(sourceDataset, targetDataset, metadataProvider));
             // Executes the 3rd Metadata transaction to drop the source dataset files and the node group for
             // the source dataset.
             runMetadataTransaction(metadataProvider, () -> dropSourceDataset(sourceDataset, metadataProvider, hcc));
@@ -200,8 +201,6 @@
         try {
             // Performs the actual work.
             work.run();
-            // Complete the metadata transaction.
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
@@ -231,15 +230,15 @@
     }
 
     // Switches the metadata entity from the source dataset to the target dataset.
-    private static void rebalanceSwitch(Dataset source, Dataset target, MetadataProvider metadataProvider,
-            IHyracksClientConnection hcc) throws Exception {
+    private static void rebalanceSwitch(Dataset source, Dataset target, MetadataProvider metadataProvider)
+            throws AlgebricksException, RemoteException {
         MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
         // upgrade lock
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         ActiveNotificationHandler activeNotificationHandler =
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
         IMetadataLockManager lockManager = appCtx.getMetadataLockManager();
-        lockManager.upgradeDatasetLockToWrite(metadataProvider.getLocks(), DatasetUtil.getFullyQualifiedName(target));
+        lockManager.upgradeDatasetLockToWrite(metadataProvider.getLocks(), DatasetUtil.getFullyQualifiedName(source));
         try {
             // Updates the dataset entry in the metadata storage
             MetadataManager.INSTANCE.updateDataset(mdTxnCtx, target);
@@ -249,6 +248,7 @@
                     controller.replace(target);
                 }
             }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } finally {
             lockManager.downgradeDatasetLockToExclusiveModify(metadataProvider.getLocks(),
                     DatasetUtil.getFullyQualifiedName(target));
@@ -261,8 +261,8 @@
         // Drops the source dataset files. No need to lock the dataset entity here because the source dataset has
         // been detached at this point.
         dropDatasetFiles(source, metadataProvider, hcc);
-
         tryDropDatasetNodegroup(source, metadataProvider);
+        MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
     }
 
     // Drops the metadata entry of source dataset's node group.
@@ -408,8 +408,10 @@
             runMetadataTransaction(metadataProvider, () -> MetadataManager.INSTANCE
                     .dropDataset(metadataProvider.getMetadataTxnContext(), dataset.getDataverseName(),
                             dataset.getDatasetName()));
+            MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
             // try to drop the dataset's node group
             runMetadataTransaction(metadataProvider, () -> tryDropDatasetNodegroup(dataset, metadataProvider));
+            MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
         });
     }
 }