[ASTERIXDB-2131][TX] Do Not Reset Active Ops For Aborted Metadata Txn

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Do not reset the primary index operation tracker active operations
  count if the metadata transaction was aborted.
- Add test cases.

Change-Id: Iee47aca1be0675b704ed9f176d9e10daef1cfc7f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2071
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 71c67f4..f5e94b1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -121,6 +121,10 @@
         this.ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]);
     }
 
+    public ClusterControllerService getClusterControllerService() {
+        return cc;
+    }
+
     protected CCConfig createCCConfig(ConfigManager configManager) throws IOException {
         CCConfig ccConfig = new CCConfig(configManager);
         ccConfig.setClusterListenAddress(Inet4Address.getLoopbackAddress().getHostAddress());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
new file mode 100644
index 0000000..3969ec5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.metadata;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MetadataTxnTest {
+
+    private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    private static final TestExecutor testExecutor = new TestExecutor();
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        integrationUtil.init(true);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    @Test
+    public void abortMetadataTxn() throws Exception {
+        ICcApplicationContext appCtx =
+                (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+        final MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+        final MetadataTransactionContext mdTxn = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxn);
+        final String nodeGroupName = "ng";
+        try {
+            final List<String> ngNodes = Arrays.asList("asterix_nc1");
+            MetadataManager.INSTANCE.addNodegroup(mdTxn, new NodeGroup(nodeGroupName, ngNodes));
+            MetadataManager.INSTANCE.abortTransaction(mdTxn);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+
+        // ensure that the node group was not added
+        final MetadataTransactionContext readMdTxn = MetadataManager.INSTANCE.beginTransaction();
+        try {
+            final NodeGroup nodegroup = MetadataManager.INSTANCE.getNodegroup(readMdTxn, nodeGroupName);
+            if (nodegroup != null) {
+                throw new AssertionError("nodegroup was found after metadata txn was aborted");
+            }
+        } finally {
+            MetadataManager.INSTANCE.commitTransaction(readMdTxn);
+        }
+    }
+
+    @Test
+    public void rebalanceFailureMetadataTxn() throws Exception {
+        ICcApplicationContext appCtx =
+                (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+        String nodeGroup = "ng";
+        String datasetName = "dataset1";
+        final TestCaseContext.OutputFormat format = TestCaseContext.OutputFormat.CLEAN_JSON;
+        // create original node group
+        testExecutor.executeSqlppUpdateOrDdl("CREATE nodegroup " + nodeGroup + " on asterix_nc2;", format);
+        // create original dataset
+        testExecutor.executeSqlppUpdateOrDdl("CREATE TYPE KeyType AS { id: int };", format);
+        testExecutor.executeSqlppUpdateOrDdl(
+                "CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id on " + nodeGroup + ";", format);
+        // find source dataset
+        Dataset sourceDataset;
+        MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+        final MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        try {
+            sourceDataset = metadataProvider.findDataset(MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME, datasetName);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+
+        // create rebalance metadata provider and metadata txn
+        metadataProvider = new MetadataProvider(appCtx, null);
+        final MetadataTransactionContext rebalanceTxn = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(rebalanceTxn);
+        try {
+            final Set<String> rebalanceToNodes = Stream.of("asterix_nc1").collect(Collectors.toSet());
+            DatasetUtil.createNodeGroupForNewDataset(sourceDataset.getDataverseName(), sourceDataset.getDatasetName(),
+                    sourceDataset.getRebalanceCount() + 1, rebalanceToNodes, metadataProvider);
+            // rebalance failed --> abort txn
+            MetadataManager.INSTANCE.abortTransaction(rebalanceTxn);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+        // ensure original dataset can be dropped after rebalance failure
+        testExecutor.executeSqlppUpdateOrDdl("DROP DATASET " + datasetName + ";", format);
+
+        // ensure the node group was dropped too since its only dataset was dropped
+        final MetadataTransactionContext readMdTxn = MetadataManager.INSTANCE.beginTransaction();
+        try {
+            final NodeGroup nodegroup = MetadataManager.INSTANCE.getNodegroup(readMdTxn, nodeGroup);
+            if (nodegroup != null) {
+                throw new AssertionError("nodegroup was found after its only dataset was dropped");
+            }
+        } finally {
+            MetadataManager.INSTANCE.commitTransaction(readMdTxn);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 67b25b6..0899c21 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -184,6 +184,9 @@
     public void cleanupNumActiveOperationsForAbortedJob(int numberOfActiveOperations) {
         numberOfActiveOperations *= -1;
         numActiveOperations.getAndAdd(numberOfActiveOperations);
+        if (numActiveOperations.get() < 0) {
+            throw new IllegalStateException("The number of active operations cannot be negative!");
+        }
     }
 
     public boolean isFlushOnExit() {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
index f53aeb1..eb37f22 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -147,7 +147,11 @@
     @Override
     public void notifyOptracker(boolean isJobLevelCommit) {
         try {
-            if (isJobLevelCommit && isMetadataTxn) {
+            /**
+             * in case of transaction abort {@link TransactionContext#cleanupForAbort()} will
+             * clean the primaryIndexOpTracker state.
+             */
+            if (isJobLevelCommit && isMetadataTxn && txnState.get() != ITransactionManager.ABORTED) {
                 primaryIndexOpTracker.exclusiveJobCommitted();
             } else if (!isJobLevelCommit) {
                 primaryIndexOpTracker.completeOperation(null, LSMOperationType.MODIFICATION, null,