[ASTERIXDB-2281][RT] Consider reserved txn ids when determining max

Change-Id: I88f14fb351976db239ed752693e59882da62d588
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2368
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index d42db39..6e25856 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -513,4 +513,14 @@
     public ICoordinationService getCoordinationService() {
         return NoOpCoordinationService.INSTANCE;
     }
+
+    @Override
+    public long getMaxTxnId() {
+        if (txnSubsystem == null) {
+            throw new IllegalStateException("cannot determine max txn id before txnSubsystem is initialized!");
+        }
+
+        return Math.max(MetadataManager.INSTANCE == null ? 0 : MetadataManager.INSTANCE.getMaxTxnId(),
+                txnSubsystem.getTransactionManager().getMaxTxnId());
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 551e6aa..699892e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -178,7 +178,7 @@
             throws AlgebricksException, IOException {
         return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
                 globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider,
-                new MetadataLockManager(), MetadataManager::getTxnIdBlockFactory);
+                new MetadataLockManager());
     }
 
     protected GlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 7a74940..e4fe4f3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -95,7 +95,13 @@
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Received message: " + absMessage);
         }
-        absMessage.handle(appContext);
+        ncs.getExecutor().submit(() -> {
+            try {
+                absMessage.handle(appContext);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARN, "Could not process message: {}", message, e);
+            }
+        });
     }
 
     public ConcurrentFramePool getMessagingFramePool() {
@@ -105,7 +111,7 @@
     private void sendMessageToChannel(IChannelControlBlock ccb, INcAddressedMessage msg) throws IOException {
         byte[] serializedMsg = JavaSerializationUtils.serialize(msg);
         if (serializedMsg.length > maxMsgSize) {
-            throw new HyracksDataException("Message exceded maximum size");
+            throw new HyracksDataException("Message exceeded maximum size");
         }
         // Prepare the message buffer
         ByteBuffer msgBuffer = messagingFramePool.get();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 82936b3..19b4d61 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -124,4 +124,6 @@
     IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
 
     IReplicaManager getReplicaManager();
+
+    long getMaxTxnId();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
index 3c60432..be4a1f8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
@@ -28,6 +28,8 @@
      */
     TxnId create() throws AlgebricksException;
 
+    long getIdBlock(int blockSize);
+
     /**
      * Ensure that future transaction ids are larger than the supplied id
      *
@@ -35,4 +37,11 @@
      *            the value to ensure future created transaction ids are larger than
      */
     void ensureMinimumId(long id) throws AlgebricksException;
+
+    /**
+     * The highest transaction id this factory has created
+     *
+     * @return the max transaction id
+     */
+    long getMaxTxnId();
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
similarity index 71%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
rename to asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
index 82bbe6b..d44bb13 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
@@ -16,30 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.runtime.utils;
+package org.apache.asterix.metadata;
 
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
 
-import org.apache.asterix.common.transactions.ILongBlockFactory;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.runtime.message.TxnIdBlockRequest;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 /**
  * Represents a factory to generate unique transaction IDs.
  */
-class CcTxnIdFactory implements ITxnIdFactory {
-    private static final int TXN_BLOCK_SIZE = 1024;
+class CachingTxnIdFactory implements ITxnIdFactory {
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private final Supplier<ILongBlockFactory> blockFactorySupplier;
+    private final INcApplicationContext appCtx;
     private volatile Block block = new Block(0, 0);
 
-    public CcTxnIdFactory(Supplier<ILongBlockFactory> blockFactorySupplier) {
-        this.blockFactorySupplier = blockFactorySupplier;
+    public CachingTxnIdFactory(INcApplicationContext appCtx) {
+        this.appCtx = appCtx;
     }
 
     @Override
@@ -50,14 +50,30 @@
             } catch (BlockExhaustedException ex) {
                 // retry
                 LOGGER.info("block exhausted; obtaining new block from supplier");
-                block = new Block(blockFactorySupplier.get().getBlock(TXN_BLOCK_SIZE), TXN_BLOCK_SIZE);
+                TxnIdBlockRequest.Block newBlock;
+                try {
+                    newBlock = TxnIdBlockRequest.send(appCtx);
+                } catch (HyracksDataException e) {
+                    throw new AlgebricksException(e);
+                }
+                block = new Block(newBlock.getStartingId(), newBlock.getBlockSize());
             }
         }
     }
 
     @Override
     public void ensureMinimumId(long id) throws AlgebricksException {
-        blockFactorySupplier.get().ensureMinimum(id);
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getIdBlock(int blockSize) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMaxTxnId() {
+        return block.endExclusive - 1;
     }
 
     static class Block {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index b4b304e..b2757f2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -35,7 +35,7 @@
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.transactions.ILongBlockFactory;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -92,7 +92,7 @@
  * cluster, i.e., metadata transaction ids shall never "accidentally" overlap
  * with transaction ids of regular jobs or other metadata transactions.
  */
-public abstract class MetadataManager implements IMetadataManager, ILongBlockFactory {
+public abstract class MetadataManager implements IMetadataManager {
     private final MetadataCache cache = new MetadataCache();
     protected final Collection<IAsterixStateProxy> proxies;
     protected IMetadataNode metadataNode;
@@ -119,13 +119,19 @@
         this.metadataLatch = new ReentrantReadWriteLock(true);
     }
 
+    protected abstract TxnId createTxnId();
+
     @Override
     public void init() throws HyracksDataException {
         // no op
     }
 
     @Override
-    public abstract MetadataTransactionContext beginTransaction() throws RemoteException, ACIDException;
+    public MetadataTransactionContext beginTransaction() throws RemoteException {
+        TxnId txnId = createTxnId();
+        metadataNode.beginTransaction(txnId);
+        return new MetadataTransactionContext(txnId);
+    }
 
     @Override
     public void commitTransaction(MetadataTransactionContext ctx) throws RemoteException, ACIDException {
@@ -997,34 +1003,6 @@
         rebindMetadataNode = true;
     }
 
-    @Override
-    public void ensureMinimum(long value) throws AlgebricksException {
-        try {
-            metadataNode.ensureMinimumTxnId(value);
-        } catch (RemoteException e) {
-            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
-        }
-    }
-
-    @Override
-    public long getBlock(int blockSize) throws AlgebricksException {
-        try {
-            return metadataNode.reserveTxnIdBlock(blockSize);
-        } catch (RemoteException e) {
-            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
-        }
-    }
-
-    public static ILongBlockFactory getTxnIdBlockFactory() {
-        try {
-            INSTANCE.init();
-        } catch (HyracksDataException e) {
-            throw new IllegalStateException(e);
-        }
-        return (ILongBlockFactory) INSTANCE;
-
-    }
-
     public static void initialize(IAsterixStateProxy proxy, MetadataProperties metadataProperties,
             ICcApplicationContext appCtx) {
         INSTANCE = new CCMetadataManagerImpl(proxy, metadataProperties, appCtx);
@@ -1046,15 +1024,19 @@
         }
 
         @Override
-        public MetadataTransactionContext beginTransaction() throws RemoteException {
+        protected TxnId createTxnId() {
             TxnId txnId;
             try {
                 txnId = appCtx.getTxnIdFactory().create();
             } catch (AlgebricksException e) {
                 throw new ACIDException(e);
             }
-            metadataNode.beginTransaction(txnId);
-            return new MetadataTransactionContext(txnId);
+            return txnId;
+        }
+
+        @Override
+        public long getMaxTxnId() {
+            return appCtx.getTxnIdFactory().getMaxTxnId();
         }
 
         @Override
@@ -1083,15 +1065,25 @@
     }
 
     private static class NCMetadataManagerImpl extends MetadataManager {
+        private final ITxnIdFactory txnIdFactory;
+
         NCMetadataManagerImpl(Collection<IAsterixStateProxy> proxies, MetadataNode metadataNode) {
             super(proxies, metadataNode);
+            txnIdFactory = metadataNode.getTxnIdFactory();
         }
 
         @Override
-        public MetadataTransactionContext beginTransaction() throws RemoteException {
-            TxnId txnId = new TxnId(metadataNode.reserveTxnIdBlock(1));
-            metadataNode.beginTransaction(txnId);
-            return new MetadataTransactionContext(txnId);
+        protected TxnId createTxnId() {
+            try {
+                return txnIdFactory.create();
+            } catch (AlgebricksException e) {
+                throw new ACIDException(e);
+            }
+        }
+
+        @Override
+        public long getMaxTxnId() {
+            return txnIdFactory.getMaxTxnId();
         }
     }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index fd21941..681bae7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -39,6 +39,7 @@
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.transactions.ImmutableDatasetId;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
@@ -132,7 +133,7 @@
     private IDatasetLifecycleManager datasetLifecycleManager;
     private ITransactionSubsystem transactionSubsystem;
     private int metadataStoragePartition;
-    private transient BulkTxnIdFactory txnIdFactory;
+    private transient CachingTxnIdFactory txnIdFactory;
     // core only
     private transient MetadataTupleTranslatorProvider tupleTranslatorProvider;
     // extension only
@@ -159,7 +160,7 @@
                 }
             }
         }
-        this.txnIdFactory = new BulkTxnIdFactory();
+        this.txnIdFactory = new CachingTxnIdFactory(runtimeContext);
     }
 
     public int getMetadataStoragePartition() {
@@ -167,16 +168,6 @@
     }
 
     @Override
-    public void ensureMinimumTxnId(long maxId) throws ACIDException, RemoteException {
-        txnIdFactory.ensureMinimumId(maxId);
-    }
-
-    @Override
-    public long reserveTxnIdBlock(int blockSize) throws ACIDException, RemoteException {
-        return txnIdFactory.reserveIdBlock(blockSize);
-    }
-
-    @Override
     public void beginTransaction(TxnId transactionId) throws ACIDException, RemoteException {
         TransactionOptions options = new TransactionOptions(AtomicityLevel.ATOMIC);
         transactionSubsystem.getTransactionManager().beginTransaction(transactionId, options);
@@ -2009,4 +2000,8 @@
             throw new AlgebricksException(e);
         }
     }
+
+    public ITxnIdFactory getTxnIdFactory() {
+        return txnIdFactory;
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index b2d0d3e..e030db3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -724,4 +724,6 @@
 
     List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, String dataverseName, String feedName)
             throws AlgebricksException;
+
+    long getMaxTxnId();
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index c3f9d7f..f6abc53 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -26,7 +26,6 @@
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.transactions.ITxnIdBlockProvider;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -52,28 +51,7 @@
  * lock/access metadata shall always go through the MetadataManager, and should
  * never call methods on the MetadataNode directly for any reason.
  */
-public interface IMetadataNode extends Remote, Serializable, ITxnIdBlockProvider {
-
-    /**
-     * Allocates a block of transaction ids of specified block size
-     *
-     * @param maxId
-     *            The txn id to ensure future txn ids are larger than
-     * @throws ACIDException
-     * @throws RemoteException
-     */
-    void ensureMinimumTxnId(long maxId) throws ACIDException, RemoteException;
-
-    /**
-     * Allocates a block of transaction ids of specified block size
-     *
-     * @param blockSize
-     *            The size of the transaction id block to reserve
-     * @return the start of the reserved block
-     * @throws ACIDException
-     * @throws RemoteException
-     */
-    long reserveTxnIdBlock(int blockSize) throws ACIDException, RemoteException;
+public interface IMetadataNode extends Remote, Serializable {
 
     /**
      * Begins a local transaction against the metadata.
@@ -828,5 +806,4 @@
 
     List<FeedConnection> getFeedConnections(TxnId txnId, String dataverseName, String feedName)
             throws AlgebricksException, RemoteException;
-
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
index db2a044..fe9a5b8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
@@ -65,7 +65,7 @@
         INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
         long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
-        long maxTxnId = appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId();
+        long maxTxnId = appContext.getMaxTxnId();
         long maxJobId = ncs.getMaxJobId(ccId);
         ReportLocalCountersMessage countersMessage =
                 new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId, maxJobId);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index a2f4aa1..3e172c7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -39,20 +39,20 @@
     public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
         try {
             ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
-            ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
+            ResourceIdRequestResponseMessage response = new ResourceIdRequestResponseMessage();
             IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
             if (!clusterStateManager.isClusterActive()) {
-                reponse.setResourceId(-1);
-                reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
+                response.setResourceId(-1);
+                response.setException(new Exception("Cannot generate global resource id when cluster is not active."));
             } else {
                 IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
-                reponse.setResourceId(resourceIdManager.createResourceId());
-                if (reponse.getResourceId() < 0) {
-                    reponse.setException(new Exception("One or more nodes has not reported max resource id."));
+                response.setResourceId(resourceIdManager.createResourceId());
+                if (response.getResourceId() < 0) {
+                    response.setException(new Exception("One or more nodes has not reported max resource id."));
                 }
                 requestMaxResourceID(clusterStateManager, resourceIdManager, broker);
             }
-            broker.sendApplicationMessageToNC(reponse, src);
+            broker.sendApplicationMessageToNC(response, src);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
new file mode 100644
index 0000000..b8578ec
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class TxnIdBlockRequest implements ICcAddressedMessage {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int BLOCK_SIZE = 100;
+    private static final long serialVersionUID = 1L;
+
+    private static BlockingQueue<TxnIdBlockResponse> blockQueue = new LinkedBlockingQueue<>();
+    private final String nodeId;
+    private final int blockSizeRequested;
+
+    public TxnIdBlockRequest(String nodeId, int blockSizeRequested) {
+        this.nodeId = nodeId;
+        this.blockSizeRequested = blockSizeRequested;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException {
+        try {
+            ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+            long startingId = appCtx.getTxnIdFactory().getIdBlock(blockSizeRequested);
+            TxnIdBlockResponse response = new TxnIdBlockResponse(startingId, blockSizeRequested);
+            broker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static Block send(INcApplicationContext ncs) throws HyracksDataException {
+        TxnIdBlockRequest blockRequestMessage = new TxnIdBlockRequest(ncs.getServiceContext().getNodeId(), BLOCK_SIZE);
+        try {
+            ((INCMessageBroker) ncs.getServiceContext().getMessageBroker()).sendMessageToPrimaryCC(blockRequestMessage);
+            TxnIdBlockResponse response = blockQueue.take();
+            return new Block(response.getStartingId(), response.getBlockSize());
+        } catch (Exception e) {
+            LOGGER.log(Level.ERROR, "Unable to request transaction id block", e);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    static void addResponse(TxnIdBlockResponse response) {
+        blockQueue.offer(response);
+    }
+
+    @Override
+    public String toString() {
+        return TxnIdBlockRequest.class.getSimpleName();
+    }
+
+    public static class Block {
+
+        private final long startingId;
+        private final int blockSize;
+
+        public Block(long startingId, int blockSize) {
+            this.startingId = startingId;
+            this.blockSize = blockSize;
+        }
+
+        public long getStartingId() {
+            return startingId;
+        }
+
+        public int getBlockSize() {
+            return blockSize;
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java
new file mode 100644
index 0000000..46d742a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockResponse.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TxnIdBlockResponse implements INcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+    private final long startingId;
+    private final int blockSize;
+
+    public TxnIdBlockResponse(long startingId, int blockSize) {
+        this.startingId = startingId;
+        this.blockSize = blockSize;
+    }
+
+    public long getStartingId() {
+        return startingId;
+    }
+
+    public int getBlockSize() {
+        return blockSize;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        TxnIdBlockRequest.addResponse(this);
+    }
+
+    @Override
+    public String toString() {
+        return TxnIdBlockResponse.class.getSimpleName();
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
similarity index 88%
rename from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
index 8ac6b63..542bc17 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.asterix.metadata;
+package org.apache.asterix.runtime.utils;
 
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -33,7 +33,8 @@
         return new TxnId(maxId.incrementAndGet());
     }
 
-    public long reserveIdBlock(int blockSize) {
+    @Override
+    public long getIdBlock(int blockSize) {
         if (blockSize < 1) {
             throw new IllegalArgumentException("block size cannot be smaller than 1, but was " + blockSize);
         }
@@ -44,4 +45,9 @@
     public void ensureMinimumId(long id) {
         this.maxId.getAndUpdate(next -> Math.max(next, id));
     }
+
+    @Override
+    public long getMaxTxnId() {
+        return maxId.get();
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index b83df6c..4157e16 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -24,7 +24,6 @@
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.api.INodeJobTracker;
-import org.apache.asterix.common.transactions.ILongBlockFactory;
 import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
@@ -93,8 +92,7 @@
             ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
             IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy,
             IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider,
-            IMetadataLockManager mdLockManager, Supplier<ILongBlockFactory> txnIdBlockSupplier)
-            throws AlgebricksException, IOException {
+            IMetadataLockManager mdLockManager) throws AlgebricksException, IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
         this.libraryManager = libraryManager;
@@ -122,7 +120,8 @@
         clusterStateManager.setCcAppCtx(this);
         this.resourceIdManager = new ResourceIdManager(clusterStateManager);
         nodeJobTracker = new NodeJobTracker();
-        txnIdFactory = new CcTxnIdFactory(txnIdBlockSupplier);
+        txnIdFactory = new BulkTxnIdFactory();
+
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 58b4b27..d0e4655 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -22,9 +22,6 @@
 import java.io.Serializable;
 
 import org.apache.hyracks.api.util.ErrorMessageUtil;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 /**
  * The main execution time exception type for runtime errors in a hyracks environment
@@ -32,7 +29,6 @@
 public class HyracksDataException extends HyracksException {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = LogManager.getLogger();
 
     public static HyracksDataException create(Throwable cause) {
         if (cause instanceof HyracksDataException || cause == null) {
@@ -40,11 +36,8 @@
         } else if (cause instanceof Error) {
             // don't wrap errors, allow them to propagate
             throw (Error) cause;
-        } else if (cause instanceof InterruptedException && !Thread.currentThread().isInterrupted()) {
-            // TODO(mblow): why not force interrupt on current thread?
-            LOGGER.log(Level.WARN,
-                    "Wrapping an InterruptedException in HyracksDataException and current thread is not interrupted",
-                    cause);
+        } else if (cause instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
         }
         return new HyracksDataException(cause);
     }
@@ -65,10 +58,8 @@
             // don't suppress errors into a HyracksDataException, allow them to propagate
             th.addSuppressed(root);
             throw (Error) th;
-        } else if (th instanceof InterruptedException && !Thread.currentThread().isInterrupted()) {
-            // TODO(mblow): why not force interrupt on current thread?
-            LOGGER.log(Level.WARN, "Suppressing an InterruptedException in a HyracksDataException and current "
-                    + "thread is not interrupted", th);
+        } else if (th instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
         }
         root.addSuppressed(th);
         return root;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index a95ae3d..5e3c3d4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -21,7 +21,6 @@
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 5c6d078..3d505f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1314,6 +1314,7 @@
     }
 
     public static class ShutdownResponseFunction extends Function {
+        private static final long serialVersionUID = 1L;
 
         private final String nodeId;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ae40ea3..027316e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -21,7 +21,6 @@
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;