[ASTERIXDB-3346][MTD] Fix GlobalResourceIdFactory race cond.

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Creating a new ID can wait in indefinitely due to
a race condition (see ASTERIXDB-3346) for more details.

Change-Id: Ic7ff15abbc70277a9f1ae340314335253aa23308
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18144
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 30877d9..8682937 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -18,9 +18,12 @@
  */
 package org.apache.asterix.runtime.transaction;
 
-import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
 import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
@@ -32,7 +35,6 @@
 
 import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
 import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
-import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
 
 /**
  * A resource id factory that generates unique resource ids across all NCs by requesting
@@ -41,23 +43,44 @@
 public class GlobalResourceIdFactory implements IResourceIdFactory {
 
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final long INVALID_ID = -1L;
+    /**
+     * Maximum number of attempts to request a new block of IDs
+     */
+    private static final int MAX_NUMBER_OF_ATTEMPTS = 3;
+    /**
+     * Time threshold to consider a block request was lost
+     */
+    private static final long WAIT_FOR_REQUEST_TIME_THRESHOLD_NS = TimeUnit.SECONDS.toNanos(2);
+    /**
+     * Wait time by threads waiting for the response with the new block
+     */
+    private static final long WAIT_FOR_BLOCK_ID_TIME_MS = TimeUnit.SECONDS.toMillis(2);
     private final INCServiceContext serviceCtx;
     private final LongPriorityQueue resourceIds;
-    private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
     private final String nodeId;
     private final int initialBlockSize;
     private final int maxBlockSize;
+    /**
+     * Current number of failed block requests
+     */
+    private final AtomicInteger numberOfFailedRequests;
+    /**
+     * Last time a request of a block is initiated
+     */
+    private final AtomicLong requestTime;
     private int currentBlockSize;
     private volatile boolean reset = false;
 
     public GlobalResourceIdFactory(INCServiceContext serviceCtx, int initialBlockSize) {
         this.serviceCtx = serviceCtx;
-        this.resourceIdResponseQ = new LinkedBlockingQueue<>();
         this.nodeId = serviceCtx.getNodeId();
         this.initialBlockSize = initialBlockSize;
         maxBlockSize = initialBlockSize * 2;
         currentBlockSize = initialBlockSize;
-        resourceIds = LongPriorityQueues.synchronize(new LongArrayFIFOQueue(initialBlockSize));
+        resourceIds = new LongArrayFIFOQueue(initialBlockSize);
+        numberOfFailedRequests = new AtomicInteger();
+        requestTime = new AtomicLong();
     }
 
     public synchronized void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse)
@@ -70,52 +93,23 @@
                     currentBlockSize);
             return;
         }
-        resourceIdResponseQ.put(resourceIdResponse);
+        populateIDs(resourceIdResponse);
     }
 
     @Override
     public long createId() throws HyracksDataException {
-        synchronized (resourceIds) {
-            if (reset) {
-                resourceIds.clear();
-                resourceIdResponseQ.clear();
-                reset = false;
-            }
+        // Rest IDs if requested to reset
+        resetIDsIfNeeded();
+        // Get a new ID if possible or request a new block
+        long id = getID();
+        while (id == INVALID_ID) {
+            // All IDs in the previous block were consumed, wait for the new block
+            waitForID();
+            // Retry getting a new ID again
+            id = getID();
         }
-        try {
-            final long resourceId = resourceIds.dequeueLong();
-            if (resourceIds.isEmpty()) {
-                serviceCtx.getControllerService().getExecutor().submit(() -> {
-                    try {
-                        requestNewBlock();
-                    } catch (Exception e) {
-                        LOGGER.warn("failed on preemptive block request", e);
-                    }
-                });
-            }
-            return resourceId;
-        } catch (NoSuchElementException e) {
-            // fallthrough
-        }
-        try {
-            // if there already exists a response, use it
-            ResourceIdRequestResponseMessage response = resourceIdResponseQ.poll();
-            if (response == null) {
-                requestNewBlock();
-                response = resourceIdResponseQ.take();
-            }
-            if (response.getException() != null) {
-                throw HyracksDataException.create(response.getException());
-            }
-            // take the first id, queue the rest
-            final long startingId = response.getResourceId();
-            for (int i = 1; i < response.getBlockSize(); i++) {
-                resourceIds.enqueue(startingId + i);
-            }
-            return startingId;
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
+
+        return id;
     }
 
     @Override
@@ -128,9 +122,106 @@
         LOGGER.debug("current resource ids block size: {}", currentBlockSize);
     }
 
-    protected synchronized void requestNewBlock() throws Exception {
-        // queue is empty; request a new block
-        ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize);
-        ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+    private void populateIDs(ResourceIdRequestResponseMessage response) {
+        synchronized (resourceIds) {
+            long startingId = response.getResourceId();
+            for (int i = 0; i < response.getBlockSize(); i++) {
+                resourceIds.enqueue(startingId + i);
+            }
+            // Notify all waiting threads that a new block of IDs was acquired
+            resourceIds.notifyAll();
+        }
+    }
+
+    private void resetIDsIfNeeded() throws HyracksDataException {
+        synchronized (resourceIds) {
+            if (reset) {
+                resourceIds.clear();
+                reset = false;
+                // Request the initial block
+                requestNewBlock();
+            }
+        }
+    }
+
+    private long getID() throws HyracksDataException {
+        long id = INVALID_ID;
+        // Record the time of which getID was called
+        long time = System.nanoTime();
+        int size;
+        synchronized (resourceIds) {
+            size = resourceIds.size();
+            if (size > 0) {
+                id = resourceIds.dequeueLong();
+            }
+        }
+        if (size == 1 || size == 0 && shouldRequestNewBlock(time)) {
+            // The last ID was taken. Preemptively request a new block.
+            // Or the last request failed, retry
+            // Or waiting time for the response exceeded the maximum waiting time threshold
+            requestNewBlock();
+        }
+
+        return id;
+    }
+
+    private void waitForID() throws HyracksDataException {
+        long time = System.nanoTime();
+        try {
+            synchronized (resourceIds) {
+                while (resourceIds.isEmpty() && !shouldRequestNewBlock(time)) {
+                    resourceIds.wait(WAIT_FOR_BLOCK_ID_TIME_MS);
+                    time = System.nanoTime();
+                }
+            }
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private boolean shouldRequestNewBlock(long time) {
+        int failures = numberOfFailedRequests.get();
+        long timeDiff = time - requestTime.get();
+        if (failures > 0 || timeDiff >= WAIT_FOR_REQUEST_TIME_THRESHOLD_NS) {
+            long thresholdSec = TimeUnit.NANOSECONDS.toSeconds(WAIT_FOR_REQUEST_TIME_THRESHOLD_NS);
+            long timeDiffSec = TimeUnit.NANOSECONDS.toSeconds(timeDiff);
+            LOGGER.warn(
+                    "Preemptive requests are either failed or lost "
+                            + "(failures:{}, number-of-failures-threshold: {}),"
+                            + " (time-since-last-request: {}s, time-threshold: {}s)",
+                    failures, MAX_NUMBER_OF_ATTEMPTS, timeDiffSec, thresholdSec);
+            return true;
+        }
+        return false;
+    }
+
+    private synchronized void requestNewBlock() throws HyracksDataException {
+        int attempts = numberOfFailedRequests.get();
+        if (attempts >= MAX_NUMBER_OF_ATTEMPTS) {
+            synchronized (resourceIds) {
+                // Notify all waiting threads so they can fail as well
+                resourceIds.notifyAll();
+            }
+            throw new RuntimeDataException(ErrorCode.ILLEGAL_STATE, "New block request was attempted (" + attempts
+                    + " times) - exceeding the maximum number of allowed retries. See the logs for more information.");
+        }
+
+        requestTime.set(System.nanoTime());
+        serviceCtx.getControllerService().getExecutor().submit(() -> {
+            try {
+                ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize);
+                ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+                // Reset the number failures
+                numberOfFailedRequests.set(0);
+            } catch (Exception e) {
+                LOGGER.warn("failed to request a new block", e);
+                // Increment the number of failures
+                numberOfFailedRequests.incrementAndGet();
+                synchronized (resourceIds) {
+                    // Notify a waiting thread (if any) to request a new block
+                    resourceIds.notify();
+                }
+            }
+        });
     }
 }