Merge branch 'gerrit/stabilization-667a908755'

Change-Id: I28b4034f3cf952b35b26ea23ddfebf0d232c4687
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index f0f0470..bee5ff9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -70,6 +70,9 @@
             }
             NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
+            if (localCounter != null) {
+                LOGGER.debug("returning local counters to cc: {}", localCounter);
+            }
             // wrap the returned partitions in a hash set to make it serializable
             Set<Integer> nodeActivePartitions = new HashSet<>(appCtx.getReplicaManager().getPartitions());
             NCLifecycleTaskReportMessage result =
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b017648..e098a89 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4640,9 +4640,9 @@
     }
 
     private interface IMetadataLocker {
-        void lock() throws AlgebricksException;
+        void lock() throws HyracksDataException, AlgebricksException, InterruptedException;
 
-        void unlock() throws AlgebricksException;
+        void unlock() throws HyracksDataException, AlgebricksException;
     }
 
     private interface IResultPrinter {
@@ -4657,10 +4657,19 @@
             IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats,
             IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter)
             throws Exception {
+        final IRequestTracker requestTracker = appCtx.getRequestTracker();
+        final ClientRequest clientRequest =
+                (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
-            public void lock() {
-                compilationLock.readLock().lock();
+            public void lock() throws RuntimeDataException, InterruptedException {
+                try {
+                    compilationLock.readLock().lockInterruptibly();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    ensureNotCancelled(clientRequest);
+                    throw e;
+                }
             }
 
             @Override
@@ -4813,18 +4822,20 @@
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest =
                 (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
+        if (cancellable) {
+            clientRequest.markCancellable();
+        }
         locker.lock();
         try {
             final JobSpecification jobSpec = compiler.compile();
             if (jobSpec == null) {
                 return;
             }
-            if (cancellable) {
-                clientRequest.markCancellable();
-            }
             final SchedulableClientRequest schedulableRequest =
                     SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec);
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
+            // ensure request not cancelled before running job
+            ensureNotCancelled(clientRequest);
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             clientRequest.setJobId(jobId);
             if (jId != null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index ba6c602..61562d8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -244,8 +244,10 @@
         ActiveNotificationHandler activeNotificationHandler =
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
         IMetadataLockManager lockManager = appCtx.getMetadataLockManager();
+        LOGGER.debug("attempting to acquire dataset {} upgrade lock", source.getDatasetName());
         lockManager.upgradeDatasetLockToWrite(metadataProvider.getLocks(), source.getDataverseName(),
                 source.getDatasetName());
+        LOGGER.debug("acquired dataset {} upgrade lock", source.getDatasetName());
         LOGGER.info("Updating dataset {} node group from {} to {}", source.getDatasetName(), source.getNodeGroupName(),
                 target.getNodeGroupName());
         try {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
index 1f77aa0..d491ea4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
@@ -63,7 +63,7 @@
      * @param mode
      *            lock mode
      */
-    void lock(IMetadataLock.Mode mode);
+    void lock(IMetadataLock.Mode mode) throws InterruptedException;
 
     /**
      * Release a lock
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
index 43a1849..06a317e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
@@ -49,7 +49,12 @@
         if (isContained(mode, lock)) {
             return;
         }
-        lock.lock(mode);
+        try {
+            lock.lock(mode);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AsterixException(e);
+        }
         indexes.put(lock.getKey(), locks.size());
         locks.add(MutablePair.of(lock, mode));
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
index 8956b93..ea4e7ad 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class NcLocalCounters implements Serializable {
     private static final long serialVersionUID = 3798954558299915995L;
@@ -41,6 +42,7 @@
 
     public static NcLocalCounters collect(CcId ccId, NodeControllerService ncs) throws HyracksDataException {
         final INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
+        resetGlobalCounters(ncs, appContext);
         long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
         long maxTxnId = appContext.getMaxTxnId();
@@ -65,4 +67,10 @@
         return "NcLocalCounters{" + "maxResourceId=" + maxResourceId + ", maxTxnId=" + maxTxnId + ", maxJobId="
                 + maxJobId + '}';
     }
+
+    private static void resetGlobalCounters(NodeControllerService ncs, INcApplicationContext appContext) {
+        IResourceIdFactory resourceIdFactory =
+                appContext.getStorageComponentProvider().getStorageManager().getResourceIdFactory(ncs.getContext());
+        resourceIdFactory.reset();
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
index e0a6725..41d0e97 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -79,8 +79,8 @@
         lock.writeLock().unlock();
     }
 
-    private void upgradeReadLock() {
-        upgradeLock.readLock().lock();
+    private void upgradeReadLock() throws InterruptedException {
+        upgradeLock.readLock().lockInterruptibly();
     }
 
     private void modifyReadLock() {
@@ -185,7 +185,7 @@
     }
 
     @Override
-    public void lock(IMetadataLock.Mode mode) {
+    public void lock(IMetadataLock.Mode mode) throws InterruptedException {
         switch (mode) {
             case INDEX_BUILD:
                 readLock();
@@ -203,8 +203,7 @@
                 writeLock();
                 break;
             case READ:
-                readLock();
-                upgradeReadLock();
+                atomicReadLock();
                 break;
             default:
                 throw new IllegalStateException("locking mode " + mode + " is not supported");
@@ -264,6 +263,17 @@
         return Objects.equals(key, ((DatasetLock) o).key);
     }
 
+    private void atomicReadLock() throws InterruptedException {
+        readLock();
+        try {
+            upgradeReadLock();
+        } catch (InterruptedException e) {
+            readUnlock();
+            Thread.currentThread().interrupt();
+            throw e;
+        }
+    }
+
     @Override
     public String toString() {
         return String.valueOf(key);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 2bd4f81..908663f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -41,26 +41,45 @@
 public class GlobalResourceIdFactory implements IResourceIdFactory {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final int RESOURCE_ID_BLOCK_SIZE = 25;
+    private static final int RESOURCE_ID_INITIAL_BLOCK_SIZE = 24;
+    private static final int MAX_BLOCK_SIZE = 35;
     private final INCServiceContext serviceCtx;
     private final LongPriorityQueue resourceIds =
-            LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_BLOCK_SIZE));
+            LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_INITIAL_BLOCK_SIZE));
     private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
     private final String nodeId;
+    private volatile boolean reset = false;
+    private int currentBlockSize;
 
     public GlobalResourceIdFactory(INCServiceContext serviceCtx) {
         this.serviceCtx = serviceCtx;
         this.resourceIdResponseQ = new LinkedBlockingQueue<>();
         this.nodeId = serviceCtx.getNodeId();
+        this.currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE;
     }
 
-    public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException {
+    public synchronized void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse)
+            throws InterruptedException {
         LOGGER.debug("rec'd block of ids: {}", resourceIdResponse);
+        // to ensure any block that was requested before a reset call isn't processed, we will ignore blocks whose
+        // block size doesn't match the current block size
+        if (resourceIdResponse.getBlockSize() != currentBlockSize) {
+            LOGGER.debug("dropping outdated block size of resource ids: {}, current block size: {}", resourceIdResponse,
+                    currentBlockSize);
+            return;
+        }
         resourceIdResponseQ.put(resourceIdResponse);
     }
 
     @Override
     public long createId() throws HyracksDataException {
+        synchronized (resourceIds) {
+            if (reset) {
+                resourceIds.clear();
+                resourceIdResponseQ.clear();
+                reset = false;
+            }
+        }
         try {
             final long resourceId = resourceIds.dequeueLong();
             if (resourceIds.isEmpty()) {
@@ -97,9 +116,19 @@
         }
     }
 
-    protected void requestNewBlock() throws Exception {
+    @Override
+    public synchronized void reset() {
+        reset = true;
+        currentBlockSize += 1;
+        if (currentBlockSize > MAX_BLOCK_SIZE) {
+            currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE;
+        }
+        LOGGER.debug("current resource ids block size: {}", currentBlockSize);
+    }
+
+    protected synchronized void requestNewBlock() throws Exception {
         // queue is empty; request a new block
-        ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, RESOURCE_ID_BLOCK_SIZE);
+        ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize);
         ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index bb3cde5..f9bf175 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -253,7 +253,10 @@
                 for (File file : files) {
                     final LocalResource localResource = readLocalResource(file);
                     if (filter.test(localResource)) {
-                        resourcesMap.put(localResource.getId(), localResource);
+                        LocalResource duplicate = resourcesMap.putIfAbsent(localResource.getId(), localResource);
+                        if (duplicate != null) {
+                            LOGGER.warn("found duplicate resource ids {} and {}", localResource, duplicate);
+                        }
                     }
                 }
             } catch (IOException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
index 6fa6224..481c2e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -50,10 +51,11 @@
             ncState.getNodeController().heartbeatAck(ccs.getCcId(), null);
         } else {
             // unregistered nc- let him know
+            InetSocketAddress refreshedNcAddress = NetworkUtil.refresh(ncAddress);
             LOGGER.info("received a heartbeat from unregistered node {}; sending negative ack to node address {}",
-                    nodeId, ncAddress);
-            NodeControllerRemoteProxy nc =
-                    new NodeControllerRemoteProxy(ccs.getCcId(), ccs.getClusterIPC().getReconnectingHandle(ncAddress));
+                    nodeId, refreshedNcAddress);
+            NodeControllerRemoteProxy nc = new NodeControllerRemoteProxy(ccs.getCcId(),
+                    ccs.getClusterIPC().getReconnectingHandle(refreshedNcAddress));
             nc.heartbeatAck(ccs.getCcId(), HyracksDataException.create(ErrorCode.NO_SUCH_NODE, nodeId));
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index f1fe86f..ea108cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -1139,7 +1139,9 @@
     }
 
     private ICachedPage confiscatePage(long dpid, int multiplier) throws HyracksDataException {
-        return getPageLoop(dpid, multiplier, true);
+        ICachedPage page = getPageLoop(dpid, multiplier, true);
+        page.getBuffer().clear();
+        return page;
     }
 
     private ICachedPage confiscateInner(long dpid, int multiplier) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
index 9f67540..5e38b16 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
@@ -20,12 +20,18 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-@FunctionalInterface
 public interface IResourceIdFactory {
 
     /**
      * @return A unique id
-     * @throws Exception
+     * @throws HyracksDataException
      */
     long createId() throws HyracksDataException;
+
+    /**
+     * Resets this factory to the last value used
+     */
+    default void reset() {
+        // no op
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index 593b00d..6ed4a09 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -63,7 +63,7 @@
     private static final int NUM_PAGES = 10;
     private static final int MAX_OPEN_FILES = 20;
     private static final int HYRACKS_FRAME_SIZE = PAGE_SIZE;
-    private IHyracksTaskContext ctx = TestUtils.create(HYRACKS_FRAME_SIZE);
+    private final IHyracksTaskContext ctx = TestUtils.create(HYRACKS_FRAME_SIZE);
 
     private static final Random rnd = new Random(50);
 
@@ -443,6 +443,28 @@
         bufferCache.closeFile(fileId);
     }
 
+    @Test
+    public void testClearingConfiscatedPages() throws HyracksDataException {
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, 1, MAX_OPEN_FILES);
+        IBufferCache bufferCache =
+                TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+        String fileName = getFileName();
+        IIOManager ioManager = TestStorageManagerComponentHolder.getIOManager();
+        FileReference file = ioManager.resolve(fileName);
+        int fileId = bufferCache.createFile(file);
+        int testPageId = 0;
+        bufferCache.openFile(fileId);
+        ICachedPage aPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, testPageId));
+        Assert.assertEquals(PAGE_SIZE, aPage.getBuffer().limit());
+        Assert.assertEquals(0, aPage.getBuffer().position());
+        aPage.getBuffer().limit(5);
+        aPage.getBuffer().position(1);
+        bufferCache.returnPage(aPage);
+        aPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, testPageId));
+        Assert.assertEquals(PAGE_SIZE, aPage.getBuffer().limit());
+        Assert.assertEquals(0, aPage.getBuffer().position());
+    }
+
     @AfterClass
     public static void cleanup() throws Exception {
         for (String s : openedFiles) {