[NO ISSUE][TX]: Statement level atomicity for inserts/upserts/deletes
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
With this change we introduce statement level atomicity for
inserts/upserts/deletes. The statement level atomicity is
only enabled for datasets created without any type specification.
Change-Id: I3b4aefeba07be921d128255393aec1b703198a40
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17598
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index 0b8024b..bb7be73 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -35,6 +35,7 @@
import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.BuildProperties;
import org.apache.asterix.common.config.CloudProperties;
@@ -74,6 +75,7 @@
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.client.result.ResultSet;
+import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.util.NetworkUtil;
@@ -119,6 +121,8 @@
private final IAdapterFactoryService adapterFactoryService;
private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
private final IDataPartitioningProvider dataPartitioningProvider;
+ private final IGlobalTxManager globalTxManager;
+ private final IOManager ioManager;
public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hcc,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
@@ -126,7 +130,8 @@
IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager,
IMetadataLockUtil mdLockUtil, IReceptionistFactory receptionistFactory,
IConfigValidatorFactory configValidatorFactory, Object extensionManager,
- IAdapterFactoryService adapterFactoryService) throws AlgebricksException, IOException {
+ IAdapterFactoryService adapterFactoryService, IGlobalTxManager globalTxManager, IOManager ioManager,
+ CloudProperties cloudProperties) throws AlgebricksException, IOException {
this.ccServiceCtx = ccServiceCtx;
this.hcc = hcc;
this.activeLifeCycleListener = activeLifeCycleListener;
@@ -142,7 +147,7 @@
activeProperties = new ActiveProperties(propertiesAccessor);
extensionProperties = new ExtensionProperties(propertiesAccessor);
replicationProperties = new ReplicationProperties(propertiesAccessor);
- cloudProperties = new CloudProperties(propertiesAccessor);
+ this.cloudProperties = cloudProperties;
this.ftStrategy = ftStrategy;
this.buildProperties = new BuildProperties(propertiesAccessor);
this.messagingProperties = new MessagingProperties(propertiesAccessor);
@@ -163,6 +168,8 @@
configValidator = configValidatorFactory.create();
this.adapterFactoryService = adapterFactoryService;
dataPartitioningProvider = DataPartitioningProvider.create(this);
+ this.globalTxManager = globalTxManager;
+ this.ioManager = ioManager;
}
@Override
@@ -381,4 +388,14 @@
public CloudProperties getCloudProperties() {
return cloudProperties;
}
+
+ @Override
+ public IGlobalTxManager getGlobalTxManager() {
+ return globalTxManager;
+ }
+
+ @Override
+ public IOManager getIoManager() {
+ return ioManager;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
new file mode 100644
index 0000000..beb55d8
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.cc;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.app.message.AtomicJobCommitMessage;
+import org.apache.asterix.app.message.AtomicJobRollbackMessage;
+import org.apache.asterix.app.message.EnableMergeMessage;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.transaction.management.service.transaction.GlobalTransactionContext;
+import org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class GlobalTxManager implements IGlobalTxManager {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final Map<JobId, IGlobalTransactionContext> txnContextRepository = new ConcurrentHashMap<>();
+ private final ICCServiceContext serviceContext;
+ private final IOManager ioManager;
+ public static final String GlOBAL_TX_PROPERTY_NAME = "GlobalTxProperty";
+
+ public GlobalTxManager(ICCServiceContext serviceContext, IOManager ioManager) {
+ this.serviceContext = serviceContext;
+ this.ioManager = ioManager;
+ }
+
+ @Override
+ public IGlobalTransactionContext beginTransaction(JobId jobId, int numParticipatingNodes,
+ int numParticipatingPartitions, List<Integer> participatingDatasetIds) throws ACIDException {
+ GlobalTransactionContext context = new GlobalTransactionContext(jobId, participatingDatasetIds,
+ numParticipatingNodes, numParticipatingPartitions);
+ txnContextRepository.put(jobId, context);
+ return context;
+ }
+
+ @Override
+ public void commitTransaction(JobId jobId) throws ACIDException {
+ IGlobalTransactionContext context = getTransactionContext(jobId);
+ if (context.getTxnStatus() == TransactionStatus.ACTIVE
+ || context.getTxnStatus() == TransactionStatus.PREPARED) {
+ synchronized (context) {
+ try {
+ context.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ACIDException(e);
+ }
+ }
+ }
+ txnContextRepository.remove(jobId);
+ }
+
+ @Override
+ public IGlobalTransactionContext getTransactionContext(JobId jobId) throws ACIDException {
+ IGlobalTransactionContext context = txnContextRepository.get(jobId);
+ if (context == null) {
+ throw new ACIDException("Transaction for jobId " + jobId + " does not exist");
+ }
+ return context;
+ }
+
+ @Override
+ public void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
+ Map<String, ILSMComponentId> componentIdMap) {
+ IGlobalTransactionContext context = getTransactionContext(jobId);
+ if (context.getNodeResourceMap().containsKey(nodeId)) {
+ context.getNodeResourceMap().get(nodeId).putAll(componentIdMap);
+ } else {
+ context.getNodeResourceMap().put(nodeId, componentIdMap);
+ }
+ if (context.incrementAndGetAcksReceived() == context.getNumPartitions()) {
+ context.setTxnStatus(TransactionStatus.PREPARED);
+ context.persist(ioManager);
+ context.resetAcksReceived();
+ sendJobCommitMessages(context);
+ }
+ }
+
+ private void sendJobCommitMessages(IGlobalTransactionContext context) {
+ for (String nodeId : context.getNodeResourceMap().keySet()) {
+ AtomicJobCommitMessage message = new AtomicJobCommitMessage(context.getJobId(), context.getDatasetIds());
+ try {
+ ((ICCMessageBroker) serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(message,
+ nodeId);
+ } catch (Exception e) {
+ throw new ACIDException(e);
+ }
+ }
+ }
+
+ @Override
+ public void handleJobCompletionMessage(JobId jobId, String nodeId) {
+ IGlobalTransactionContext context = getTransactionContext(jobId);
+ if (context.incrementAndGetAcksReceived() == context.getNumNodes()) {
+ context.delete(ioManager);
+ context.setTxnStatus(TransactionStatus.COMMITTED);
+ sendEnableMergeMessages(context);
+ synchronized (context) {
+ context.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void handleJobRollbackCompletionMessage(JobId jobId, String nodeId) {
+ IGlobalTransactionContext context = getTransactionContext(jobId);
+ if (context.incrementAndGetAcksReceived() == context.getNumNodes()) {
+ context.setTxnStatus(TransactionStatus.ROLLBACK);
+ context.delete(ioManager);
+ synchronized (context) {
+ context.notifyAll();
+ }
+ }
+ }
+
+ private void sendEnableMergeMessages(IGlobalTransactionContext context) {
+ for (String nodeId : context.getNodeResourceMap().keySet()) {
+ for (Integer datasetId : context.getDatasetIds()) {
+ EnableMergeMessage message = new EnableMergeMessage(context.getJobId(), datasetId);
+ try {
+ ((ICCMessageBroker) serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(message,
+ nodeId);
+ } catch (Exception e) {
+ throw new ACIDException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void rollback() throws Exception {
+ Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve(StorageConstants.CC_TX_LOG_DIR));
+ for (FileReference txnLogFileRef : txnLogFileRefs) {
+ IGlobalTransactionContext context = new GlobalTransactionContext(txnLogFileRef, ioManager);
+ txnContextRepository.put(context.getJobId(), context);
+ sendJobRollbackMessages(context);
+ }
+ }
+
+ private void sendJobRollbackMessages(IGlobalTransactionContext context) throws Exception {
+ JobId jobId = context.getJobId();
+ for (String nodeId : context.getNodeResourceMap().keySet()) {
+ AtomicJobRollbackMessage rollbackMessage = new AtomicJobRollbackMessage(jobId, context.getDatasetIds(),
+ context.getNodeResourceMap().get(nodeId));
+ ((ICCMessageBroker) serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(rollbackMessage,
+ nodeId);
+ }
+ synchronized (context) {
+ try {
+ context.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ACIDException(e);
+ }
+ }
+ txnContextRepository.remove(jobId);
+ }
+
+ @Override
+ public void abortTransaction(JobId jobId) throws Exception {
+ IGlobalTransactionContext context = getTransactionContext(jobId);
+ if (context.getTxnStatus() == TransactionStatus.PREPARED) {
+ sendJobRollbackMessages(context);
+ }
+ txnContextRepository.remove(jobId);
+ }
+
+ @Override
+ public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+ GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(GlOBAL_TX_PROPERTY_NAME);
+ if (globalTxInfo != null) {
+ beginTransaction(jobId, globalTxInfo.getNumNodes(), globalTxInfo.getNumPartitions(),
+ globalTxInfo.getDatasetIds());
+ }
+ }
+
+ @Override
+ public void notifyJobStart(JobId jobId) throws HyracksException {
+
+ }
+
+ @Override
+ public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
new file mode 100644
index 0000000..55ae225
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.List;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from CC to all NCs asking to commit an atomic statement/job.
+ */
+public class AtomicJobCommitMessage implements INcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final JobId jobId;
+ private final List<Integer> datasetIds;
+
+ public AtomicJobCommitMessage(JobId jobId, List<Integer> datasetIds) {
+ this.jobId = jobId;
+ this.datasetIds = datasetIds;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+ for (Integer datasetId : datasetIds) {
+ for (IndexInfo indexInfo : datasetLifecycleManager.getDatasetInfo(datasetId).getIndexes().values()) {
+ if (indexInfo.getIndex().isPrimaryIndex()) {
+ ((PrimaryIndexOperationTracker) indexInfo.getIndex().getOperationTracker()).commit();
+ }
+ }
+ }
+ AtomicJobCompletionMessage message =
+ new AtomicJobCompletionMessage(jobId, appCtx.getServiceContext().getNodeId());
+ NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ try {
+ mb.sendRealTimeMessageToCC(jobId.getCcId(), message);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCompletionMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCompletionMessage.java
new file mode 100644
index 0000000..869e5d2
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCompletionMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from NC to CC on successful local commit of an atomic statement/job.
+ */
+public class AtomicJobCompletionMessage implements ICcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final JobId jobId;
+
+ public AtomicJobCompletionMessage(JobId jobId, String nodeId) {
+ this.jobId = jobId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ appCtx.getGlobalTxManager().handleJobCompletionMessage(jobId, nodeId);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackCompleteMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackCompleteMessage.java
new file mode 100644
index 0000000..90e1fbd
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackCompleteMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from an NC to CC after successful local rollback of an atomic statement/job.
+ */
+public class AtomicJobRollbackCompleteMessage implements ICcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final JobId jobId;
+
+ public AtomicJobRollbackCompleteMessage(JobId jobId, String nodeId) {
+ this.jobId = jobId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ appCtx.getGlobalTxManager().handleJobRollbackCompletionMessage(jobId, nodeId);
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
new file mode 100644
index 0000000..aeb6be2
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from CC to all NCs to rollback an atomic statement/job.
+ */
+public class AtomicJobRollbackMessage implements INcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final JobId jobId;
+ private final List<Integer> datasetIds;
+ private final Map<String, ILSMComponentId> componentIdMap;
+
+ public AtomicJobRollbackMessage(JobId jobId, List<Integer> datasetIds,
+ Map<String, ILSMComponentId> componentIdMap) {
+ this.jobId = jobId;
+ this.datasetIds = datasetIds;
+ this.componentIdMap = componentIdMap;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+ datasetLifecycleManager.getIndexCheckpointManagerProvider();
+ componentIdMap.forEach((k, v) -> {
+ try {
+ IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k));
+ if (checkpointManager.getCheckpointCount() > 0) {
+ IndexCheckpoint checkpoint = checkpointManager.getLatest();
+ if (checkpoint.getLastComponentId() == v.getMaxId()) {
+ checkpointManager.deleteLatest(v.getMaxId(), 1);
+ }
+ }
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ AtomicJobRollbackCompleteMessage message =
+ new AtomicJobRollbackCompleteMessage(jobId, appCtx.getServiceContext().getNodeId());
+ NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ try {
+ mb.sendRealTimeMessageToPrimaryCC(message);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EnableMergeMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EnableMergeMessage.java
new file mode 100644
index 0000000..900da82
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EnableMergeMessage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class EnableMergeMessage implements INcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final JobId jobId;
+ private final int datasetId;
+
+ public EnableMergeMessage(JobId jobId, int datasetId) {
+ this.jobId = jobId;
+ this.datasetId = datasetId;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+ for (IndexInfo indexInfo : datasetLifecycleManager.getDatasetInfo(datasetId).getIndexes().values()) {
+ if (indexInfo.getIndex().isPrimaryIndex()) {
+ indexInfo.getIndex().getMergePolicy().diskComponentAdded(indexInfo.getIndex(), false);;
+ }
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 372cf69..9383f06 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -227,6 +227,22 @@
return IndexCheckpoint.fromJson(new String(ioManager.readAllBytes(checkpointPath)));
}
+ @Override
+ public void deleteLatest(long latestId, int historyToDelete) {
+ try {
+ final Collection<FileReference> checkpointFiles = ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
+ if (!checkpointFiles.isEmpty()) {
+ for (FileReference checkpointFile : checkpointFiles) {
+ if (getCheckpointIdFromFileName(checkpointFile) > (latestId - historyToDelete)) {
+ ioManager.delete(checkpointFile);
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn(() -> "Couldn't delete history checkpoints at " + indexPath, e);
+ }
+ }
+
private void deleteHistory(long latestId, int historyToKeep) {
try {
final Collection<FileReference> checkpointFiles = ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 28a6d2d..dc165a0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -55,6 +55,7 @@
import org.apache.asterix.app.active.ActiveEntityEventsListener;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.active.FeedEventsListener;
+import org.apache.asterix.app.cc.GlobalTxManager;
import org.apache.asterix.app.external.ExternalLibraryJobUtils;
import org.apache.asterix.app.result.ExecutionError;
import org.apache.asterix.app.result.ResultHandle;
@@ -69,6 +70,7 @@
import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.api.IResponsePrinter;
import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.config.DatasetConfig.TransactionState;
@@ -209,6 +211,7 @@
import org.apache.asterix.runtime.fulltext.StopwordsFullTextFilterDescriptor;
import org.apache.asterix.runtime.operators.DatasetStreamStats;
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo;
import org.apache.asterix.translator.AbstractLangTranslator;
import org.apache.asterix.translator.ClientRequest;
import org.apache.asterix.translator.CompiledStatements.CompiledCopyFromFileStatement;
@@ -291,6 +294,7 @@
protected final IResponsePrinter responsePrinter;
protected final WarningCollector warningCollector;
protected final ReentrantReadWriteLock compilationLock;
+ protected final IGlobalTxManager globalTxManager;
public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
ILangCompilationProvider compilationProvider, ExecutorService executorService,
@@ -313,6 +317,7 @@
if (appCtx.getServiceContext().getAppConfig().getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)) {
this.jobFlags.add(JobFlag.ENFORCE_CONTRACT);
}
+ this.globalTxManager = appCtx.getGlobalTxManager();
}
public SessionOutput getSessionOutput() {
@@ -3481,6 +3486,8 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+ JobId jobId = null;
+ boolean atomic = false;
try {
metadataProvider.setWriteTransaction(true);
Dataset dataset = metadataProvider.findDataset(dataverseName, copyStmt.getDatasetName());
@@ -3503,9 +3510,32 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
if (spec != null && !isCompileOnly()) {
- runJob(hcc, spec);
+ atomic = dataset.isAtomic();
+ if (atomic) {
+ int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(spec).size();
+ int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec);
+ List<Integer> participatingDatasetIds = new ArrayList<>();
+ participatingDatasetIds.add(dataset.getDatasetId());
+ spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds,
+ numParticipatingNodes, numParticipatingPartitions));
+ }
+ jobId = JobUtils.runJob(hcc, spec, jobFlags, false);
+
+ String nameBefore = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
+ hcc.waitForCompletion(jobId);
+ } finally {
+ Thread.currentThread().setName(nameBefore);
+ }
+ if (atomic) {
+ globalTxManager.commitTransaction(jobId);
+ }
}
} catch (Exception e) {
+ if (atomic && jobId != null) {
+ globalTxManager.abortTransaction(jobId);
+ }
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
}
@@ -3555,18 +3585,45 @@
throw e;
}
};
-
if (stmtInsertUpsert.getReturnExpression() != null) {
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- requestParameters, false);
+ requestParameters, false, stmt);
} else {
locker.lock();
+ JobId jobId = null;
+ boolean atomic = false;
try {
final JobSpecification jobSpec = compiler.compile();
if (jobSpec == null) {
return jobSpec;
}
- runJob(hcc, jobSpec);
+ Dataset ds = metadataProvider.findDataset(((InsertStatement) stmt).getDataverseName(),
+ ((InsertStatement) stmt).getDatasetName());
+ atomic = ds.isAtomic();
+ if (atomic) {
+ int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
+ int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+ List<Integer> participatingDatasetIds = new ArrayList<>();
+ participatingDatasetIds.add(ds.getDatasetId());
+ jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
+ participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
+ }
+ jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ String nameBefore = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
+ hcc.waitForCompletion(jobId);
+ } finally {
+ Thread.currentThread().setName(nameBefore);
+ }
+ if (atomic) {
+ globalTxManager.commitTransaction(jobId);
+ }
+ } catch (Exception e) {
+ if (atomic && jobId != null) {
+ globalTxManager.abortTransaction(jobId);
+ }
+ throw e;
} finally {
locker.unlock();
}
@@ -3586,6 +3643,8 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+ boolean atomic = false;
+ JobId jobId = null;
try {
metadataProvider.setWriteTransaction(true);
CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
@@ -3597,12 +3656,34 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
-
if (jobSpec != null && !isCompileOnly()) {
- runJob(hcc, jobSpec);
+ Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
+ atomic = ds.isAtomic();
+ if (atomic) {
+ int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
+ int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+ List<Integer> participatingDatasetIds = new ArrayList<>();
+ participatingDatasetIds.add(ds.getDatasetId());
+ jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
+ participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
+ }
+ jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ String nameBefore = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
+ hcc.waitForCompletion(jobId);
+ } finally {
+ Thread.currentThread().setName(nameBefore);
+ }
+ if (atomic) {
+ globalTxManager.commitTransaction(jobId);
+ }
}
return jobSpec;
} catch (Exception e) {
+ if (atomic && jobId != null) {
+ globalTxManager.abortTransaction(jobId);
+ }
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
}
@@ -4557,19 +4638,19 @@
}
};
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- requestParameters, true);
+ requestParameters, true, null);
}
private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable)
- throws Exception {
+ ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable,
+ Statement atomicStmt) throws Exception {
final ResultSetId resultSetId = metadataProvider.getResultSetId();
switch (resultDelivery) {
case ASYNC:
MutableBoolean printed = new MutableBoolean(false);
executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery,
- requestParameters, cancellable, resultSetId, printed, metadataProvider));
+ requestParameters, cancellable, resultSetId, printed, metadataProvider, atomicStmt));
synchronized (printed) {
while (!printed.booleanValue()) {
printed.wait();
@@ -4583,7 +4664,7 @@
responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader,
metadataProvider.findOutputRecordType(), stats, sessionOutput));
responsePrinter.printResults();
- }, requestParameters, cancellable, appCtx, metadataProvider);
+ }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt);
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
@@ -4595,7 +4676,7 @@
outMetadata.getResultSets().add(org.apache.commons.lang3.tuple.Triple.of(id, resultSetId,
metadataProvider.findOutputRecordType()));
}
- }, requestParameters, cancellable, appCtx, metadataProvider);
+ }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt);
break;
default:
break;
@@ -4618,7 +4699,7 @@
private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable,
- ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider) {
+ ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider, Statement atomicStmt) {
Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
try {
createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> {
@@ -4630,7 +4711,7 @@
printed.setTrue();
printed.notify();
}
- }, requestParameters, cancellable, appCtx, metadataProvider);
+ }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt);
} catch (Exception e) {
if (Objects.equals(JobId.INVALID, jobId.getValue())) {
// compilation failed
@@ -4670,10 +4751,10 @@
return p.second;
}
- private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
+ private void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx,
- MetadataProvider metadataProvider) throws Exception {
+ MetadataProvider metadataProvider, Statement atomicStatement) throws Exception {
final IRequestTracker requestTracker = appCtx.getRequestTracker();
final ClientRequest clientRequest =
(ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -4681,6 +4762,8 @@
clientRequest.markCancellable();
}
locker.lock();
+ JobId jobId = null;
+ boolean atomic = false;
try {
final JobSpecification jobSpec = compiler.compile();
if (jobSpec == null) {
@@ -4691,7 +4774,20 @@
appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
// ensure request not cancelled before running job
ensureNotCancelled(clientRequest);
- final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ if (atomicStatement != null) {
+ Dataset ds = metadataProvider.findDataset(((InsertStatement) atomicStatement).getDataverseName(),
+ ((InsertStatement) atomicStatement).getDatasetName());
+ atomic = ds.isAtomic();
+ if (atomic) {
+ int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
+ int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+ List<Integer> participatingDatasetIds = new ArrayList<>();
+ participatingDatasetIds.add(ds.getDatasetId());
+ jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
+ participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
+ }
+ }
+ jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
clientRequest.setJobId(jobId);
if (jId != null) {
jId.setValue(jobId);
@@ -4704,7 +4800,13 @@
ensureNotCancelled(clientRequest);
printer.print(jobId);
}
+ if (atomic) {
+ globalTxManager.commitTransaction(jobId);
+ }
} catch (Exception e) {
+ if (atomic && jobId != null) {
+ globalTxManager.abortTransaction(jobId);
+ }
if (org.apache.hyracks.api.util.ExceptionUtils.getRootCause(e) instanceof InterruptedException) {
Thread.currentThread().interrupt();
throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 2a66cfd..0e740f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -23,6 +23,7 @@
import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN;
+import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;
import java.io.File;
import java.io.IOException;
@@ -54,16 +55,20 @@
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.cc.CCExtensionManager;
import org.apache.asterix.app.cc.CcApplicationContext;
+import org.apache.asterix.app.cc.GlobalTxManager;
import org.apache.asterix.app.config.ConfigValidator;
import org.apache.asterix.app.io.PersistedResourceRegistry;
import org.apache.asterix.app.replication.NcLifecycleCoordinator;
import org.apache.asterix.app.result.JobResultCallback;
+import org.apache.asterix.cloud.CloudManagerProvider;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.IConfigValidatorFactory;
import org.apache.asterix.common.api.INodeJobTracker;
import org.apache.asterix.common.api.IReceptionistFactory;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
import org.apache.asterix.common.config.AsterixExtension;
+import org.apache.asterix.common.config.CloudProperties;
import org.apache.asterix.common.config.ExtensionProperties;
import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.GlobalConfig;
@@ -77,6 +82,7 @@
import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.external.adapter.factory.AdapterFactoryService;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.messaging.CCMessageBroker;
@@ -99,12 +105,15 @@
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.result.IJobResultCallback;
import org.apache.hyracks.control.cc.BaseCCApplication;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.nc.io.DefaultDeviceResolver;
+import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.http.api.IServlet;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.HttpServerConfig;
@@ -158,8 +167,19 @@
componentProvider = new StorageComponentProvider();
ccExtensionManager = new CCExtensionManager(new ArrayList<>(getExtensions()));
IGlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
+
+ List<IODeviceHandle> devices = new ArrayList<>();
+ devices.add(new IODeviceHandle(new File(StorageConstants.CC_STORAGE_ROOT_DIR), "."));
+ IOManager ioManager = new IOManager(devices, new DefaultDeviceResolver(), 1, 10);
+ CloudProperties cloudProperties = null;
+ if (ccServiceCtx.getAppConfig().getBoolean(CLOUD_DEPLOYMENT)) {
+ cloudProperties = new CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
+ ioManager = (IOManager) CloudManagerProvider.createIOManager(cloudProperties, ioManager);;
+ }
+ IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator, Receptionist::new,
- ConfigValidator::new, ccExtensionManager, new AdapterFactoryService());
+ ConfigValidator::new, ccExtensionManager, new AdapterFactoryService(), globalTxManager, ioManager,
+ cloudProperties);
final CCConfig ccConfig = controllerService.getCCConfig();
if (System.getProperty("java.rmi.server.hostname") == null) {
System.setProperty("java.rmi.server.hostname", ccConfig.getClusterPublicAddress());
@@ -180,6 +200,7 @@
final INodeJobTracker nodeJobTracker = appCtx.getNodeJobTracker();
ccServiceCtx.addJobLifecycleListener(nodeJobTracker);
ccServiceCtx.addClusterLifecycleListener(nodeJobTracker);
+ ccServiceCtx.addJobLifecycleListener(globalTxManager);
jobCapacityController = new JobCapacityController(controllerService.getResourceManager());
}
@@ -207,18 +228,23 @@
protected ICcApplicationContext createApplicationContext(ILibraryManager libraryManager,
IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator,
IReceptionistFactory receptionistFactory, IConfigValidatorFactory configValidatorFactory,
- CCExtensionManager ccExtensionManager, IAdapterFactoryService adapterFactoryService)
+ CCExtensionManager ccExtensionManager, IAdapterFactoryService adapterFactoryService,
+ IGlobalTxManager globalTxManager, IOManager ioManager, CloudProperties cloudProperties)
throws AlgebricksException, IOException {
return new CcApplicationContext(ccServiceCtx, hcc, () -> MetadataManager.INSTANCE, globalRecoveryManager,
lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider, new MetadataLockManager(),
createMetadataLockUtil(), receptionistFactory, configValidatorFactory, ccExtensionManager,
- adapterFactoryService);
+ adapterFactoryService, globalTxManager, ioManager, cloudProperties);
}
protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
return ccExtensionManager.getGlobalRecoveryManager(ccServiceCtx, getHcc(), componentProvider);
}
+ protected IGlobalTxManager createGlobalTxManager(IOManager ioManager) throws Exception {
+ return new GlobalTxManager(ccServiceCtx, ioManager);
+ }
+
protected INcLifecycleCoordinator createNcLifeCycleCoordinator(boolean replicationEnabled) {
return new NcLifecycleCoordinator(ccServiceCtx, replicationEnabled);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 2452b8f..3f76a31 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -117,6 +117,7 @@
performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout);
}
mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
+ rollbackIncompleteAtomicTransactions(appCtx);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
recoveryCompleted = true;
recovering = false;
@@ -127,6 +128,10 @@
appCtx.getClusterStateManager().refreshState();
}
+ protected void rollbackIncompleteAtomicTransactions(ICcApplicationContext appCtx) throws Exception {
+ appCtx.getGlobalTxManager().rollback();
+ }
+
protected void performGlobalStorageCleanup(MetadataTransactionContext mdTxnCtx, int storageGlobalCleanupTimeoutSecs)
throws Exception {
List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.01.ddl.sqlpp
new file mode 100644
index 0000000..e41bd77
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.01.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset reviews primary key (id: int);
+
+CREATE PRIMARY INDEX review_idx_primary ON reviews;
+
+CREATE INDEX review_idx_review ON reviews(review: string?) TYPE BTREE;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.02.update.sqlpp
new file mode 100644
index 0000000..1358269
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.02.update.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ use test;
+
+insert into reviews ([
+ {"id": 1, "year": null, "quarter": null, "review": "good"},
+ {"id": 2, "year": null, "quarter": null, "review": "good"},
+ {"id": 3, "year": 2018, "quarter": null, "review": "good"},
+ {"id": 4, "year": 2018, "quarter": null, "review": "bad"},
+ {"id": 5, "year": 2018, "quarter": 1, "review": "good"},
+ {"id": 5, "year": 2018, "quarter": 1, "review": "bad"},
+ {"id": 7, "year": 2018, "quarter": 2, "review": "good"},
+ {"id": 8, "year": 2018, "quarter": 2, "review": "bad"},
+ {"id": 9, "year": 2019, "quarter": null, "review": "good"},
+ {"id": 10, "year": 2019, "quarter": null, "review": "bad"}
+]);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.03.query.sqlpp
new file mode 100644
index 0000000..8709326
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.03.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+select value count(*)
+from reviews;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.04.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.04.update.sqlpp
new file mode 100644
index 0000000..4ab1bcc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.04.update.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into reviews ([
+ {"id": 1, "year": null, "quarter": null, "review": "good"},
+ {"id": 2, "year": null, "quarter": null, "review": "good"},
+ {"id": 3, "year": 2018, "quarter": null, "review": "good"},
+ {"id": 4, "year": 2018, "quarter": null, "review": "bad"},
+ {"id": 5, "year": 2018, "quarter": 1, "review": "good"},
+ {"id": 6, "year": 2018, "quarter": 1, "review": "bad"},
+ {"id": 7, "year": 2018, "quarter": 2, "review": "good"},
+ {"id": 8, "year": 2018, "quarter": 2, "review": "bad"},
+ {"id": 9, "year": 2019, "quarter": null, "review": "good"},
+ {"id": 10, "year": 2019, "quarter": null, "review": "bad"}
+]);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.05.query.sqlpp
new file mode 100644
index 0000000..8709326
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.05.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+select value count(*)
+from reviews;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.06.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.06.ddl.sqlpp
new file mode 100644
index 0000000..3cc900b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.06.ddl.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+drop dataset reviews;
+
+drop dataverse test;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.01.ddl.sqlpp
new file mode 100644
index 0000000..e41bd77
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.01.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset reviews primary key (id: int);
+
+CREATE PRIMARY INDEX review_idx_primary ON reviews;
+
+CREATE INDEX review_idx_review ON reviews(review: string?) TYPE BTREE;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.02.query.sqlpp
new file mode 100644
index 0000000..ba1e16a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.02.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into reviews ([
+ {"id": 1, "year": null, "quarter": null, "review": "good"},
+ {"id": 2, "year": null, "quarter": null, "review": "good"},
+ {"id": 3, "year": 2018, "quarter": null, "review": "good"}
+]) returning review;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.03.update.sqlpp
new file mode 100644
index 0000000..ea608d6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.03.update.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into reviews ([
+ {"id": 4, "year": 2018, "quarter": null, "review": "bad"},
+ {"id": 5, "year": 2018, "quarter": 1, "review": "good"},
+ {"id": 6, "year": 2018, "quarter": 1, "review": "bad"},
+ {"id": 7, "year": 2018, "quarter": 2, "review": "good"},
+ {"id": 8, "year": 2018, "quarter": 2, "review": "bad"},
+ {"id": 9, "year": 2019, "quarter": null, "review": "good"},
+ {"id": 10, "year": 2019, "quarter": null, "review": "bad"}
+]);
+
+delete from reviews where year=2019;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.04.query.sqlpp
new file mode 100644
index 0000000..8709326
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.04.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+select value count(*)
+from reviews;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.05.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.05.ddl.sqlpp
new file mode 100644
index 0000000..3cc900b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.05.ddl.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+drop dataset reviews;
+
+drop dataverse test;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.03.adm
new file mode 100644
index 0000000..c227083
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.03.adm
@@ -0,0 +1 @@
+0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.05.adm
new file mode 100644
index 0000000..9a03714
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.05.adm
@@ -0,0 +1 @@
+10
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.02.adm
new file mode 100644
index 0000000..3ca0114
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.02.adm
@@ -0,0 +1,3 @@
+"good"
+"good"
+"good"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.04.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.04.adm
new file mode 100644
index 0000000..301160a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.04.adm
@@ -0,0 +1 @@
+8
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index a257c2d..0afee45 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -16328,4 +16328,18 @@
</compilation-unit>
</test-case>
</test-group>
+ <test-group name="atomic-statements">
+ <test-case FilePath="atomic-statements">
+ <compilation-unit name="atomic-statements-1">
+ <output-dir compare="Clean-JSON">atomic-statements-1</output-dir>
+ <expected-error>HYR0033: Inserting duplicate keys into the primary storage</expected-error>
+ <source-location>false</source-location>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="atomic-statements">
+ <compilation-unit name="atomic-statements-2">
+ <output-dir compare="Clean-JSON">atomic-statements-2</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
</test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 1c2a047..ac66f33 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.context.IndexInfo;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.StorageIOStats;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -185,4 +186,6 @@
* @param partitionId
*/
void closePartition(int partitionId);
+
+ IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
index 09ab550..27a3d49 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
@@ -45,4 +45,7 @@
* @return The participating nodes in the job execution
*/
Set<String> getJobParticipatingNodes(JobSpecification spec);
+
+ int getNumParticipatingPartitions(JobSpecification spec);
+
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
new file mode 100644
index 0000000..498d174
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+public interface IGlobalTxManager extends IJobLifecycleListener {
+
+ enum TransactionStatus {
+ ACTIVE,
+ PREPARED,
+ COMMITTED,
+ ABORTED,
+ ROLLBACK
+ }
+
+ IGlobalTransactionContext beginTransaction(JobId jobId, int numParticipatingNodes, int numParticipatingPartitions,
+ List<Integer> participatingDatasetIds) throws ACIDException;
+
+ void commitTransaction(JobId jobId) throws ACIDException;
+
+ void abortTransaction(JobId jobId) throws Exception;
+
+ IGlobalTransactionContext getTransactionContext(JobId jobId) throws ACIDException;
+
+ void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
+ Map<String, ILSMComponentId> componentIdMap);
+
+ void handleJobCompletionMessage(JobId jobId, String nodeId);
+
+ void handleJobRollbackCompletionMessage(JobId jobId, String nodeId);
+
+ void rollback() throws Exception;
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f9c410b..7a109cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -654,4 +654,9 @@
return !(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
|| opTracker.isFlushLogCreated() || opTracker.isFlushOnExit());
}
+
+ @Override
+ public IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+ return indexCheckpointManagerProvider;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index d9456d2..a1980b6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -266,12 +266,14 @@
}
}
- public void commit() throws HyracksDataException {
+ public void finishAllFlush() throws HyracksDataException {
LogRecord logRecord = new LogRecord();
triggerScheduleFlush(logRecord);
List<FlushOperation> flushes = new ArrayList<>(getScheduledFlushes());
LSMIndexUtil.waitFor(flushes);
+ }
+ public synchronized void commit() throws HyracksDataException {
Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
for (ILSMIndex lsmIndex : indexes) {
lsmIndex.commit();
@@ -285,13 +287,14 @@
indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 0L, id.getMaxId());
}
lastFlushOperation.clear();
-
- for (ILSMIndex lsmIndex : indexes) {
- lsmIndex.getMergePolicy().diskComponentAdded(lsmIndex, false);
- }
}
public void abort() throws HyracksDataException {
+ clear();
+ }
+
+ public void clear() throws HyracksDataException {
+ deleteMemoryComponent(false);
Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
for (ILSMIndex lsmIndex : indexes) {
lsmIndex.abort();
@@ -362,7 +365,16 @@
return "Dataset (" + datasetID + "), Partition (" + partition + ")";
}
- public void deleteMemoryComponent() throws HyracksDataException {
+ public void deleteMemoryComponent(ILSMIndex lsmIndex, ILSMComponentId nextComponentId) throws HyracksDataException {
+ Map<String, Object> flushMap = new HashMap<>();
+ flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, 0L);
+ flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
+ ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.getOpContext().setParameters(flushMap);
+ accessor.deleteComponents(c -> c.getType() == ILSMComponent.LSMComponentType.MEMORY);
+ }
+
+ public void deleteMemoryComponent(boolean onlyPrimaryIndex) throws HyracksDataException {
Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
ILSMIndex primaryLsmIndex = null;
for (ILSMIndex lsmIndex : indexes) {
@@ -379,15 +391,21 @@
Objects.requireNonNull(primaryLsmIndex, "no primary index found in " + indexes);
idGenerator.refresh();
ILSMComponentId nextComponentId = idGenerator.getId();
- Map<String, Object> flushMap = new HashMap<>();
- flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, 0L);
- flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
- ILSMIndexAccessor accessor = primaryLsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.getOpContext().setParameters(flushMap);
- accessor.deleteComponents(c -> c.getType() == ILSMComponent.LSMComponentType.MEMORY);
+ if (onlyPrimaryIndex) {
+ deleteMemoryComponent(primaryLsmIndex, nextComponentId);
+ } else {
+ for (ILSMIndex lsmIndex : indexes) {
+ deleteMemoryComponent(lsmIndex, nextComponentId);
+ }
+ }
}
private boolean canSafelyFlush() {
return numActiveOperations.get() == 0;
}
+
+ public Map<String, FlushOperation> getLastFlushOperation() {
+ return lastFlushOperation;
+ }
+
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 281b069..922c352 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
import org.apache.asterix.common.config.ExtensionProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.external.IAdapterFactoryService;
@@ -38,6 +39,7 @@
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.storage.common.IStorageManager;
/**
@@ -165,4 +167,8 @@
* @return the data partitioing provider
*/
IDataPartitioningProvider getDataPartitioningProvider();
+
+ IGlobalTxManager getGlobalTxManager();
+
+ IOManager getIoManager();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 421fb7b..64748ce 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -19,9 +19,12 @@
package org.apache.asterix.common.dataflow;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.messaging.AtomicJobPreparedMessage;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -32,6 +35,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -45,10 +50,12 @@
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.LocalResource;
@@ -101,6 +108,9 @@
IIndexDataflowHelper indexHelper = indexHelpers[i];
indexHelper.open();
indexes[i] = indexHelper.getIndexInstance();
+ if (((ILSMIndex) indexes[i]).isAtomic() && isPrimary()) {
+ ((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear();
+ }
LocalResource resource = indexHelper.getResource();
modCallbacks[i] = modOpCallbackFactory.createModificationOperationCallback(resource, ctx, this);
IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
@@ -238,11 +248,31 @@
private void commitAtomicInsertDelete() throws HyracksDataException {
if (isPrimary) {
+ final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
+ int datasetID = -1;
+ boolean atomic = false;
for (IIndex index : indexes) {
if (((ILSMIndex) index).isAtomic()) {
PrimaryIndexOperationTracker opTracker =
((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
- opTracker.commit();
+ opTracker.finishAllFlush();
+ for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
+ componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
+ }
+ datasetID = opTracker.getDatasetInfo().getDatasetID();
+ atomic = true;
+ }
+ }
+
+ if (atomic) {
+ AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
+ ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+ try {
+ ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
+ .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
+ JavaSerializationUtils.serialize(message), null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
new file mode 100644
index 0000000..8adbf49
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from an NC to CC for every partition handled by it after all
+ * the components generated by an atomic statement/job are flushed to disk.
+ */
+public class AtomicJobPreparedMessage implements ICcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final JobId jobId;
+ private final String nodeId;
+ private final int datasetId;
+ private final Map<String, ILSMComponentId> componentIdMap;
+
+ public AtomicJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
+ Map<String, ILSMComponentId> componentIdMap) {
+ this.nodeId = nodeId;
+ this.datasetId = datasetId;
+ this.componentIdMap = componentIdMap;
+ this.jobId = jobId;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ appCtx.getGlobalTxManager().handleJobPreparedMessage(jobId, nodeId, datasetId, componentIdMap);
+ }
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index beb8e07..c43f4f0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -102,6 +102,8 @@
*/
void delete();
+ void deleteLatest(long latestId, int historyToDelete);
+
/**
* Gets the index last valid component sequence.
*
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IGlobalTransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IGlobalTransactionContext.java
new file mode 100644
index 0000000..5a175ac
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IGlobalTransactionContext.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.cluster.IGlobalTxManager;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+public interface IGlobalTransactionContext {
+
+ JobId getJobId();
+
+ int incrementAndGetAcksReceived();
+
+ int getAcksReceived();
+
+ int getNumNodes();
+
+ int getNumPartitions();
+
+ void resetAcksReceived();
+
+ void setTxnStatus(IGlobalTxManager.TransactionStatus status);
+
+ IGlobalTxManager.TransactionStatus getTxnStatus();
+
+ List<Integer> getDatasetIds();
+
+ Map<String, Map<String, ILSMComponentId>> getNodeResourceMap();
+
+ void persist(IOManager ioManager);
+
+ void delete(IOManager ioManager);
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 1321c96..7b6ed0d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.utils;
+import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -30,6 +31,8 @@
*/
public class StorageConstants {
+ public static final String CC_STORAGE_ROOT_DIR = "/tmp/";
+ public static final String CC_TX_LOG_DIR = Paths.get("cc", "txnlogs").toString();
public static final String STORAGE_ROOT_DIR_NAME = "storage";
public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
public static final String PARTITION_DIR_PREFIX = "partition_";
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index a2ebd9a..52fba60 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
@@ -81,4 +82,11 @@
.map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
.collect(Collectors.toSet());
}
+
+ @Override
+ public int getNumParticipatingPartitions(JobSpecification spec) {
+ return spec.getUserConstraints().stream().filter(ce -> ce.getLValue() instanceof PartitionCountExpression)
+ .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue)
+ .map(Object::toString).map(Integer::parseInt).max(Integer::compare).get();
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 6e767ed..ec016bc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -19,11 +19,14 @@
package org.apache.asterix.runtime.operators;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
+import org.apache.asterix.common.messaging.AtomicJobPreparedMessage;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
@@ -35,6 +38,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -54,10 +59,12 @@
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -216,6 +223,9 @@
indexHelper.open();
indexes[i] = indexHelper.getIndexInstance();
IIndex index = indexes[i];
+ if (((ILSMIndex) indexes[i]).isAtomic()) {
+ ((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear();
+ }
IIndexDataflowHelper keyIndexHelper = keyIndexHelpers[i];
IIndex indexForUniquessCheck;
if (keyIndexHelper != null) {
@@ -341,11 +351,31 @@
}
private void commitAtomicInsert() throws HyracksDataException {
+ final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
+ int datasetID = -1;
+ boolean atomic = false;
for (IIndex index : indexes) {
if (((ILSMIndex) index).isAtomic()) {
PrimaryIndexOperationTracker opTracker =
((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
- opTracker.commit();
+ opTracker.finishAllFlush();
+ for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
+ componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
+ }
+ datasetID = opTracker.getDatasetInfo().getDatasetID();
+ atomic = true;
+ }
+ }
+
+ if (atomic) {
+ AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
+ ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+ try {
+ ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
+ .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
+ JavaSerializationUtils.serialize(message), null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 03130ae..de95d60 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -24,11 +24,14 @@
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.messaging.AtomicJobPreparedMessage;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
import org.apache.asterix.om.base.AInt8;
@@ -48,6 +51,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -69,10 +74,12 @@
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -297,6 +304,9 @@
IIndexDataflowHelper indexHelper = indexHelpers[i];
indexHelper.open();
indexes[i] = indexHelper.getIndexInstance();
+ if (((ILSMIndex) indexes[i]).isAtomic()) {
+ ((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear();
+ }
if (ctx.getSharedObject() != null && i == 0) {
PrimaryIndexLogMarkerCallback callback =
new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]);
@@ -557,12 +567,33 @@
// No op since nextFrame flushes by default
}
+ // TODO: Refactor and remove duplicated code
private void commitAtomicUpsert() throws HyracksDataException {
+ final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
+ int datasetID = -1;
+ boolean atomic = false;
for (IIndex index : indexes) {
if (((ILSMIndex) index).isAtomic()) {
PrimaryIndexOperationTracker opTracker =
((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
- opTracker.commit();
+ opTracker.finishAllFlush();
+ for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
+ componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
+ }
+ datasetID = opTracker.getDatasetInfo().getDatasetID();
+ atomic = true;
+ }
+ }
+
+ if (atomic) {
+ AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
+ ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+ try {
+ ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
+ .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
+ JavaSerializationUtils.serialize(message), null);
+ } catch (Exception e) {
+ throw new ACIDException(e);
}
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
index 2312f15..39c6043 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
@@ -77,4 +77,9 @@
message.getBuffer().flip();
}
}
+
+ @Override
+ public void fail() {
+ failed = true;
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
index cfa39c6..3576ba1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -59,7 +59,7 @@
for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
try {
- primaryIndexOpTracker.deleteMemoryComponent();
+ primaryIndexOpTracker.deleteMemoryComponent(true);
} catch (HyracksDataException e) {
throw new ACIDException(e);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionLog.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionLog.java
new file mode 100644
index 0000000..dc068bd
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionLog.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtomicTransactionLog {
+
+ private JobId jobId;
+ private List<Integer> datasetIds;
+ private Set<String> nodeIds;
+ private Map<String, Map<String, ILSMComponentId>> nodeResourceMap;
+ private int numPartitions;
+
+ @JsonCreator
+ public AtomicTransactionLog(@JsonProperty("jobId") JobId jobId,
+ @JsonProperty("datasetIds") List<Integer> datasetIds, @JsonProperty("nodeIds") Set<String> nodeIds,
+ @JsonProperty("nodeResourceMap") Map<String, Map<String, ILSMComponentId>> nodeResourceMap,
+ @JsonProperty("numPartitions") int numPartitions) {
+ this.jobId = jobId;
+ this.datasetIds = datasetIds;
+ this.nodeIds = nodeIds;
+ this.nodeResourceMap = nodeResourceMap;
+ this.numPartitions = numPartitions;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public List<Integer> getDatasetIds() {
+ return datasetIds;
+ }
+
+ public Set<String> getNodeIds() {
+ return nodeIds;
+ }
+
+ public Map<String, Map<String, ILSMComponentId>> getNodeResourceMap() {
+ return nodeResourceMap;
+ }
+
+ public int getNumPartitions() {
+ return numPartitions;
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTransactionContext.java
new file mode 100644
index 0000000..9d1dd1d
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTransactionContext.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.common.cluster.IGlobalTxManager.TransactionStatus;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@ThreadSafe
+public class GlobalTransactionContext implements IGlobalTransactionContext {
+
+ protected final JobId jobId;
+ private AtomicInteger acksReceived;
+ private final int numNodes;
+ private TransactionStatus status;
+ private final List<Integer> datasetIds;
+ private final int numPartitions;
+ private final Map<String, Map<String, ILSMComponentId>> nodeResourceMap;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public GlobalTransactionContext(JobId jobId, List<Integer> datasetIds, int numNodes, int numPartitions) {
+ this.jobId = jobId;
+ this.datasetIds = datasetIds;
+ this.numNodes = numNodes;
+ this.numPartitions = numPartitions;
+ this.acksReceived = new AtomicInteger(0);
+ this.nodeResourceMap = new HashMap<>();
+ this.status = TransactionStatus.ACTIVE;
+ }
+
+ public GlobalTransactionContext(FileReference txnLogFileRef, IOManager ioManager) {
+ try {
+ AtomicTransactionLog txnLog = OBJECT_MAPPER.readValue(new String(ioManager.readAllBytes(txnLogFileRef)),
+ AtomicTransactionLog.class);
+ this.jobId = txnLog.getJobId();
+ this.datasetIds = txnLog.getDatasetIds();
+ this.nodeResourceMap = txnLog.getNodeResourceMap();
+ this.numNodes = nodeResourceMap.keySet().size();
+ this.numPartitions = txnLog.getNumPartitions();
+ this.acksReceived = new AtomicInteger(0);
+ } catch (JsonProcessingException | HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ }
+
+ @Override
+ public void setTxnStatus(TransactionStatus status) {
+ this.status = status;
+ }
+
+ @Override
+ public TransactionStatus getTxnStatus() {
+ return status;
+ }
+
+ @Override
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public int incrementAndGetAcksReceived() {
+ return acksReceived.incrementAndGet();
+ }
+
+ @Override
+ public int getAcksReceived() {
+ return acksReceived.get();
+ }
+
+ @Override
+ public void resetAcksReceived() {
+ acksReceived = new AtomicInteger(0);
+ }
+
+ public int getNumNodes() {
+ return numNodes;
+ }
+
+ public int getNumPartitions() {
+ return numPartitions;
+ }
+
+ public List<Integer> getDatasetIds() {
+ return datasetIds;
+ }
+
+ public Map<String, Map<String, ILSMComponentId>> getNodeResourceMap() {
+ return nodeResourceMap;
+ }
+
+ @Override
+ public void persist(IOManager ioManager) {
+ try {
+ FileReference fref = ioManager
+ .resolve(Paths.get(StorageConstants.CC_TX_LOG_DIR, String.format("%s.log", jobId)).toString());
+ AtomicTransactionLog txnLog = new AtomicTransactionLog(jobId, datasetIds, nodeResourceMap.keySet(),
+ nodeResourceMap, numPartitions);
+ ioManager.overwrite(fref,
+ OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(txnLog).getBytes());
+ } catch (HyracksDataException | JsonProcessingException e) {
+ throw new ACIDException(e);
+ }
+ }
+
+ @Override
+ public void delete(IOManager ioManager) {
+ try {
+ FileReference fref = ioManager
+ .resolve(Paths.get(StorageConstants.CC_TX_LOG_DIR, String.format("%s.log", jobId)).toString());
+ ioManager.delete(fref);
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String prettyPrint() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n" + jobId + "\n");
+ sb.append("TransactionState: " + status + "\n");
+ return sb.toString();
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTxInfo.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTxInfo.java
new file mode 100644
index 0000000..b99da87
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTxInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class GlobalTxInfo implements Serializable {
+
+ private final int numNodes;
+ private final List<Integer> datasetIds;
+ private final int numPartitions;
+
+ public GlobalTxInfo(List<Integer> datasetIds, int numNodes, int numPartitions) {
+ this.datasetIds = datasetIds;
+ this.numNodes = numNodes;
+ this.numPartitions = numPartitions;
+ }
+
+ public int getNumNodes() {
+ return numNodes;
+ }
+
+ public List<Integer> getDatasetIds() {
+ return datasetIds;
+ }
+
+ public int getNumPartitions() {
+ return numPartitions;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 366b585..136807c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -98,5 +98,9 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index de6b5ff..10db9f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -31,6 +31,11 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IWritable;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
public final class JobId implements IWritable, Serializable, Comparable {
private static final Pattern jobIdPattern = Pattern.compile("^JID:(\\d+)\\.(\\d+)$");
@@ -50,6 +55,7 @@
private JobId() {
}
+ @JsonCreator
public JobId(long id) {
this.id = id;
}
@@ -58,6 +64,7 @@
return id;
}
+ @JsonIgnore
public CcId getCcId() {
if (ccId == null) {
ccId = CcId.valueOf((int) (id >>> CcIdPartitionedLongFactory.ID_BITS));
@@ -65,6 +72,7 @@
return ccId;
}
+ @JsonIgnore
public long getIdOnly() {
return id & CcIdPartitionedLongFactory.MAX_ID;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
index ba932e1..009a5f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
@@ -98,5 +98,9 @@
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
index c3835eb..356c8c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
@@ -18,11 +18,18 @@
*/
package org.apache.hyracks.storage.am.lsm.common.api;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
/**
* Stores the id of the disk component, which is a interval (minId, maxId).
* It is generated by {@link ILSMComponentIdGenerator}
*
*/
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type")
+@JsonSubTypes({ @JsonSubTypes.Type(value = LSMComponentId.class, name = "lsmComponentId"), })
public interface ILSMComponentId {
public enum IdCompareResult {
UNKNOWN,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index b10dd5c..33fd38d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -300,7 +300,6 @@
@Override
public void abort() throws HyracksDataException {
- resetMemoryComponents();
for (ILSMDiskComponent c : temporaryDiskComponents) {
c.deactivateAndDestroy();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
index cf6c4a2..8260dde 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
@@ -19,9 +19,14 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.io.Serializable;
+
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
-public class LSMComponentId implements ILSMComponentId {
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class LSMComponentId implements ILSMComponentId, Serializable {
public static final long NOT_FOUND = -1;
public static final long MIN_VALID_COMPONENT_ID = 0;
@@ -37,7 +42,8 @@
private long maxId;
- public LSMComponentId(long minId, long maxId) {
+ @JsonCreator
+ public LSMComponentId(@JsonProperty("minId") long minId, @JsonProperty("maxId") long maxId) {
assert minId <= maxId;
this.minId = minId;
this.maxId = maxId;