[NO ISSUE][*DB][RT] Obtain ResourceIds in blocks, to reduce roundtrips between NC and CC
Change-Id: I22840013b1f03255dfb487217bbbb75db420c42d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10125
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
index d36d383..1301dea 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
@@ -20,10 +20,19 @@
public interface IResourceIdManager {
+ /**
+ * @return the created resource id, or <code>-1</code> if a resource cannot be created
+ */
long createResourceId();
boolean reported(String nodeId);
void report(String nodeId, long maxResourceId);
+ /**
+ * @param blockSize the size of resource id block to create
+ * @return the starting id of contiguous block of resource ids, or <code>-1</code> if
+ * the resource block cannot be created
+ */
+ long createResourceIdBlock(int blockSize);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index fbfca55..6198acc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -26,11 +26,13 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ResourceIdRequestMessage implements ICcAddressedMessage {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final String src;
+ private final int blockSize;
- public ResourceIdRequestMessage(String src) {
+ public ResourceIdRequestMessage(String src, int blockSize) {
this.src = src;
+ this.blockSize = blockSize;
}
@Override
@@ -40,11 +42,11 @@
ResourceIdRequestResponseMessage response = new ResourceIdRequestResponseMessage();
IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
- response.setResourceId(resourceIdManager.createResourceId());
+ response.setResourceIdBlock(resourceIdManager.createResourceIdBlock(blockSize), blockSize);
if (response.getResourceId() < 0) {
if (!(clusterStateManager.isClusterActive())) {
response.setException(
- new Exception("Cannot generate global resource id when cluster is not active."));
+ new Exception("Cannot generate global resource id(s) when cluster is not active."));
} else {
response.setException(new Exception("One or more nodes has not reported max resource id."));
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
index 6a9ed35..05e6b12 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -24,19 +24,30 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ResourceIdRequestResponseMessage implements INcAddressedMessage {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private long resourceId;
+ private int blockSize = 1;
+
private Exception exception;
public long getResourceId() {
return resourceId;
}
+ public int getBlockSize() {
+ return blockSize;
+ }
+
public void setResourceId(long resourceId) {
this.resourceId = resourceId;
}
+ public void setResourceIdBlock(long resourceId, int blockSize) {
+ this.resourceId = resourceId;
+ this.blockSize = blockSize;
+ }
+
public Exception getException() {
return exception;
}
@@ -52,6 +63,7 @@
@Override
public String toString() {
- return ResourceIdRequestResponseMessage.class.getSimpleName();
+ return "ResourceIdRequestResponseMessage{" + "resourceId=" + resourceId + ", blockSize=" + blockSize
+ + ", exception=" + exception + '}';
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 78b1f17..2bd4f81 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.runtime.transaction;
+import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
@@ -26,6 +27,12 @@
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
+import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
+import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
/**
* A resource id factory that generates unique resource ids across all NCs by requesting
@@ -33,7 +40,11 @@
*/
public class GlobalResourceIdFactory implements IResourceIdFactory {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final int RESOURCE_ID_BLOCK_SIZE = 25;
private final INCServiceContext serviceCtx;
+ private final LongPriorityQueue resourceIds =
+ LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_BLOCK_SIZE));
private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
private final String nodeId;
@@ -44,33 +55,51 @@
}
public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException {
+ LOGGER.debug("rec'd block of ids: {}", resourceIdResponse);
resourceIdResponseQ.put(resourceIdResponse);
}
@Override
public long createId() throws HyracksDataException {
try {
- ResourceIdRequestResponseMessage reponse = null;
- //if there already exists a response, use it
- if (!resourceIdResponseQ.isEmpty()) {
- synchronized (resourceIdResponseQ) {
- if (!resourceIdResponseQ.isEmpty()) {
- reponse = resourceIdResponseQ.take();
+ final long resourceId = resourceIds.dequeueLong();
+ if (resourceIds.isEmpty()) {
+ serviceCtx.getControllerService().getExecutor().submit(() -> {
+ try {
+ requestNewBlock();
+ } catch (Exception e) {
+ LOGGER.warn("failed on preemptive block request", e);
}
- }
+ });
}
- //if no response available or it has an exception, request a new one
- if (reponse == null || reponse.getException() != null) {
- ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
- ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
- reponse = resourceIdResponseQ.take();
- if (reponse.getException() != null) {
- throw HyracksDataException.create(reponse.getException());
- }
+ return resourceId;
+ } catch (NoSuchElementException e) {
+ // fallthrough
+ }
+ try {
+ // if there already exists a response, use it
+ ResourceIdRequestResponseMessage response = resourceIdResponseQ.poll();
+ if (response == null) {
+ requestNewBlock();
+ response = resourceIdResponseQ.take();
}
- return reponse.getResourceId();
+ if (response.getException() != null) {
+ throw HyracksDataException.create(response.getException());
+ }
+ // take the first id, queue the rest
+ final long startingId = response.getResourceId();
+ for (int i = 1; i < response.getBlockSize(); i++) {
+ resourceIds.enqueue(startingId + i);
+ }
+ return startingId;
} catch (Exception e) {
throw HyracksDataException.create(e);
}
}
+
+ protected void requestNewBlock() throws Exception {
+ // queue is empty; request a new block
+ ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, RESOURCE_ID_BLOCK_SIZE);
+ ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 5bcd5aa..8b4fd68 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -37,8 +37,11 @@
@Override
public long createResourceId() {
- return csm.isClusterActive() || reportedNodes.containsAll(csm.getParticipantNodes(true))
- ? globalResourceId.incrementAndGet() : -1;
+ return readyState() ? globalResourceId.incrementAndGet() : -1;
+ }
+
+ protected boolean readyState() {
+ return csm.isClusterActive() || reportedNodes.containsAll(csm.getParticipantNodes(true));
}
@Override
@@ -51,4 +54,9 @@
globalResourceId.updateAndGet(prev -> Math.max(maxResourceId, prev));
reportedNodes.add(nodeId);
}
+
+ @Override
+ public long createResourceIdBlock(int blockSize) {
+ return readyState() ? globalResourceId.getAndAdd(blockSize) + 1 : -1;
+ }
}