[NO ISSUE][TX]: Multiple fixes to atomic statements

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Correctly computes partition count in case of parallel
  execution.
- Correctly computes node count in case of a node group
  is used.
- Fix for the case when a job prepared message is recieved
  after the transaction is aborted.
- Job commit message is sent in commitTransaction method.
  This makes sure that the commit messages are sent only
  after the job was successfull.
- Fix for the case when job is aborted/cancelled after
  flushes are scheduled on atomic datasets but are not
  finished.

Change-Id: Ieb029d6273f19fa4bd0e7edfb8897f894c1f5b6e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17643
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 10f118c0..bb93fbc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -554,7 +554,7 @@
 
     public static AlgebricksAbsolutePartitionConstraint getJobLocations(JobSpecification spec,
             INodeJobTracker jobTracker, AlgebricksAbsolutePartitionConstraint clusterLocations) {
-        final Set<String> jobParticipatingNodes = jobTracker.getJobParticipatingNodes(spec);
+        final Set<String> jobParticipatingNodes = jobTracker.getJobParticipatingNodes(spec, null);
         return new AlgebricksAbsolutePartitionConstraint(Arrays.stream(clusterLocations.getLocations())
                 .filter(jobParticipatingNodes::contains).toArray(String[]::new));
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index beb55d8..4dbf38b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -69,8 +69,7 @@
     @Override
     public void commitTransaction(JobId jobId) throws ACIDException {
         IGlobalTransactionContext context = getTransactionContext(jobId);
-        if (context.getTxnStatus() == TransactionStatus.ACTIVE
-                || context.getTxnStatus() == TransactionStatus.PREPARED) {
+        if (context.getAcksReceived() != context.getNumPartitions()) {
             synchronized (context) {
                 try {
                     context.wait();
@@ -80,6 +79,19 @@
                 }
             }
         }
+        context.setTxnStatus(TransactionStatus.PREPARED);
+        context.persist(ioManager);
+        context.resetAcksReceived();
+        sendJobCommitMessages(context);
+
+        synchronized (context) {
+            try {
+                context.wait();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new ACIDException(e);
+            }
+        }
         txnContextRepository.remove(jobId);
     }
 
@@ -95,17 +107,21 @@
     @Override
     public void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
             Map<String, ILSMComponentId> componentIdMap) {
-        IGlobalTransactionContext context = getTransactionContext(jobId);
+        IGlobalTransactionContext context = txnContextRepository.get(jobId);
+        if (context == null) {
+            LOGGER.warn("JobPreparedMessage received for jobId " + jobId
+                    + ", which does not exist. The transaction for the job is already aborted");
+            return;
+        }
         if (context.getNodeResourceMap().containsKey(nodeId)) {
             context.getNodeResourceMap().get(nodeId).putAll(componentIdMap);
         } else {
             context.getNodeResourceMap().put(nodeId, componentIdMap);
         }
         if (context.incrementAndGetAcksReceived() == context.getNumPartitions()) {
-            context.setTxnStatus(TransactionStatus.PREPARED);
-            context.persist(ioManager);
-            context.resetAcksReceived();
-            sendJobCommitMessages(context);
+            synchronized (context) {
+                context.notifyAll();
+            }
         }
     }
 
@@ -127,10 +143,10 @@
         if (context.incrementAndGetAcksReceived() == context.getNumNodes()) {
             context.delete(ioManager);
             context.setTxnStatus(TransactionStatus.COMMITTED);
-            sendEnableMergeMessages(context);
             synchronized (context) {
                 context.notifyAll();
             }
+            sendEnableMergeMessages(context);
         }
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f390237..9f292e7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -261,6 +261,7 @@
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
 import org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.TokenizerCategory;
 import org.apache.hyracks.util.LogRedactionUtil;
 import org.apache.logging.log4j.Level;
@@ -3524,8 +3525,11 @@
             if (spec != null && !isCompileOnly()) {
                 atomic = dataset.isAtomic();
                 if (atomic) {
-                    int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(spec).size();
-                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec);
+                    int numParticipatingNodes = appCtx.getNodeJobTracker()
+                            .getJobParticipatingNodes(spec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
+                            .size();
+                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec,
+                            LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
                     List<Integer> participatingDatasetIds = new ArrayList<>();
                     participatingDatasetIds.add(dataset.getDatasetId());
                     spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds,
@@ -3613,8 +3617,11 @@
                         ((InsertStatement) stmt).getDatasetName());
                 atomic = ds.isAtomic();
                 if (atomic) {
-                    int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
-                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+                    int numParticipatingNodes = appCtx.getNodeJobTracker()
+                            .getJobParticipatingNodes(jobSpec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
+                            .size();
+                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec,
+                            LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
                     List<Integer> participatingDatasetIds = new ArrayList<>();
                     participatingDatasetIds.add(ds.getDatasetId());
                     jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
@@ -3672,8 +3679,11 @@
                 Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
                 atomic = ds.isAtomic();
                 if (atomic) {
-                    int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
-                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+                    int numParticipatingNodes = appCtx.getNodeJobTracker()
+                            .getJobParticipatingNodes(jobSpec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
+                            .size();
+                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec,
+                            LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
                     List<Integer> participatingDatasetIds = new ArrayList<>();
                     participatingDatasetIds.add(ds.getDatasetId());
                     jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
@@ -4791,8 +4801,11 @@
                         ((InsertStatement) atomicStatement).getDatasetName());
                 atomic = ds.isAtomic();
                 if (atomic) {
-                    int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
-                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+                    int numParticipatingNodes = appCtx.getNodeJobTracker()
+                            .getJobParticipatingNodes(jobSpec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
+                            .size();
+                    int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec,
+                            LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
                     List<Integer> participatingDatasetIds = new ArrayList<>();
                     participatingDatasetIds.add(ds.getDatasetId());
                     jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.1.ddl.sqlpp
new file mode 100644
index 0000000..d1216e1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.1.ddl.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset page_views primary key (user:string);
+
+create dataset tmp primary key (id:uuid) autogenerated;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.2.update.sqlpp
new file mode 100644
index 0000000..a29e59b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.2.update.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+load dataset page_views using localfs
+(("path"="asterix_nc1://data/page_views.adm"),("format"="adm"));
+
+insert into tmp(
+  from page_views as t
+  group by t.user
+  group as g
+  select value {
+     "groups": (select value g.t from g)
+  }
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.3.query.sqlpp
new file mode 100644
index 0000000..b7a8bdc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.3.query.sqlpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+WITH beth AS(
+  FROM tmp AS i, i.groups AS item
+  SELECT DISTINCT VALUE
+  {
+    "user": item.user,
+    "action": item.action
+  }
+)
+,
+rev AS(
+  FROM tmp i, i.groups AS item
+  SELECT DISTINCT VALUE
+  {
+     "user": item.user,
+     "estimated_revenue":  item.estimated_revenue
+  }
+)
+,
+ts AS (
+  FROM tmp i, i.groups AS item
+  SELECT DISTINCT VALUE
+  {
+     "user": item.user,
+     "timespent": item.timespent
+  }
+)
+
+FROM beth AS a,
+     ts AS b,
+     rev AS c
+WHERE a.user=b.user AND a.user=c.user AND b.user=c.user
+SELECT VALUE a.user
+ORDER BY a.user;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.1.ddl.sqlpp
new file mode 100644
index 0000000..8035b89
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.1.ddl.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+drop nodegroup group_test if exists;
+create  nodegroup group_test on
+    asterix_nc1
+;
+
+create dataset Points
+primary key (x:int, y:int)
+WITH {"node-group":{"name":"group_test"}};;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.2.update.sqlpp
new file mode 100644
index 0000000..a8b70cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.2.update.sqlpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into Points
+{"x": 9, "y": 15};
+
+insert into Points
+{"x": 15, "y": 40};
+
+insert into Points
+{"x": 20, "y": 50};
+
+insert into Points
+{"x": 50, "y": 200};
+
+insert into Points
+{"x": 60, "y": 40};
+
+insert into Points
+{"x": 101, "y": 80};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.3.query.sqlpp
new file mode 100644
index 0000000..777499c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.3.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+use test;
+
+select value p
+from Points p
+where p.x>10 and p.x<100
+and p.y>10 and p.y<100
+order by p.x;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-3/atomic-statements-3.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-3/atomic-statements-3.3.adm
new file mode 100644
index 0000000..80d0c73
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-3/atomic-statements-3.3.adm
@@ -0,0 +1,2 @@
+"Bill"
+"John"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-4/atomic-statements-4.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-4/atomic-statements-4.3.adm
new file mode 100644
index 0000000..625f0fd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-4/atomic-statements-4.3.adm
@@ -0,0 +1,3 @@
+{ "x": 15, "y": 40 }
+{ "x": 20, "y": 50 }
+{ "x": 60, "y": 40 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index d1a59a3..d24873e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -4145,24 +4145,24 @@
         -->
   </test-group>
   <test-group name="ddl">
-<!--    <test-case FilePath="ddl">-->
-<!--      <compilation-unit name="create-dataset-1">-->
-<!--        <output-dir compare="Clean-JSON">create-dataset-1</output-dir>-->
-<!--      </compilation-unit>-->
-<!--    </test-case>-->
-<!--    <test-case FilePath="ddl">-->
-<!--      <compilation-unit name="create-dataset-2">-->
-<!--        <output-dir compare="Clean-JSON">create-dataset-2</output-dir>-->
-<!--        <source-location>false</source-location>-->
-<!--        <expected-error>type mismatch: missing a required closed field my_id: string</expected-error>-->
-<!--      </compilation-unit>-->
-<!--    </test-case>-->
-<!--    <test-case FilePath="ddl">-->
-<!--      <compilation-unit name="create-dataset-3">-->
-<!--        <output-dir compare="Clean-JSON">create-dataset-3</output-dir>-->
-<!--        <expected-error>ASX1077: Cannot find dataset non_existent in dataverse test nor an alias with name non_existent (in line 23, at column 21)</expected-error>-->
-<!--      </compilation-unit>-->
-<!--    </test-case>-->
+    <test-case FilePath="ddl">
+      <compilation-unit name="create-dataset-1">
+        <output-dir compare="Clean-JSON">create-dataset-1</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="ddl">
+      <compilation-unit name="create-dataset-2">
+        <output-dir compare="Clean-JSON">create-dataset-2</output-dir>
+        <source-location>false</source-location>
+        <expected-error>type mismatch: missing a required closed field my_id: string</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="ddl">
+      <compilation-unit name="create-dataset-3">
+        <output-dir compare="Clean-JSON">create-dataset-3</output-dir>
+        <expected-error>ASX1077: Cannot find dataset non_existent in dataverse test nor an alias with name non_existent (in line 23, at column 21)</expected-error>
+      </compilation-unit>
+    </test-case>
     <test-case FilePath="ddl">
       <compilation-unit name="analyze-dataset-1">
         <output-dir compare="Text">analyze-dataset-1</output-dir>
@@ -16321,25 +16321,35 @@
       </compilation-unit>
     </test-case>
   </test-group>
-<!--  <test-group name="copy">-->
-<!--    <test-case FilePath="copy">-->
-<!--      <compilation-unit name="copy-1">-->
-<!--        <output-dir compare="Text">copy-1</output-dir>-->
-<!--      </compilation-unit>-->
-<!--    </test-case>-->
-<!--  </test-group>-->
-<!--  <test-group name="atomic-statements">-->
-<!--    <test-case FilePath="atomic-statements">-->
-<!--      <compilation-unit name="atomic-statements-1">-->
-<!--        <output-dir compare="Clean-JSON">atomic-statements-1</output-dir>-->
-<!--        <expected-error>HYR0033: Inserting duplicate keys into the primary storage</expected-error>-->
-<!--        <source-location>false</source-location>-->
-<!--      </compilation-unit>-->
-<!--    </test-case>-->
-<!--    <test-case FilePath="atomic-statements">-->
-<!--      <compilation-unit name="atomic-statements-2">-->
-<!--        <output-dir compare="Clean-JSON">atomic-statements-2</output-dir>-->
-<!--      </compilation-unit>-->
-<!--    </test-case>-->
-<!--  </test-group>-->
+  <test-group name="copy">
+    <test-case FilePath="copy">
+      <compilation-unit name="copy-1">
+        <output-dir compare="Text">copy-1</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+  <test-group name="atomic-statements">
+    <test-case FilePath="atomic-statements">
+      <compilation-unit name="atomic-statements-1">
+        <output-dir compare="Clean-JSON">atomic-statements-1</output-dir>
+        <expected-error>HYR0033: Inserting duplicate keys into the primary storage</expected-error>
+        <source-location>false</source-location>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="atomic-statements">
+      <compilation-unit name="atomic-statements-2">
+        <output-dir compare="Clean-JSON">atomic-statements-2</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="atomic-statements">
+      <compilation-unit name="atomic-statements-3">
+        <output-dir compare="Clean-JSON">atomic-statements-3</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="atomic-statements">
+      <compilation-unit name="atomic-statements-4">
+        <output-dir compare="Clean-JSON">atomic-statements-4</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
index 27a3d49..66ca95e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
@@ -21,6 +21,7 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -44,8 +45,8 @@
      * @param spec
      * @return The participating nodes in the job execution
      */
-    Set<String> getJobParticipatingNodes(JobSpecification spec);
+    Set<String> getJobParticipatingNodes(JobSpecification spec, Class<? extends IOperatorDescriptor> operatorClass);
 
-    int getNumParticipatingPartitions(JobSpecification spec);
+    int getNumParticipatingPartitions(JobSpecification spec, Class<? extends IOperatorDescriptor> operatorClass);
 
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index a1980b6..3ec2d22 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -294,6 +294,8 @@
     }
 
     public void clear() throws HyracksDataException {
+        List<FlushOperation> flushes = new ArrayList<>(getScheduledFlushes());
+        LSMIndexUtil.waitFor(flushes);
         deleteMemoryComponent(false);
         Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
         for (ILSMIndex lsmIndex : indexes) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index 52fba60..56d3eea 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -34,6 +34,9 @@
 import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
 import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
@@ -46,7 +49,7 @@
 
     @Override
     public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) {
-        getJobParticipatingNodes(spec).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
+        getJobParticipatingNodes(spec, null).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
     }
 
     @Override
@@ -76,17 +79,42 @@
     }
 
     @Override
-    public Set<String> getJobParticipatingNodes(JobSpecification spec) {
-        return spec.getUserConstraints().stream().map(Constraint::getRValue)
-                .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast)
-                .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
-                .collect(Collectors.toSet());
+    public Set<String> getJobParticipatingNodes(JobSpecification spec,
+            Class<? extends IOperatorDescriptor> operatorClass) {
+        if (operatorClass != null) {
+            List<OperatorDescriptorId> operatorDescriptorIds =
+                    spec.getOperatorMap().entrySet().stream().filter(op -> operatorClass.isInstance(op.getValue()))
+                            .map(Map.Entry::getKey).collect(Collectors.toList());
+            return spec.getUserConstraints().stream()
+                    .filter(ce -> ce.getLValue().getTag() == ExpressionTag.PARTITION_LOCATION && operatorDescriptorIds
+                            .contains(((PartitionLocationExpression) ce.getLValue()).getOperatorDescriptorId()))
+                    .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue)
+                    .map(Object::toString).filter(nodeJobs::containsKey).collect(Collectors.toSet());
+        } else {
+            return spec.getUserConstraints().stream().map(Constraint::getRValue)
+                    .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast)
+                    .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
+                    .collect(Collectors.toSet());
+        }
     }
 
     @Override
-    public int getNumParticipatingPartitions(JobSpecification spec) {
-        return spec.getUserConstraints().stream().filter(ce -> ce.getLValue() instanceof PartitionCountExpression)
-                .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue)
-                .map(Object::toString).map(Integer::parseInt).max(Integer::compare).get();
+    public int getNumParticipatingPartitions(JobSpecification spec,
+            Class<? extends IOperatorDescriptor> operatorClass) {
+        if (operatorClass != null) {
+            List<OperatorDescriptorId> operatorDescriptorIds =
+                    spec.getOperatorMap().entrySet().stream().filter(op -> operatorClass.isInstance(op.getValue()))
+                            .map(Map.Entry::getKey).collect(Collectors.toList());
+            return spec.getUserConstraints().stream()
+                    .filter(ce -> ce.getLValue().getTag() == ExpressionTag.PARTITION_COUNT && operatorDescriptorIds
+                            .contains(((PartitionCountExpression) ce.getLValue()).getOperatorDescriptorId()))
+                    .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue)
+                    .map(Object::toString).map(Integer::parseInt).max(Integer::compare).get();
+        } else {
+            return spec.getUserConstraints().stream()
+                    .filter(ce -> ce.getLValue().getTag() == ExpressionTag.PARTITION_COUNT).map(Constraint::getRValue)
+                    .map(ConstantExpression.class::cast).map(ConstantExpression::getValue).map(Object::toString)
+                    .map(Integer::parseInt).max(Integer::compare).get();
+        }
     }
 }