[ASTERIXDB-3346][MTD] Fix GlobalResourceIdFactory race cond.
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Creating a new ID can wait in indefinitely due to
a race condition (see ASTERIXDB-3346) for more details.
Change-Id: Ic7ff15abbc70277a9f1ae340314335253aa23308
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18144
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 30877d9..8682937 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -18,9 +18,12 @@
*/
package org.apache.asterix.runtime.transaction;
-import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
@@ -32,7 +35,6 @@
import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
-import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
/**
* A resource id factory that generates unique resource ids across all NCs by requesting
@@ -41,23 +43,44 @@
public class GlobalResourceIdFactory implements IResourceIdFactory {
private static final Logger LOGGER = LogManager.getLogger();
+ private static final long INVALID_ID = -1L;
+ /**
+ * Maximum number of attempts to request a new block of IDs
+ */
+ private static final int MAX_NUMBER_OF_ATTEMPTS = 3;
+ /**
+ * Time threshold to consider a block request was lost
+ */
+ private static final long WAIT_FOR_REQUEST_TIME_THRESHOLD_NS = TimeUnit.SECONDS.toNanos(2);
+ /**
+ * Wait time by threads waiting for the response with the new block
+ */
+ private static final long WAIT_FOR_BLOCK_ID_TIME_MS = TimeUnit.SECONDS.toMillis(2);
private final INCServiceContext serviceCtx;
private final LongPriorityQueue resourceIds;
- private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
private final String nodeId;
private final int initialBlockSize;
private final int maxBlockSize;
+ /**
+ * Current number of failed block requests
+ */
+ private final AtomicInteger numberOfFailedRequests;
+ /**
+ * Last time a request of a block is initiated
+ */
+ private final AtomicLong requestTime;
private int currentBlockSize;
private volatile boolean reset = false;
public GlobalResourceIdFactory(INCServiceContext serviceCtx, int initialBlockSize) {
this.serviceCtx = serviceCtx;
- this.resourceIdResponseQ = new LinkedBlockingQueue<>();
this.nodeId = serviceCtx.getNodeId();
this.initialBlockSize = initialBlockSize;
maxBlockSize = initialBlockSize * 2;
currentBlockSize = initialBlockSize;
- resourceIds = LongPriorityQueues.synchronize(new LongArrayFIFOQueue(initialBlockSize));
+ resourceIds = new LongArrayFIFOQueue(initialBlockSize);
+ numberOfFailedRequests = new AtomicInteger();
+ requestTime = new AtomicLong();
}
public synchronized void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse)
@@ -70,52 +93,23 @@
currentBlockSize);
return;
}
- resourceIdResponseQ.put(resourceIdResponse);
+ populateIDs(resourceIdResponse);
}
@Override
public long createId() throws HyracksDataException {
- synchronized (resourceIds) {
- if (reset) {
- resourceIds.clear();
- resourceIdResponseQ.clear();
- reset = false;
- }
+ // Rest IDs if requested to reset
+ resetIDsIfNeeded();
+ // Get a new ID if possible or request a new block
+ long id = getID();
+ while (id == INVALID_ID) {
+ // All IDs in the previous block were consumed, wait for the new block
+ waitForID();
+ // Retry getting a new ID again
+ id = getID();
}
- try {
- final long resourceId = resourceIds.dequeueLong();
- if (resourceIds.isEmpty()) {
- serviceCtx.getControllerService().getExecutor().submit(() -> {
- try {
- requestNewBlock();
- } catch (Exception e) {
- LOGGER.warn("failed on preemptive block request", e);
- }
- });
- }
- return resourceId;
- } catch (NoSuchElementException e) {
- // fallthrough
- }
- try {
- // if there already exists a response, use it
- ResourceIdRequestResponseMessage response = resourceIdResponseQ.poll();
- if (response == null) {
- requestNewBlock();
- response = resourceIdResponseQ.take();
- }
- if (response.getException() != null) {
- throw HyracksDataException.create(response.getException());
- }
- // take the first id, queue the rest
- final long startingId = response.getResourceId();
- for (int i = 1; i < response.getBlockSize(); i++) {
- resourceIds.enqueue(startingId + i);
- }
- return startingId;
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
+
+ return id;
}
@Override
@@ -128,9 +122,106 @@
LOGGER.debug("current resource ids block size: {}", currentBlockSize);
}
- protected synchronized void requestNewBlock() throws Exception {
- // queue is empty; request a new block
- ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize);
- ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+ private void populateIDs(ResourceIdRequestResponseMessage response) {
+ synchronized (resourceIds) {
+ long startingId = response.getResourceId();
+ for (int i = 0; i < response.getBlockSize(); i++) {
+ resourceIds.enqueue(startingId + i);
+ }
+ // Notify all waiting threads that a new block of IDs was acquired
+ resourceIds.notifyAll();
+ }
+ }
+
+ private void resetIDsIfNeeded() throws HyracksDataException {
+ synchronized (resourceIds) {
+ if (reset) {
+ resourceIds.clear();
+ reset = false;
+ // Request the initial block
+ requestNewBlock();
+ }
+ }
+ }
+
+ private long getID() throws HyracksDataException {
+ long id = INVALID_ID;
+ // Record the time of which getID was called
+ long time = System.nanoTime();
+ int size;
+ synchronized (resourceIds) {
+ size = resourceIds.size();
+ if (size > 0) {
+ id = resourceIds.dequeueLong();
+ }
+ }
+ if (size == 1 || size == 0 && shouldRequestNewBlock(time)) {
+ // The last ID was taken. Preemptively request a new block.
+ // Or the last request failed, retry
+ // Or waiting time for the response exceeded the maximum waiting time threshold
+ requestNewBlock();
+ }
+
+ return id;
+ }
+
+ private void waitForID() throws HyracksDataException {
+ long time = System.nanoTime();
+ try {
+ synchronized (resourceIds) {
+ while (resourceIds.isEmpty() && !shouldRequestNewBlock(time)) {
+ resourceIds.wait(WAIT_FOR_BLOCK_ID_TIME_MS);
+ time = System.nanoTime();
+ }
+ }
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private boolean shouldRequestNewBlock(long time) {
+ int failures = numberOfFailedRequests.get();
+ long timeDiff = time - requestTime.get();
+ if (failures > 0 || timeDiff >= WAIT_FOR_REQUEST_TIME_THRESHOLD_NS) {
+ long thresholdSec = TimeUnit.NANOSECONDS.toSeconds(WAIT_FOR_REQUEST_TIME_THRESHOLD_NS);
+ long timeDiffSec = TimeUnit.NANOSECONDS.toSeconds(timeDiff);
+ LOGGER.warn(
+ "Preemptive requests are either failed or lost "
+ + "(failures:{}, number-of-failures-threshold: {}),"
+ + " (time-since-last-request: {}s, time-threshold: {}s)",
+ failures, MAX_NUMBER_OF_ATTEMPTS, timeDiffSec, thresholdSec);
+ return true;
+ }
+ return false;
+ }
+
+ private synchronized void requestNewBlock() throws HyracksDataException {
+ int attempts = numberOfFailedRequests.get();
+ if (attempts >= MAX_NUMBER_OF_ATTEMPTS) {
+ synchronized (resourceIds) {
+ // Notify all waiting threads so they can fail as well
+ resourceIds.notifyAll();
+ }
+ throw new RuntimeDataException(ErrorCode.ILLEGAL_STATE, "New block request was attempted (" + attempts
+ + " times) - exceeding the maximum number of allowed retries. See the logs for more information.");
+ }
+
+ requestTime.set(System.nanoTime());
+ serviceCtx.getControllerService().getExecutor().submit(() -> {
+ try {
+ ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize);
+ ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+ // Reset the number failures
+ numberOfFailedRequests.set(0);
+ } catch (Exception e) {
+ LOGGER.warn("failed to request a new block", e);
+ // Increment the number of failures
+ numberOfFailedRequests.incrementAndGet();
+ synchronized (resourceIds) {
+ // Notify a waiting thread (if any) to request a new block
+ resourceIds.notify();
+ }
+ }
+ });
}
}