[ASTERIXDB-2281][RT] Consider reserved txn ids when determining max
Change-Id: I88f14fb351976db239ed752693e59882da62d588
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2368
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index d42db39..6e25856 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -513,4 +513,14 @@
public ICoordinationService getCoordinationService() {
return NoOpCoordinationService.INSTANCE;
}
+
+ @Override
+ public long getMaxTxnId() {
+ if (txnSubsystem == null) {
+ throw new IllegalStateException("cannot determine max txn id before txnSubsystem is initialized!");
+ }
+
+ return Math.max(MetadataManager.INSTANCE == null ? 0 : MetadataManager.INSTANCE.getMaxTxnId(),
+ txnSubsystem.getTransactionManager().getMaxTxnId());
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 551e6aa..699892e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -178,7 +178,7 @@
throws AlgebricksException, IOException {
return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider,
- new MetadataLockManager(), MetadataManager::getTxnIdBlockFactory);
+ new MetadataLockManager());
}
protected GlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 7a74940..e4fe4f3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -95,7 +95,13 @@
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Received message: " + absMessage);
}
- absMessage.handle(appContext);
+ ncs.getExecutor().submit(() -> {
+ try {
+ absMessage.handle(appContext);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Could not process message: {}", message, e);
+ }
+ });
}
public ConcurrentFramePool getMessagingFramePool() {
@@ -105,7 +111,7 @@
private void sendMessageToChannel(IChannelControlBlock ccb, INcAddressedMessage msg) throws IOException {
byte[] serializedMsg = JavaSerializationUtils.serialize(msg);
if (serializedMsg.length > maxMsgSize) {
- throw new HyracksDataException("Message exceded maximum size");
+ throw new HyracksDataException("Message exceeded maximum size");
}
// Prepare the message buffer
ByteBuffer msgBuffer = messagingFramePool.get();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 82936b3..19b4d61 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -124,4 +124,6 @@
IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
IReplicaManager getReplicaManager();
+
+ long getMaxTxnId();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
index 3c60432..be4a1f8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
@@ -28,6 +28,8 @@
*/
TxnId create() throws AlgebricksException;
+ long getIdBlock(int blockSize);
+
/**
* Ensure that future transaction ids are larger than the supplied id
*
@@ -35,4 +37,11 @@
* the value to ensure future created transaction ids are larger than
*/
void ensureMinimumId(long id) throws AlgebricksException;
+
+ /**
+ * The highest transaction id this factory has created
+ *
+ * @return the max transaction id
+ */
+ long getMaxTxnId();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
similarity index 71%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
rename to asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
index 82bbe6b..d44bb13 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
@@ -16,30 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.utils;
+package org.apache.asterix.metadata;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-import org.apache.asterix.common.transactions.ILongBlockFactory;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.runtime.message.TxnIdBlockRequest;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* Represents a factory to generate unique transaction IDs.
*/
-class CcTxnIdFactory implements ITxnIdFactory {
- private static final int TXN_BLOCK_SIZE = 1024;
+class CachingTxnIdFactory implements ITxnIdFactory {
private static final Logger LOGGER = LogManager.getLogger();
- private final Supplier<ILongBlockFactory> blockFactorySupplier;
+ private final INcApplicationContext appCtx;
private volatile Block block = new Block(0, 0);
- public CcTxnIdFactory(Supplier<ILongBlockFactory> blockFactorySupplier) {
- this.blockFactorySupplier = blockFactorySupplier;
+ public CachingTxnIdFactory(INcApplicationContext appCtx) {
+ this.appCtx = appCtx;
}
@Override
@@ -50,14 +50,30 @@
} catch (BlockExhaustedException ex) {
// retry
LOGGER.info("block exhausted; obtaining new block from supplier");
- block = new Block(blockFactorySupplier.get().getBlock(TXN_BLOCK_SIZE), TXN_BLOCK_SIZE);
+ TxnIdBlockRequest.Block newBlock;
+ try {
+ newBlock = TxnIdBlockRequest.send(appCtx);
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
+ block = new Block(newBlock.getStartingId(), newBlock.getBlockSize());
}
}
}
@Override
public void ensureMinimumId(long id) throws AlgebricksException {
- blockFactorySupplier.get().ensureMinimum(id);
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getIdBlock(int blockSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMaxTxnId() {
+ return block.endExclusive - 1;
}
static class Block {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index b4b304e..b2757f2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -35,7 +35,7 @@
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.transactions.ILongBlockFactory;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -92,7 +92,7 @@
* cluster, i.e., metadata transaction ids shall never "accidentally" overlap
* with transaction ids of regular jobs or other metadata transactions.
*/
-public abstract class MetadataManager implements IMetadataManager, ILongBlockFactory {
+public abstract class MetadataManager implements IMetadataManager {
private final MetadataCache cache = new MetadataCache();
protected final Collection<IAsterixStateProxy> proxies;
protected IMetadataNode metadataNode;
@@ -119,13 +119,19 @@
this.metadataLatch = new ReentrantReadWriteLock(true);
}
+ protected abstract TxnId createTxnId();
+
@Override
public void init() throws HyracksDataException {
// no op
}
@Override
- public abstract MetadataTransactionContext beginTransaction() throws RemoteException, ACIDException;
+ public MetadataTransactionContext beginTransaction() throws RemoteException {
+ TxnId txnId = createTxnId();
+ metadataNode.beginTransaction(txnId);
+ return new MetadataTransactionContext(txnId);
+ }
@Override
public void commitTransaction(MetadataTransactionContext ctx) throws RemoteException, ACIDException {
@@ -997,34 +1003,6 @@
rebindMetadataNode = true;
}
- @Override
- public void ensureMinimum(long value) throws AlgebricksException {
- try {
- metadataNode.ensureMinimumTxnId(value);
- } catch (RemoteException e) {
- throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
- }
- }
-
- @Override
- public long getBlock(int blockSize) throws AlgebricksException {
- try {
- return metadataNode.reserveTxnIdBlock(blockSize);
- } catch (RemoteException e) {
- throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
- }
- }
-
- public static ILongBlockFactory getTxnIdBlockFactory() {
- try {
- INSTANCE.init();
- } catch (HyracksDataException e) {
- throw new IllegalStateException(e);
- }
- return (ILongBlockFactory) INSTANCE;
-
- }
-
public static void initialize(IAsterixStateProxy proxy, MetadataProperties metadataProperties,
ICcApplicationContext appCtx) {
INSTANCE = new CCMetadataManagerImpl(proxy, metadataProperties, appCtx);
@@ -1046,15 +1024,19 @@
}
@Override
- public MetadataTransactionContext beginTransaction() throws RemoteException {
+ protected TxnId createTxnId() {
TxnId txnId;
try {
txnId = appCtx.getTxnIdFactory().create();
} catch (AlgebricksException e) {
throw new ACIDException(e);
}
- metadataNode.beginTransaction(txnId);
- return new MetadataTransactionContext(txnId);
+ return txnId;
+ }
+
+ @Override
+ public long getMaxTxnId() {
+ return appCtx.getTxnIdFactory().getMaxTxnId();
}
@Override
@@ -1083,15 +1065,25 @@
}
private static class NCMetadataManagerImpl extends MetadataManager {
+ private final ITxnIdFactory txnIdFactory;
+
NCMetadataManagerImpl(Collection<IAsterixStateProxy> proxies, MetadataNode metadataNode) {
super(proxies, metadataNode);
+ txnIdFactory = metadataNode.getTxnIdFactory();
}
@Override
- public MetadataTransactionContext beginTransaction() throws RemoteException {
- TxnId txnId = new TxnId(metadataNode.reserveTxnIdBlock(1));
- metadataNode.beginTransaction(txnId);
- return new MetadataTransactionContext(txnId);
+ protected TxnId createTxnId() {
+ try {
+ return txnIdFactory.create();
+ } catch (AlgebricksException e) {
+ throw new ACIDException(e);
+ }
+ }
+
+ @Override
+ public long getMaxTxnId() {
+ return txnIdFactory.getMaxTxnId();
}
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index fd21941..681bae7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -39,6 +39,7 @@
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.transactions.ImmutableDatasetId;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
@@ -132,7 +133,7 @@
private IDatasetLifecycleManager datasetLifecycleManager;
private ITransactionSubsystem transactionSubsystem;
private int metadataStoragePartition;
- private transient BulkTxnIdFactory txnIdFactory;
+ private transient CachingTxnIdFactory txnIdFactory;
// core only
private transient MetadataTupleTranslatorProvider tupleTranslatorProvider;
// extension only
@@ -159,7 +160,7 @@
}
}
}
- this.txnIdFactory = new BulkTxnIdFactory();
+ this.txnIdFactory = new CachingTxnIdFactory(runtimeContext);
}
public int getMetadataStoragePartition() {
@@ -167,16 +168,6 @@
}
@Override
- public void ensureMinimumTxnId(long maxId) throws ACIDException, RemoteException {
- txnIdFactory.ensureMinimumId(maxId);
- }
-
- @Override
- public long reserveTxnIdBlock(int blockSize) throws ACIDException, RemoteException {
- return txnIdFactory.reserveIdBlock(blockSize);
- }
-
- @Override
public void beginTransaction(TxnId transactionId) throws ACIDException, RemoteException {
TransactionOptions options = new TransactionOptions(AtomicityLevel.ATOMIC);
transactionSubsystem.getTransactionManager().beginTransaction(transactionId, options);
@@ -2009,4 +2000,8 @@
throw new AlgebricksException(e);
}
}
+
+ public ITxnIdFactory getTxnIdFactory() {
+ return txnIdFactory;
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index b2d0d3e..e030db3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -724,4 +724,6 @@
List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, String dataverseName, String feedName)
throws AlgebricksException;
+
+ long getMaxTxnId();
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index c3f9d7f..f6abc53 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -26,7 +26,6 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.transactions.ITxnIdBlockProvider;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -52,28 +51,7 @@
* lock/access metadata shall always go through the MetadataManager, and should
* never call methods on the MetadataNode directly for any reason.
*/
-public interface IMetadataNode extends Remote, Serializable, ITxnIdBlockProvider {
-
- /**
- * Allocates a block of transaction ids of specified block size
- *
- * @param maxId
- * The txn id to ensure future txn ids are larger than
- * @throws ACIDException
- * @throws RemoteException
- */
- void ensureMinimumTxnId(long maxId) throws ACIDException, RemoteException;
-
- /**
- * Allocates a block of transaction ids of specified block size
- *
- * @param blockSize
- * The size of the transaction id block to reserve
- * @return the start of the reserved block
- * @throws ACIDException
- * @throws RemoteException
- */
- long reserveTxnIdBlock(int blockSize) throws ACIDException, RemoteException;
+public interface IMetadataNode extends Remote, Serializable {
/**
* Begins a local transaction against the metadata.
@@ -828,5 +806,4 @@
List<FeedConnection> getFeedConnections(TxnId txnId, String dataverseName, String feedName)
throws AlgebricksException, RemoteException;
-
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
index db2a044..fe9a5b8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
@@ -65,7 +65,7 @@
INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
- long maxTxnId = appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId();
+ long maxTxnId = appContext.getMaxTxnId();
long maxJobId = ncs.getMaxJobId(ccId);
ReportLocalCountersMessage countersMessage =
new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId, maxJobId);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index a2f4aa1..3e172c7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -39,20 +39,20 @@
public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
try {
ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
- ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
+ ResourceIdRequestResponseMessage response = new ResourceIdRequestResponseMessage();
IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
if (!clusterStateManager.isClusterActive()) {
- reponse.setResourceId(-1);
- reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
+ response.setResourceId(-1);
+ response.setException(new Exception("Cannot generate global resource id when cluster is not active."));
} else {
IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
- reponse.setResourceId(resourceIdManager.createResourceId());
- if (reponse.getResourceId() < 0) {
- reponse.setException(new Exception("One or more nodes has not reported max resource id."));
+ response.setResourceId(resourceIdManager.createResourceId());
+ if (response.getResourceId() < 0) {
+ response.setException(new Exception("One or more nodes has not reported max resource id."));
}
requestMaxResourceID(clusterStateManager, resourceIdManager, broker);
}
- broker.sendApplicationMessageToNC(reponse, src);
+ broker.sendApplicationMessageToNC(response, src);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
new file mode 100644
index 0000000..b8578ec
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class TxnIdBlockRequest implements ICcAddressedMessage {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final int BLOCK_SIZE = 100;
+ private static final long serialVersionUID = 1L;
+
+ private static BlockingQueue<TxnIdBlockResponse> blockQueue = new LinkedBlockingQueue<>();
+ private final String nodeId;
+ private final int blockSizeRequested;
+
+ public TxnIdBlockRequest(String nodeId, int blockSizeRequested) {
+ this.nodeId = nodeId;
+ this.blockSizeRequested = blockSizeRequested;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException {
+ try {
+ ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ long startingId = appCtx.getTxnIdFactory().getIdBlock(blockSizeRequested);
+ TxnIdBlockResponse response = new TxnIdBlockResponse(startingId, blockSizeRequested);
+ broker.sendApplicationMessageToNC(response, nodeId);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public static Block send(INcApplicationContext ncs) throws HyracksDataException {
+ TxnIdBlockRequest blockRequestMessage = new TxnIdBlockRequest(ncs.getServiceContext().getNodeId(), BLOCK_SIZE);
+ try {
+ ((INCMessageBroker) ncs.getServiceContext().getMessageBroker()).sendMessageToPrimaryCC(blockRequestMessage);
+ TxnIdBlockResponse response = blockQueue.take();
+ return new Block(response.getStartingId(), response.getBlockSize());
+ } catch (Exception e) {
+ LOGGER.log(Level.ERROR, "Unable to request transaction id block", e);
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ static void addResponse(TxnIdBlockResponse response) {
+ blockQueue.offer(response);
+ }
+
+ @Override
+ public String toString() {
+ return TxnIdBlockRequest.class.getSimpleName();
+ }
+
+ public static class Block {
+
+ private final long startingId;
+ private final int blockSize;
+
+ public Block(long startingId, int blockSize) {
+ this.startingId = startingId;
+ this.blockSize = blockSize;
+ }
+
+ public long getStartingId() {
+ return startingId;
+ }
+
+ public int getBlockSize() {
+ return blockSize;
+ }
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java
new file mode 100644
index 0000000..46d742a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TxnIdBlockResponse implements INcAddressedMessage {
+ private static final long serialVersionUID = 1L;
+ private final long startingId;
+ private final int blockSize;
+
+ public TxnIdBlockResponse(long startingId, int blockSize) {
+ this.startingId = startingId;
+ this.blockSize = blockSize;
+ }
+
+ public long getStartingId() {
+ return startingId;
+ }
+
+ public int getBlockSize() {
+ return blockSize;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ TxnIdBlockRequest.addResponse(this);
+ }
+
+ @Override
+ public String toString() {
+ return TxnIdBlockResponse.class.getSimpleName();
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
similarity index 88%
rename from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
index 8ac6b63..542bc17 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.asterix.metadata;
+package org.apache.asterix.runtime.utils;
import java.util.concurrent.atomic.AtomicLong;
@@ -33,7 +33,8 @@
return new TxnId(maxId.incrementAndGet());
}
- public long reserveIdBlock(int blockSize) {
+ @Override
+ public long getIdBlock(int blockSize) {
if (blockSize < 1) {
throw new IllegalArgumentException("block size cannot be smaller than 1, but was " + blockSize);
}
@@ -44,4 +45,9 @@
public void ensureMinimumId(long id) {
this.maxId.getAndUpdate(next -> Math.max(next, id));
}
+
+ @Override
+ public long getMaxTxnId() {
+ return maxId.get();
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index b83df6c..4157e16 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -24,7 +24,6 @@
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.api.INodeJobTracker;
-import org.apache.asterix.common.transactions.ILongBlockFactory;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
@@ -93,8 +92,7 @@
ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy,
IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider,
- IMetadataLockManager mdLockManager, Supplier<ILongBlockFactory> txnIdBlockSupplier)
- throws AlgebricksException, IOException {
+ IMetadataLockManager mdLockManager) throws AlgebricksException, IOException {
this.ccServiceCtx = ccServiceCtx;
this.hcc = hcc;
this.libraryManager = libraryManager;
@@ -122,7 +120,8 @@
clusterStateManager.setCcAppCtx(this);
this.resourceIdManager = new ResourceIdManager(clusterStateManager);
nodeJobTracker = new NodeJobTracker();
- txnIdFactory = new CcTxnIdFactory(txnIdBlockSupplier);
+ txnIdFactory = new BulkTxnIdFactory();
+
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 58b4b27..d0e4655 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -22,9 +22,6 @@
import java.io.Serializable;
import org.apache.hyracks.api.util.ErrorMessageUtil;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
/**
* The main execution time exception type for runtime errors in a hyracks environment
@@ -32,7 +29,6 @@
public class HyracksDataException extends HyracksException {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = LogManager.getLogger();
public static HyracksDataException create(Throwable cause) {
if (cause instanceof HyracksDataException || cause == null) {
@@ -40,11 +36,8 @@
} else if (cause instanceof Error) {
// don't wrap errors, allow them to propagate
throw (Error) cause;
- } else if (cause instanceof InterruptedException && !Thread.currentThread().isInterrupted()) {
- // TODO(mblow): why not force interrupt on current thread?
- LOGGER.log(Level.WARN,
- "Wrapping an InterruptedException in HyracksDataException and current thread is not interrupted",
- cause);
+ } else if (cause instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
}
return new HyracksDataException(cause);
}
@@ -65,10 +58,8 @@
// don't suppress errors into a HyracksDataException, allow them to propagate
th.addSuppressed(root);
throw (Error) th;
- } else if (th instanceof InterruptedException && !Thread.currentThread().isInterrupted()) {
- // TODO(mblow): why not force interrupt on current thread?
- LOGGER.log(Level.WARN, "Suppressing an InterruptedException in a HyracksDataException and current "
- + "thread is not interrupted", th);
+ } else if (th instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
}
root.addSuppressed(th);
return root;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index a95ae3d..5e3c3d4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -21,7 +21,6 @@
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 5c6d078..3d505f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1314,6 +1314,7 @@
}
public static class ShutdownResponseFunction extends Function {
+ private static final long serialVersionUID = 1L;
private final String nodeId;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ae40ea3..027316e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -21,7 +21,6 @@
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;