[ASTERIXDB-3188][*DB] Clear cached global resource ids on NCs

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Ensure any cached global resource ids on NCs are cleared after
  reporting max used resource id to CC.

Change-Id: I3286915bcf313a88aaa465fa28ffeb56b3e9a0cd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17541
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index f0f0470..bee5ff9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -70,6 +70,9 @@
             }
             NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
+            if (localCounter != null) {
+                LOGGER.debug("returning local counters to cc: {}", localCounter);
+            }
             // wrap the returned partitions in a hash set to make it serializable
             Set<Integer> nodeActivePartitions = new HashSet<>(appCtx.getReplicaManager().getPartitions());
             NCLifecycleTaskReportMessage result =
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
index 8956b93..ea4e7ad 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class NcLocalCounters implements Serializable {
     private static final long serialVersionUID = 3798954558299915995L;
@@ -41,6 +42,7 @@
 
     public static NcLocalCounters collect(CcId ccId, NodeControllerService ncs) throws HyracksDataException {
         final INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
+        resetGlobalCounters(ncs, appContext);
         long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
         long maxTxnId = appContext.getMaxTxnId();
@@ -65,4 +67,10 @@
         return "NcLocalCounters{" + "maxResourceId=" + maxResourceId + ", maxTxnId=" + maxTxnId + ", maxJobId="
                 + maxJobId + '}';
     }
+
+    private static void resetGlobalCounters(NodeControllerService ncs, INcApplicationContext appContext) {
+        IResourceIdFactory resourceIdFactory =
+                appContext.getStorageComponentProvider().getStorageManager().getResourceIdFactory(ncs.getContext());
+        resourceIdFactory.reset();
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 2bd4f81..908663f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -41,26 +41,45 @@
 public class GlobalResourceIdFactory implements IResourceIdFactory {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final int RESOURCE_ID_BLOCK_SIZE = 25;
+    private static final int RESOURCE_ID_INITIAL_BLOCK_SIZE = 24;
+    private static final int MAX_BLOCK_SIZE = 35;
     private final INCServiceContext serviceCtx;
     private final LongPriorityQueue resourceIds =
-            LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_BLOCK_SIZE));
+            LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_INITIAL_BLOCK_SIZE));
     private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
     private final String nodeId;
+    private volatile boolean reset = false;
+    private int currentBlockSize;
 
     public GlobalResourceIdFactory(INCServiceContext serviceCtx) {
         this.serviceCtx = serviceCtx;
         this.resourceIdResponseQ = new LinkedBlockingQueue<>();
         this.nodeId = serviceCtx.getNodeId();
+        this.currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE;
     }
 
-    public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException {
+    public synchronized void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse)
+            throws InterruptedException {
         LOGGER.debug("rec'd block of ids: {}", resourceIdResponse);
+        // to ensure any block that was requested before a reset call isn't processed, we will ignore blocks where their
+        // block size doesn't match the current block size
+        if (resourceIdResponse.getBlockSize() != currentBlockSize) {
+            LOGGER.debug("dropping outdated block size of resource ids: {}, current block size: {}", resourceIdResponse,
+                    currentBlockSize);
+            return;
+        }
         resourceIdResponseQ.put(resourceIdResponse);
     }
 
     @Override
     public long createId() throws HyracksDataException {
+        synchronized (resourceIds) {
+            if (reset) {
+                resourceIds.clear();
+                resourceIdResponseQ.clear();
+                reset = false;
+            }
+        }
         try {
             final long resourceId = resourceIds.dequeueLong();
             if (resourceIds.isEmpty()) {
@@ -97,9 +116,19 @@
         }
     }
 
-    protected void requestNewBlock() throws Exception {
+    @Override
+    public synchronized void reset() {
+        reset = true;
+        currentBlockSize += 1;
+        if (currentBlockSize > MAX_BLOCK_SIZE) {
+            currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE;
+        }
+        LOGGER.debug("current resource ids block size: {}", currentBlockSize);
+    }
+
+    protected synchronized void requestNewBlock() throws Exception {
         // queue is empty; request a new block
-        ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, RESOURCE_ID_BLOCK_SIZE);
+        ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize);
         ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index bb3cde5..f9bf175 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -253,7 +253,10 @@
                 for (File file : files) {
                     final LocalResource localResource = readLocalResource(file);
                     if (filter.test(localResource)) {
-                        resourcesMap.put(localResource.getId(), localResource);
+                        LocalResource duplicate = resourcesMap.putIfAbsent(localResource.getId(), localResource);
+                        if (duplicate != null) {
+                            LOGGER.warn("found duplicate resource ids {} and {}", localResource, duplicate);
+                        }
                     }
                 }
             } catch (IOException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
index 9f67540..5e38b16 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
@@ -20,12 +20,18 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-@FunctionalInterface
 public interface IResourceIdFactory {
 
     /**
      * @return A unique id
-     * @throws Exception
+     * @throws HyracksDataException
      */
     long createId() throws HyracksDataException;
+
+    /**
+     * Resets this factory to the last value used
+     */
+    default void reset() {
+        // no op
+    }
 }