[NO ISSUE][*DB][RT] Obtain ResourceIds in blocks, to reduce roundtrips between NC and CC

Change-Id: I22840013b1f03255dfb487217bbbb75db420c42d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10125
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
index d36d383..1301dea 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
@@ -20,10 +20,19 @@
 
 public interface IResourceIdManager {
 
+    /**
+     * @return the created resource id, or <code>-1</code> if a resource cannot be created
+     */
     long createResourceId();
 
     boolean reported(String nodeId);
 
     void report(String nodeId, long maxResourceId);
 
+    /**
+     * @param blockSize the size of resource id block to create
+     * @return the starting id of contiguous block of resource ids, or <code>-1</code> if
+     *         the resource block cannot be created
+     */
+    long createResourceIdBlock(int blockSize);
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index fbfca55..6198acc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -26,11 +26,13 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ResourceIdRequestMessage implements ICcAddressedMessage {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final String src;
+    private final int blockSize;
 
-    public ResourceIdRequestMessage(String src) {
+    public ResourceIdRequestMessage(String src, int blockSize) {
         this.src = src;
+        this.blockSize = blockSize;
     }
 
     @Override
@@ -40,11 +42,11 @@
             ResourceIdRequestResponseMessage response = new ResourceIdRequestResponseMessage();
             IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
             IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
-            response.setResourceId(resourceIdManager.createResourceId());
+            response.setResourceIdBlock(resourceIdManager.createResourceIdBlock(blockSize), blockSize);
             if (response.getResourceId() < 0) {
                 if (!(clusterStateManager.isClusterActive())) {
                     response.setException(
-                            new Exception("Cannot generate global resource id when cluster is not active."));
+                            new Exception("Cannot generate global resource id(s) when cluster is not active."));
                 } else {
                     response.setException(new Exception("One or more nodes has not reported max resource id."));
                 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
index 6a9ed35..05e6b12 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -24,19 +24,30 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ResourceIdRequestResponseMessage implements INcAddressedMessage {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private long resourceId;
+    private int blockSize = 1;
+
     private Exception exception;
 
     public long getResourceId() {
         return resourceId;
     }
 
+    public int getBlockSize() {
+        return blockSize;
+    }
+
     public void setResourceId(long resourceId) {
         this.resourceId = resourceId;
     }
 
+    public void setResourceIdBlock(long resourceId, int blockSize) {
+        this.resourceId = resourceId;
+        this.blockSize = blockSize;
+    }
+
     public Exception getException() {
         return exception;
     }
@@ -52,6 +63,7 @@
 
     @Override
     public String toString() {
-        return ResourceIdRequestResponseMessage.class.getSimpleName();
+        return "ResourceIdRequestResponseMessage{" + "resourceId=" + resourceId + ", blockSize=" + blockSize
+                + ", exception=" + exception + '}';
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 78b1f17..2bd4f81 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.runtime.transaction;
 
+import java.util.NoSuchElementException;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
@@ -26,6 +27,12 @@
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
+import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
+import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
 
 /**
  * A resource id factory that generates unique resource ids across all NCs by requesting
@@ -33,7 +40,11 @@
  */
 public class GlobalResourceIdFactory implements IResourceIdFactory {
 
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int RESOURCE_ID_BLOCK_SIZE = 25;
     private final INCServiceContext serviceCtx;
+    private final LongPriorityQueue resourceIds =
+            LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_BLOCK_SIZE));
     private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
     private final String nodeId;
 
@@ -44,33 +55,51 @@
     }
 
     public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException {
+        LOGGER.debug("rec'd block of ids: {}", resourceIdResponse);
         resourceIdResponseQ.put(resourceIdResponse);
     }
 
     @Override
     public long createId() throws HyracksDataException {
         try {
-            ResourceIdRequestResponseMessage reponse = null;
-            //if there already exists a response, use it
-            if (!resourceIdResponseQ.isEmpty()) {
-                synchronized (resourceIdResponseQ) {
-                    if (!resourceIdResponseQ.isEmpty()) {
-                        reponse = resourceIdResponseQ.take();
+            final long resourceId = resourceIds.dequeueLong();
+            if (resourceIds.isEmpty()) {
+                serviceCtx.getControllerService().getExecutor().submit(() -> {
+                    try {
+                        requestNewBlock();
+                    } catch (Exception e) {
+                        LOGGER.warn("failed on preemptive block request", e);
                     }
-                }
+                });
             }
-            //if no response available or it has an exception, request a new one
-            if (reponse == null || reponse.getException() != null) {
-                ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
-                ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
-                reponse = resourceIdResponseQ.take();
-                if (reponse.getException() != null) {
-                    throw HyracksDataException.create(reponse.getException());
-                }
+            return resourceId;
+        } catch (NoSuchElementException e) {
+            // fallthrough
+        }
+        try {
+            // if there already exists a response, use it
+            ResourceIdRequestResponseMessage response = resourceIdResponseQ.poll();
+            if (response == null) {
+                requestNewBlock();
+                response = resourceIdResponseQ.take();
             }
-            return reponse.getResourceId();
+            if (response.getException() != null) {
+                throw HyracksDataException.create(response.getException());
+            }
+            // take the first id, queue the rest
+            final long startingId = response.getResourceId();
+            for (int i = 1; i < response.getBlockSize(); i++) {
+                resourceIds.enqueue(startingId + i);
+            }
+            return startingId;
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
     }
+
+    protected void requestNewBlock() throws Exception {
+        // queue is empty; request a new block
+        ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, RESOURCE_ID_BLOCK_SIZE);
+        ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 5bcd5aa..8b4fd68 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -37,8 +37,11 @@
 
     @Override
     public long createResourceId() {
-        return csm.isClusterActive() || reportedNodes.containsAll(csm.getParticipantNodes(true))
-                ? globalResourceId.incrementAndGet() : -1;
+        return readyState() ? globalResourceId.incrementAndGet() : -1;
+    }
+
+    protected boolean readyState() {
+        return csm.isClusterActive() || reportedNodes.containsAll(csm.getParticipantNodes(true));
     }
 
     @Override
@@ -51,4 +54,9 @@
         globalResourceId.updateAndGet(prev -> Math.max(maxResourceId, prev));
         reportedNodes.add(nodeId);
     }
+
+    @Override
+    public long createResourceIdBlock(int blockSize) {
+        return readyState() ? globalResourceId.getAndAdd(blockSize) + 1 : -1;
+    }
 }