[NO ISSUE][TX] Concurrently wirte checkpoints for atomic statements

- user model changes: no
- storage format changes: no
- interface changes: no

Change-Id: I3846bfa534ebe4077f55f3a9acccd3dc3d8d0cda
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17725
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
index fac023c..e653b7a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
@@ -18,7 +18,11 @@
  */
 package org.apache.asterix.app.message;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -46,13 +50,29 @@
     @Override
     public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
         IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+        ForkJoinPool commonPool = ForkJoinPool.commonPool();
+        List<Future> futures = new ArrayList<>();
         for (Integer datasetId : datasetIds) {
             for (IndexInfo indexInfo : datasetLifecycleManager.getDatasetInfo(datasetId).getIndexes().values()) {
                 if (indexInfo.getIndex().isPrimaryIndex()) {
-                    ((PrimaryIndexOperationTracker) indexInfo.getIndex().getOperationTracker()).commit();
+                    futures.add(commonPool.submit(() -> {
+                        try {
+                            ((PrimaryIndexOperationTracker) indexInfo.getIndex().getOperationTracker()).commit();
+                        } catch (HyracksDataException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }));
                 }
             }
         }
+        for (Future f : futures) {
+            try {
+                f.get();
+            } catch (ExecutionException e) {
+                futures.forEach(future -> future.cancel(true));
+                throw HyracksDataException.create(e);
+            }
+        }
         AtomicJobCompletionMessage message =
                 new AtomicJobCompletionMessage(jobId, appCtx.getServiceContext().getNodeId());
         NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();