Merge branch 'gerrit/stabilization-667a908755'
Change-Id: I28b4034f3cf952b35b26ea23ddfebf0d232c4687
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index f0f0470..bee5ff9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -70,6 +70,9 @@
}
NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
(NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
+ if (localCounter != null) {
+ LOGGER.debug("returning local counters to cc: {}", localCounter);
+ }
// wrap the returned partitions in a hash set to make it serializable
Set<Integer> nodeActivePartitions = new HashSet<>(appCtx.getReplicaManager().getPartitions());
NCLifecycleTaskReportMessage result =
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b017648..e098a89 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4640,9 +4640,9 @@
}
private interface IMetadataLocker {
- void lock() throws AlgebricksException;
+ void lock() throws HyracksDataException, AlgebricksException, InterruptedException;
- void unlock() throws AlgebricksException;
+ void unlock() throws HyracksDataException, AlgebricksException;
}
private interface IResultPrinter {
@@ -4657,10 +4657,19 @@
IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats,
IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter)
throws Exception {
+ final IRequestTracker requestTracker = appCtx.getRequestTracker();
+ final ClientRequest clientRequest =
+ (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
final IMetadataLocker locker = new IMetadataLocker() {
@Override
- public void lock() {
- compilationLock.readLock().lock();
+ public void lock() throws RuntimeDataException, InterruptedException {
+ try {
+ compilationLock.readLock().lockInterruptibly();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ ensureNotCancelled(clientRequest);
+ throw e;
+ }
}
@Override
@@ -4813,18 +4822,20 @@
final IRequestTracker requestTracker = appCtx.getRequestTracker();
final ClientRequest clientRequest =
(ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
+ if (cancellable) {
+ clientRequest.markCancellable();
+ }
locker.lock();
try {
final JobSpecification jobSpec = compiler.compile();
if (jobSpec == null) {
return;
}
- if (cancellable) {
- clientRequest.markCancellable();
- }
final SchedulableClientRequest schedulableRequest =
SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec);
appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
+ // ensure request not cancelled before running job
+ ensureNotCancelled(clientRequest);
final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
clientRequest.setJobId(jobId);
if (jId != null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index ba6c602..61562d8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -244,8 +244,10 @@
ActiveNotificationHandler activeNotificationHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
IMetadataLockManager lockManager = appCtx.getMetadataLockManager();
+ LOGGER.debug("attempting to acquire dataset {} upgrade lock", source.getDatasetName());
lockManager.upgradeDatasetLockToWrite(metadataProvider.getLocks(), source.getDataverseName(),
source.getDatasetName());
+ LOGGER.debug("acquired dataset {} upgrade lock", source.getDatasetName());
LOGGER.info("Updating dataset {} node group from {} to {}", source.getDatasetName(), source.getNodeGroupName(),
target.getNodeGroupName());
try {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
index 1f77aa0..d491ea4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
@@ -63,7 +63,7 @@
* @param mode
* lock mode
*/
- void lock(IMetadataLock.Mode mode);
+ void lock(IMetadataLock.Mode mode) throws InterruptedException;
/**
* Release a lock
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
index 43a1849..06a317e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
@@ -49,7 +49,12 @@
if (isContained(mode, lock)) {
return;
}
- lock.lock(mode);
+ try {
+ lock.lock(mode);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AsterixException(e);
+ }
indexes.put(lock.getKey(), locks.size());
locks.add(MutablePair.of(lock, mode));
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
index 8956b93..ea4e7ad 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
public class NcLocalCounters implements Serializable {
private static final long serialVersionUID = 3798954558299915995L;
@@ -41,6 +42,7 @@
public static NcLocalCounters collect(CcId ccId, NodeControllerService ncs) throws HyracksDataException {
final INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
+ resetGlobalCounters(ncs, appContext);
long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
long maxTxnId = appContext.getMaxTxnId();
@@ -65,4 +67,10 @@
return "NcLocalCounters{" + "maxResourceId=" + maxResourceId + ", maxTxnId=" + maxTxnId + ", maxJobId="
+ maxJobId + '}';
}
+
+ private static void resetGlobalCounters(NodeControllerService ncs, INcApplicationContext appContext) {
+ IResourceIdFactory resourceIdFactory =
+ appContext.getStorageComponentProvider().getStorageManager().getResourceIdFactory(ncs.getContext());
+ resourceIdFactory.reset();
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
index e0a6725..41d0e97 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -79,8 +79,8 @@
lock.writeLock().unlock();
}
- private void upgradeReadLock() {
- upgradeLock.readLock().lock();
+ private void upgradeReadLock() throws InterruptedException {
+ upgradeLock.readLock().lockInterruptibly();
}
private void modifyReadLock() {
@@ -185,7 +185,7 @@
}
@Override
- public void lock(IMetadataLock.Mode mode) {
+ public void lock(IMetadataLock.Mode mode) throws InterruptedException {
switch (mode) {
case INDEX_BUILD:
readLock();
@@ -203,8 +203,7 @@
writeLock();
break;
case READ:
- readLock();
- upgradeReadLock();
+ atomicReadLock();
break;
default:
throw new IllegalStateException("locking mode " + mode + " is not supported");
@@ -264,6 +263,17 @@
return Objects.equals(key, ((DatasetLock) o).key);
}
+ private void atomicReadLock() throws InterruptedException {
+ readLock();
+ try {
+ upgradeReadLock();
+ } catch (InterruptedException e) {
+ readUnlock();
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ }
+
@Override
public String toString() {
return String.valueOf(key);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 2bd4f81..908663f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -41,26 +41,45 @@
public class GlobalResourceIdFactory implements IResourceIdFactory {
private static final Logger LOGGER = LogManager.getLogger();
- private static final int RESOURCE_ID_BLOCK_SIZE = 25;
+ private static final int RESOURCE_ID_INITIAL_BLOCK_SIZE = 24;
+ private static final int MAX_BLOCK_SIZE = 35;
private final INCServiceContext serviceCtx;
private final LongPriorityQueue resourceIds =
- LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_BLOCK_SIZE));
+ LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_INITIAL_BLOCK_SIZE));
private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
private final String nodeId;
+ private volatile boolean reset = false;
+ private int currentBlockSize;
public GlobalResourceIdFactory(INCServiceContext serviceCtx) {
this.serviceCtx = serviceCtx;
this.resourceIdResponseQ = new LinkedBlockingQueue<>();
this.nodeId = serviceCtx.getNodeId();
+ this.currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE;
}
- public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException {
+ public synchronized void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse)
+ throws InterruptedException {
LOGGER.debug("rec'd block of ids: {}", resourceIdResponse);
+ // to ensure any block that was requested before a reset call isn't processed, we will ignore blocks where their
+ // block size doesn't match the current block size
+ if (resourceIdResponse.getBlockSize() != currentBlockSize) {
+ LOGGER.debug("dropping outdated block size of resource ids: {}, current block size: {}", resourceIdResponse,
+ currentBlockSize);
+ return;
+ }
resourceIdResponseQ.put(resourceIdResponse);
}
@Override
public long createId() throws HyracksDataException {
+ synchronized (resourceIds) {
+ if (reset) {
+ resourceIds.clear();
+ resourceIdResponseQ.clear();
+ reset = false;
+ }
+ }
try {
final long resourceId = resourceIds.dequeueLong();
if (resourceIds.isEmpty()) {
@@ -97,9 +116,19 @@
}
}
- protected void requestNewBlock() throws Exception {
+ @Override
+ public synchronized void reset() {
+ reset = true;
+ currentBlockSize += 1;
+ if (currentBlockSize > MAX_BLOCK_SIZE) {
+ currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE;
+ }
+ LOGGER.debug("current resource ids block size: {}", currentBlockSize);
+ }
+
+ protected synchronized void requestNewBlock() throws Exception {
// queue is empty; request a new block
- ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, RESOURCE_ID_BLOCK_SIZE);
+ ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize);
((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index bb3cde5..f9bf175 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -253,7 +253,10 @@
for (File file : files) {
final LocalResource localResource = readLocalResource(file);
if (filter.test(localResource)) {
- resourcesMap.put(localResource.getId(), localResource);
+ LocalResource duplicate = resourcesMap.putIfAbsent(localResource.getId(), localResource);
+ if (duplicate != null) {
+ LOGGER.warn("found duplicate resource ids {} and {}", localResource, duplicate);
+ }
}
}
} catch (IOException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
index 6fa6224..481c2e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -50,10 +51,11 @@
ncState.getNodeController().heartbeatAck(ccs.getCcId(), null);
} else {
// unregistered nc- let him know
+ InetSocketAddress refreshedNcAddress = NetworkUtil.refresh(ncAddress);
LOGGER.info("received a heartbeat from unregistered node {}; sending negative ack to node address {}",
- nodeId, ncAddress);
- NodeControllerRemoteProxy nc =
- new NodeControllerRemoteProxy(ccs.getCcId(), ccs.getClusterIPC().getReconnectingHandle(ncAddress));
+ nodeId, refreshedNcAddress);
+ NodeControllerRemoteProxy nc = new NodeControllerRemoteProxy(ccs.getCcId(),
+ ccs.getClusterIPC().getReconnectingHandle(refreshedNcAddress));
nc.heartbeatAck(ccs.getCcId(), HyracksDataException.create(ErrorCode.NO_SUCH_NODE, nodeId));
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index f1fe86f..ea108cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -1139,7 +1139,9 @@
}
private ICachedPage confiscatePage(long dpid, int multiplier) throws HyracksDataException {
- return getPageLoop(dpid, multiplier, true);
+ ICachedPage page = getPageLoop(dpid, multiplier, true);
+ page.getBuffer().clear();
+ return page;
}
private ICachedPage confiscateInner(long dpid, int multiplier) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
index 9f67540..5e38b16 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
@@ -20,12 +20,18 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
-@FunctionalInterface
public interface IResourceIdFactory {
/**
* @return A unique id
- * @throws Exception
+ * @throws HyracksDataException
*/
long createId() throws HyracksDataException;
+
+ /**
+ * Resets this factory to the last value used
+ */
+ default void reset() {
+ // no op
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index 593b00d..6ed4a09 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -63,7 +63,7 @@
private static final int NUM_PAGES = 10;
private static final int MAX_OPEN_FILES = 20;
private static final int HYRACKS_FRAME_SIZE = PAGE_SIZE;
- private IHyracksTaskContext ctx = TestUtils.create(HYRACKS_FRAME_SIZE);
+ private final IHyracksTaskContext ctx = TestUtils.create(HYRACKS_FRAME_SIZE);
private static final Random rnd = new Random(50);
@@ -443,6 +443,28 @@
bufferCache.closeFile(fileId);
}
+ @Test
+ public void testClearingConfiscatedPages() throws HyracksDataException {
+ TestStorageManagerComponentHolder.init(PAGE_SIZE, 1, MAX_OPEN_FILES);
+ IBufferCache bufferCache =
+ TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+ String fileName = getFileName();
+ IIOManager ioManager = TestStorageManagerComponentHolder.getIOManager();
+ FileReference file = ioManager.resolve(fileName);
+ int fileId = bufferCache.createFile(file);
+ int testPageId = 0;
+ bufferCache.openFile(fileId);
+ ICachedPage aPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, testPageId));
+ Assert.assertEquals(PAGE_SIZE, aPage.getBuffer().limit());
+ Assert.assertEquals(0, aPage.getBuffer().position());
+ aPage.getBuffer().limit(5);
+ aPage.getBuffer().position(1);
+ bufferCache.returnPage(aPage);
+ aPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, testPageId));
+ Assert.assertEquals(PAGE_SIZE, aPage.getBuffer().limit());
+ Assert.assertEquals(0, aPage.getBuffer().position());
+ }
+
@AfterClass
public static void cleanup() throws Exception {
for (String s : openedFiles) {