[NO ISSUE][TX] Concurrently wirte checkpoints for atomic statements
- user model changes: no
- storage format changes: no
- interface changes: no
Change-Id: I3846bfa534ebe4077f55f3a9acccd3dc3d8d0cda
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17725
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
index fac023c..e653b7a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
@@ -18,7 +18,11 @@
*/
package org.apache.asterix.app.message;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -46,13 +50,29 @@
@Override
public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+ ForkJoinPool commonPool = ForkJoinPool.commonPool();
+ List<Future> futures = new ArrayList<>();
for (Integer datasetId : datasetIds) {
for (IndexInfo indexInfo : datasetLifecycleManager.getDatasetInfo(datasetId).getIndexes().values()) {
if (indexInfo.getIndex().isPrimaryIndex()) {
- ((PrimaryIndexOperationTracker) indexInfo.getIndex().getOperationTracker()).commit();
+ futures.add(commonPool.submit(() -> {
+ try {
+ ((PrimaryIndexOperationTracker) indexInfo.getIndex().getOperationTracker()).commit();
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }));
}
}
}
+ for (Future f : futures) {
+ try {
+ f.get();
+ } catch (ExecutionException e) {
+ futures.forEach(future -> future.cancel(true));
+ throw HyracksDataException.create(e);
+ }
+ }
AtomicJobCompletionMessage message =
new AtomicJobCompletionMessage(jobId, appCtx.getServiceContext().getNodeId());
NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();