[ASTERIXDB-3188][*DB] Clear cached global resource ids on NCs
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Ensure that any cached global resource ids on NCs are cleared after
reporting the max used resource id to the CC.
Change-Id: I3286915bcf313a88aaa465fa28ffeb56b3e9a0cd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17541
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index f0f0470..bee5ff9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -70,6 +70,9 @@
}
NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
(NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
+ if (localCounter != null) {
+ LOGGER.debug("returning local counters to cc: {}", localCounter);
+ }
// wrap the returned partitions in a hash set to make it serializable
Set<Integer> nodeActivePartitions = new HashSet<>(appCtx.getReplicaManager().getPartitions());
NCLifecycleTaskReportMessage result =
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
index 8956b93..ea4e7ad 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
public class NcLocalCounters implements Serializable {
private static final long serialVersionUID = 3798954558299915995L;
@@ -41,6 +42,7 @@
public static NcLocalCounters collect(CcId ccId, NodeControllerService ncs) throws HyracksDataException {
final INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
+ resetGlobalCounters(ncs, appContext);
long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
long maxTxnId = appContext.getMaxTxnId();
@@ -65,4 +67,10 @@
return "NcLocalCounters{" + "maxResourceId=" + maxResourceId + ", maxTxnId=" + maxTxnId + ", maxJobId="
+ maxJobId + '}';
}
+
+ private static void resetGlobalCounters(NodeControllerService ncs, INcApplicationContext appContext) {
+ IResourceIdFactory resourceIdFactory =
+ appContext.getStorageComponentProvider().getStorageManager().getResourceIdFactory(ncs.getContext());
+ resourceIdFactory.reset();
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 2bd4f81..908663f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -41,26 +41,45 @@
public class GlobalResourceIdFactory implements IResourceIdFactory {
private static final Logger LOGGER = LogManager.getLogger();
- private static final int RESOURCE_ID_BLOCK_SIZE = 25;
+ private static final int RESOURCE_ID_INITIAL_BLOCK_SIZE = 24;
+ private static final int MAX_BLOCK_SIZE = 35;
private final INCServiceContext serviceCtx;
private final LongPriorityQueue resourceIds =
- LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_BLOCK_SIZE));
+ LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_INITIAL_BLOCK_SIZE));
private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
private final String nodeId;
+ private volatile boolean reset = false;
+ private int currentBlockSize;
public GlobalResourceIdFactory(INCServiceContext serviceCtx) {
this.serviceCtx = serviceCtx;
this.resourceIdResponseQ = new LinkedBlockingQueue<>();
this.nodeId = serviceCtx.getNodeId();
+ this.currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE;
}
- public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException {
+ public synchronized void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse)
+ throws InterruptedException {
LOGGER.debug("rec'd block of ids: {}", resourceIdResponse);
+ // to ensure any block that was requested before a reset call isn't processed, we will ignore blocks where their
+ // block size doesn't match the current block size
+ if (resourceIdResponse.getBlockSize() != currentBlockSize) {
+ LOGGER.debug("dropping outdated block size of resource ids: {}, current block size: {}", resourceIdResponse,
+ currentBlockSize);
+ return;
+ }
resourceIdResponseQ.put(resourceIdResponse);
}
@Override
public long createId() throws HyracksDataException {
+ synchronized (resourceIds) {
+ if (reset) {
+ resourceIds.clear();
+ resourceIdResponseQ.clear();
+ reset = false;
+ }
+ }
try {
final long resourceId = resourceIds.dequeueLong();
if (resourceIds.isEmpty()) {
@@ -97,9 +116,19 @@
}
}
- protected void requestNewBlock() throws Exception {
+ @Override
+ public synchronized void reset() {
+ reset = true;
+ currentBlockSize += 1;
+ if (currentBlockSize > MAX_BLOCK_SIZE) {
+ currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE;
+ }
+ LOGGER.debug("current resource ids block size: {}", currentBlockSize);
+ }
+
+ protected synchronized void requestNewBlock() throws Exception {
// queue is empty; request a new block
- ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, RESOURCE_ID_BLOCK_SIZE);
+ ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize);
((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index bb3cde5..f9bf175 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -253,7 +253,10 @@
for (File file : files) {
final LocalResource localResource = readLocalResource(file);
if (filter.test(localResource)) {
- resourcesMap.put(localResource.getId(), localResource);
+ LocalResource duplicate = resourcesMap.putIfAbsent(localResource.getId(), localResource);
+ if (duplicate != null) {
+ LOGGER.warn("found duplicate resource ids {} and {}", localResource, duplicate);
+ }
}
}
} catch (IOException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
index 9f67540..5e38b16 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
@@ -20,12 +20,18 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
-@FunctionalInterface
public interface IResourceIdFactory {
/**
* @return A unique id
- * @throws Exception
+ * @throws HyracksDataException
*/
long createId() throws HyracksDataException;
+
+ /**
+ * Resets this factory to the last value used
+ */
+ default void reset() {
+ // no op
+ }
}