[ASTERIXDB-3421][STO] Multiple fixes in cloud disk caching
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Metadata partition should not be evicted
- Buffer cache read context should call onPin(ICachedPage) while synchronized on the page being pinned (see the sketch after this list)
- Written pages should record the compressed page size and offset
- Fix multiple issues when calculating the page IDs of the requested columns
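For context on the second fix: notifying the read context before taking the page lock left a window in which the page could be invalidated or recycled between the callback and the validity check. A minimal sketch of the corrected ordering, using hypothetical Page/ReadContext stand-ins rather than the actual Hyracks types:
```java
// Hedged sketch with hypothetical stand-ins for ICachedPage and the read context.
final class Page {
    volatile boolean valid;
}

interface ReadContext {
    void onPin(Page page);
}

final class PinExample {
    // Before the fix, onPin ran before the synchronized block, leaving a window
    // in which another thread could recycle or invalidate the page between the
    // notification and the validity check. Notifying under the page lock makes
    // the callback and the read decision atomic with respect to that page.
    static void pin(Page cPage, ReadContext context) {
        synchronized (cPage) {
            context.onPin(cPage); // notified while holding the page lock
            if (!cPage.valid) {
                // read the page contents from disk/cloud here, then:
                cPage.valid = true;
            }
        }
    }
}
```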
Change-Id: Ic94cc1e63ee4618b18c6d4c6e3e74101a7753400
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18352
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
index 0f9a4de..3c0f1df 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.cloud.IPartitionBootstrapper;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,6 +38,7 @@
import org.apache.hyracks.cloud.cache.service.CloudDiskCacheMonitoringAndPrefetchingService;
import org.apache.hyracks.cloud.cache.service.CloudDiskResourceCacheLockNotifier;
import org.apache.hyracks.cloud.cache.service.DiskCacheSweeperThread;
+import org.apache.hyracks.cloud.cache.service.IEvictableLocalResourceFilter;
import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.io.IOManager;
@@ -55,6 +57,8 @@
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public final class CloudConfigurator {
+ private static final IEvictableLocalResourceFilter FILTER =
+ (x -> StoragePathUtil.getPartitionNumFromRelativePath(x.getPath()) != StorageConstants.METADATA_PARTITION);
private final CloudProperties cloudProperties;
private final IOManager localIoManager;
private final AbstractCloudIOManager cloudIOManager;
@@ -157,7 +161,7 @@
private static IDiskResourceCacheLockNotifier createLockNotifier(boolean diskCacheManagerRequired) {
if (diskCacheManagerRequired) {
- return new CloudDiskResourceCacheLockNotifier(StorageConstants.METADATA_PARTITION);
+ return new CloudDiskResourceCacheLockNotifier(FILTER);
}
return NoOpDiskResourceCacheLockNotifier.INSTANCE;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index f395362..97169db 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -120,7 +120,7 @@
public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profiler.objectGet();
- long readTo = offset + buffer.remaining();
+ long readTo = offset + buffer.remaining() - 1;
GetObjectRequest rangeGetObjectRequest =
GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
@@ -163,7 +163,7 @@
public InputStream getObjectStream(String bucket, String path, long offset, long length) {
guardian.checkReadAccess(bucket, path);
profiler.objectGet();
- long readTo = offset + length;
+ long readTo = offset + length - 1;
GetObjectRequest getReq =
GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
try {
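The two `- 1` fixes above exist because HTTP range requests are inclusive of both endpoints: reading `length` bytes starting at `offset` must request `bytes=offset-(offset+length-1)`. A minimal sketch of that arithmetic (the helper name is illustrative, not part of the client):
```java
// Hedged sketch: builds the inclusive HTTP byte-range value used above.
public final class RangeHeader {
    // Reading `length` bytes starting at `offset` covers offsets
    // [offset, offset + length - 1]; HTTP ranges are inclusive on both ends,
    // hence the "- 1". Without it every request over-reads by one byte.
    static String rangeOf(long offset, long length) {
        return "bytes=" + offset + "-" + (offset + length - 1);
    }

    public static void main(String[] args) {
        System.out.println(rangeOf(0, 4096)); // bytes=0-4095
    }
}
```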
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
index 218015d..d8aecc1 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
@@ -48,6 +48,10 @@
@Override
public void doEvict(FileReference directory) throws HyracksDataException {
+ if (!localIoManager.exists(directory)) {
+ return;
+ }
+
if (!directory.getFile().isDirectory()) {
throw new IllegalStateException(directory + " is not a directory");
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 3e0fbd4..43b5d1b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -123,11 +123,11 @@
int did = getDIDfromResourcePath(resourcePath);
LocalResource resource = resourceRepository.get(resourcePath);
DatasetResource datasetResource = datasets.get(did);
+ lockNotifier.onRegister(resource, index);
if (datasetResource == null) {
datasetResource = getDatasetLifecycle(did);
}
datasetResource.register(resource, (ILSMIndex) index);
- lockNotifier.onRegister(resource, index, datasetResource.getIndexInfo(resource.getId()).getPartition());
}
private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
index 036a812..fc55c7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
@@ -38,14 +38,14 @@
public final class CloudDiskResourceCacheLockNotifier implements IDiskResourceCacheLockNotifier {
private static final Logger LOGGER = LogManager.getLogger();
- private final int metadataPartition;
+ private final IEvictableLocalResourceFilter filter;
private final Long2ObjectMap<LocalResource> inactiveResources;
private final Long2ObjectMap<UnsweepableIndexUnit> unsweepableIndexes;
private final Long2ObjectMap<SweepableIndexUnit> sweepableIndexes;
private final ReentrantReadWriteLock evictionLock;
- public CloudDiskResourceCacheLockNotifier(int metadataPartition) {
- this.metadataPartition = metadataPartition;
+ public CloudDiskResourceCacheLockNotifier(IEvictableLocalResourceFilter filter) {
+ this.filter = filter;
inactiveResources = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
unsweepableIndexes = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
sweepableIndexes = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
@@ -53,19 +53,19 @@
}
@Override
- public void onRegister(LocalResource localResource, IIndex index, int partition) {
+ public void onRegister(LocalResource localResource, IIndex index) {
ILSMIndex lsmIndex = (ILSMIndex) index;
evictionLock.readLock().lock();
try {
- if (partition != metadataPartition) {
+ if (filter.accept(localResource)) {
long resourceId = localResource.getId();
if (lsmIndex.getDiskCacheManager().isSweepable()) {
sweepableIndexes.put(resourceId, new SweepableIndexUnit(localResource, lsmIndex));
} else {
unsweepableIndexes.put(resourceId, new UnsweepableIndexUnit(localResource));
}
+ inactiveResources.remove(localResource.getId());
}
- inactiveResources.remove(localResource.getId());
} finally {
evictionLock.readLock().unlock();
}
@@ -75,7 +75,7 @@
public void onUnregister(long resourceId) {
evictionLock.readLock().lock();
try {
- AbstractIndexUnit indexUnit = getUnit(resourceId);
+ AbstractIndexUnit indexUnit = removeUnit(resourceId);
if (indexUnit != null) {
indexUnit.drop();
} else {
@@ -86,14 +86,6 @@
}
}
- private AbstractIndexUnit getUnit(long resourceId) {
- AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId);
- if (indexUnit == null) {
- indexUnit = unsweepableIndexes.get(resourceId);
- }
- return indexUnit;
- }
-
@Override
public void onOpen(long resourceId) {
evictionLock.readLock().lock();
@@ -122,7 +114,22 @@
} finally {
evictionLock.readLock().unlock();
}
+ }
+ private AbstractIndexUnit getUnit(long resourceId) {
+ AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId);
+ if (indexUnit == null) {
+ indexUnit = unsweepableIndexes.get(resourceId);
+ }
+ return indexUnit;
+ }
+
+ private AbstractIndexUnit removeUnit(long resourceId) {
+ AbstractIndexUnit indexUnit = sweepableIndexes.remove(resourceId);
+ if (indexUnit == null) {
+ indexUnit = unsweepableIndexes.remove(resourceId);
+ }
+ return indexUnit;
}
ReentrantReadWriteLock getEvictionLock() {
@@ -133,7 +140,8 @@
inactiveResources.clear();
// First check whatever we had already
for (LocalResource lr : localResources.values()) {
- if (unsweepableIndexes.containsKey(lr.getId()) || sweepableIndexes.containsKey(lr.getId())) {
+ if (!filter.accept(lr) || unsweepableIndexes.containsKey(lr.getId())
+ || sweepableIndexes.containsKey(lr.getId())) {
// We already have this resource
continue;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
index f594e06..e466758 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
@@ -31,7 +31,6 @@
import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit;
import org.apache.hyracks.cloud.io.ICloudIOManager;
-import org.apache.hyracks.cloud.sweeper.ISweeper;
import org.apache.hyracks.cloud.sweeper.Sweeper;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
@@ -48,7 +47,7 @@
private final IPhysicalDrive physicalDrive;
private final List<SweepableIndexUnit> indexes;
private final ICloudIOManager cloudIOManager;
- private final ISweeper sweeper;
+ private final Sweeper sweeper;
private final long inactiveTimeThreshold;
public DiskCacheSweeperThread(ExecutorService executorService, long waitTime,
@@ -161,7 +160,7 @@
}
@CriticalPath
- private static void sweepIndexes(ISweeper sweeper, List<SweepableIndexUnit> indexes) {
+ private static void sweepIndexes(Sweeper sweeper, List<SweepableIndexUnit> indexes) {
for (int i = 0; i < indexes.size(); i++) {
SweepableIndexUnit index = indexes.get(i);
if (!index.isSweeping()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java
similarity index 67%
rename from hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
rename to hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java
index 9067a3f..3712918 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java
@@ -16,20 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.cloud.sweeper;
+package org.apache.hyracks.cloud.cache.service;
-import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
-import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.LocalResource;
-/**
- * Sweeps an index to relieve the pressure on a local {@link IPhysicalDrive}
- */
@FunctionalInterface
-public interface ISweeper {
+public interface IEvictableLocalResourceFilter {
/**
- * Sweep an index
+ * Whether a local resource is evictable
*
- * @param indexUnit to sweep
+ * @param resource resource to test
+ * @return true if the resource is evictable, false otherwise
*/
- void sweep(SweepableIndexUnit indexUnit) throws InterruptedException;
+ boolean accept(LocalResource resource);
}
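As an illustration of how the new interface is meant to be used, here is a self-contained sketch of a partition-based filter; `Resource`, `partitionOf`, and the partition constant are simplified stand-ins, not the actual LocalResource/StoragePathUtil API:
```java
// Hedged sketch with simplified stand-ins for LocalResource and StoragePathUtil.
record Resource(long id, String path) {
}

@FunctionalInterface
interface EvictableResourceFilter {
    boolean accept(Resource resource);
}

final class FilterExample {
    static final int METADATA_PARTITION = 0; // assumed constant for the sketch

    // e.g. "storage/partition_0/Default/ds/0/b-tree" -> 0
    static int partitionOf(String path) {
        int start = path.indexOf("partition_") + "partition_".length();
        int end = path.indexOf('/', start);
        return Integer.parseInt(end < 0 ? path.substring(start) : path.substring(start, end));
    }

    // Everything outside the metadata partition may be evicted from the cache.
    static final EvictableResourceFilter FILTER =
            resource -> partitionOf(resource.path()) != METADATA_PARTITION;

    public static void main(String[] args) {
        System.out.println(FILTER.accept(new Resource(1, "storage/partition_0/x"))); // false
        System.out.println(FILTER.accept(new Resource(2, "storage/partition_3/y"))); // true
    }
}
```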
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
deleted file mode 100644
index ca103ab..0000000
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.cloud.sweeper;
-
-import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
-
-public final class NoOpSweeper implements ISweeper {
- public static final ISweeper INSTANCE = new NoOpSweeper();
-
- private NoOpSweeper() {
- }
-
- @Override
- public void sweep(SweepableIndexUnit indexUnit) {
- // NoOp
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
index 245c957..6e12b79 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -35,7 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public final class Sweeper implements ISweeper {
+public final class Sweeper {
private static final Logger LOGGER = LogManager.getLogger();
private static final SweepRequest POISON = new SweepRequest();
private final BlockingQueue<SweepRequest> requests;
@@ -55,7 +55,6 @@
}
}
- @Override
public void sweep(SweepableIndexUnit indexUnit) throws InterruptedException {
SweepRequest request = freeRequests.take();
request.reset(indexUnit);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index 9fc60fa..5c6fe09 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -20,7 +20,8 @@
import static org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeperUtil.EMPTY;
import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnPageIndex;
-import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnStartOffset;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfRemainingPages;
import java.util.BitSet;
@@ -171,15 +172,18 @@
}
/**
- * Length of a column in pages
+ * The number of pages the column occupies
*
* @param columnIndex column index
* @return number of pages
*/
public int getColumnNumberOfPages(int columnIndex) {
int pageSize = leafFrame.getBuffer().capacity();
- int numberOfPages = getNumberOfPages(getColumnLength(columnIndex), pageSize);
- return numberOfPages == 0 ? 1 : numberOfPages;
+ int offset = getColumnStartOffset(leafFrame.getColumnOffset(columnIndex), pageSize);
+ int firstBufferLength = pageSize - offset;
+ int remainingLength = getColumnLength(columnIndex) - firstBufferLength;
+ // 1 for the first page + the number of remaining pages
+ return 1 + getNumberOfRemainingPages(remainingLength, pageSize);
}
/**
@@ -231,6 +235,10 @@
return columnsOrder;
}
+ public int getTotalNumberOfPages() {
+ return leafFrame.getMegaLeafNodeNumberOfPages();
+ }
+
private void init() {
int numberOfColumns = leafFrame.getNumberOfColumns();
offsetColumnIndexPairs = LongArrays.ensureCapacity(offsetColumnIndexPairs, numberOfColumns + 1, 0);
@@ -312,5 +320,4 @@
}
builder.append('\n');
}
-
}
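The corrected `getColumnNumberOfPages` accounts for the column's start offset within its first page, which the old `ceil(length / pageSize)` computation ignored. A worked sketch of the same arithmetic (names are illustrative):
```java
// Hedged sketch of the corrected page-count arithmetic.
public final class ColumnPageMath {
    static int ceilDiv(int length, int pageSize) {
        return (length + pageSize - 1) / pageSize; // ceil(length / pageSize)
    }

    // 1 page for the first buffer + however many pages the remainder spans.
    static int numberOfPages(int columnOffsetInPage, int columnLength, int pageSize) {
        int firstBufferLength = pageSize - columnOffsetInPage;
        int remainingLength = columnLength - firstBufferLength;
        return 1 + Math.max(0, ceilDiv(remainingLength, pageSize));
    }

    public static void main(String[] args) {
        // A 100-byte column starting 60 bytes before the end of a 128-byte page
        // spans 2 pages, even though ceil(100 / 128) alone would report 1.
        System.out.println(numberOfPages(68, 100, 128)); // 2
    }
}
```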
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 2d6b2fd..7623698 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
import static org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
@@ -45,9 +46,12 @@
import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
@NotThreadSafe
public final class CloudColumnReadContext implements IColumnReadContext {
+ private static final Logger LOGGER = LogManager.getLogger();
private final ColumnProjectorType operation;
private final IPhysicalDrive drive;
private final BitSet plan;
@@ -130,10 +134,26 @@
return;
}
- // TODO What if every other page is requested. That would do N/2 request, where N is the number of pages.
- // TODO This should be optimized in a way that minimizes the number of requests
columnRanges.reset(leafFrame, projectedColumns, plan, cloudOnlyColumns);
int pageZeroId = leafFrame.getPageId();
+
+ if (operation == MERGE) {
+ pinAll(fileId, pageZeroId, leafFrame.getMegaLeafNodeNumberOfPages() - 1, bufferCache);
+ } else {
+ pinProjected(fileId, pageZeroId, bufferCache);
+ }
+ }
+
+ private void pinAll(int fileId, int pageZeroId, int numberOfPages, IBufferCache bufferCache)
+ throws HyracksDataException {
+ columnCtx.prepare(numberOfPages);
+ pin(bufferCache, fileId, pageZeroId, 1, numberOfPages);
+ }
+
+ private void pinProjected(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
+ // TODO What if every other page is requested. That would do N/2 request, where N is the number of pages.
+ // TODO This should be optimized in a way that minimizes the number of requests
+
int[] columnsOrders = columnRanges.getColumnsOrder();
int i = 0;
int columnIndex = columnsOrders[i];
@@ -143,38 +163,52 @@
continue;
}
- int startPageId = columnRanges.getColumnStartPageIndex(columnIndex);
- // Will increment the number pages if the next column's pages are contiguous to this column's pages
- int numberOfPages = columnRanges.getColumnNumberOfPages(columnIndex);
+ int firstPageIdx = columnRanges.getColumnStartPageIndex(columnIndex);
+ // last page of the column
+ int lastPageIdx = firstPageIdx + columnRanges.getColumnNumberOfPages(columnIndex) - 1;
// Advance to the next column to check if it has contiguous pages
columnIndex = columnsOrders[++i];
while (columnIndex > -1) {
+ int sharedPageCount = 0;
// Get the next column's start page ID
- int nextStartPageId = columnRanges.getColumnStartPageIndex(columnIndex);
- if (nextStartPageId > startPageId + numberOfPages + 1) {
- // The next startPageId is not contiguous, stop.
+ int nextStartPageIdx = columnRanges.getColumnStartPageIndex(columnIndex);
+ if (nextStartPageIdx > lastPageIdx + 1) {
+ // The nextStartPageIdx is not contiguous, stop.
break;
+ } else if (nextStartPageIdx == lastPageIdx) {
+ // A shared page
+ sharedPageCount = 1;
}
- // Last page of this column
- int nextLastPage = nextStartPageId + columnRanges.getColumnNumberOfPages(columnIndex);
- // The next column's pages are contiguous. Combine its ranges with the previous one.
- numberOfPages = nextLastPage - startPageId;
+ lastPageIdx += columnRanges.getColumnNumberOfPages(columnIndex) - sharedPageCount;
// Advance to the next column
columnIndex = columnsOrders[++i];
}
+ if (lastPageIdx >= columnRanges.getTotalNumberOfPages()) {
+ throw new IndexOutOfBoundsException("lastPageIdx=" + lastPageIdx + " >= megaLeafNodePages="
+ + columnRanges.getTotalNumberOfPages());
+ }
+
+ int numberOfPages = lastPageIdx - firstPageIdx + 1;
columnCtx.prepare(numberOfPages);
- pin(bufferCache, fileId, pageZeroId, startPageId, numberOfPages);
+ pin(bufferCache, fileId, pageZeroId, firstPageIdx, numberOfPages);
}
}
- private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numOfRequestedPages)
+ private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages)
throws HyracksDataException {
- for (int i = start; i < start + numOfRequestedPages; i++) {
+ for (int i = start; i < start + numberOfPages; i++) {
long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
- pinnedPages.add(bufferCache.pin(dpid, columnCtx));
+ try {
+ pinnedPages.add(bufferCache.pin(dpid, columnCtx));
+ } catch (Throwable e) {
+ LOGGER.error("Error while pinning page {} (numberOfPages={}) in context {}\ncolumnRanges:\n{}", i,
+ numberOfPages, columnCtx, columnRanges);
+ throw e;
+ }
}
}
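For intuition, a standalone sketch of the coalescing loop in `pinProjected`, including the shared-page case where one column ends on the same page the next one starts. The types are illustrative, and the columns are assumed ordered by start page with at most one page of overlap:
```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of coalescing per-column page ranges into pin requests.
public final class RangeCoalescer {
    record Range(int firstPage, int lastPage) {
    }

    // Each column i occupies pages [starts[i], starts[i] + counts[i] - 1].
    static List<Range> coalesce(int[] starts, int[] counts) {
        List<Range> result = new ArrayList<>();
        int first = starts[0];
        int last = first + counts[0] - 1;
        for (int i = 1; i < starts.length; i++) {
            int next = starts[i];
            if (next > last + 1) {
                result.add(new Range(first, last)); // gap: flush current range
                first = next;
                last = next + counts[i] - 1;
            } else {
                // Contiguous; if next == last the two columns share a page,
                // so count that page only once.
                int shared = next == last ? 1 : 0;
                last += counts[i] - shared;
            }
        }
        result.add(new Range(first, last));
        return result;
    }

    public static void main(String[] args) {
        // Columns at pages {1..2}, {2..3} (shared page 2), and {6..6}
        System.out.println(coalesce(new int[] { 1, 2, 6 }, new int[] { 2, 2, 1 }));
        // -> [Range[firstPage=1, lastPage=3], Range[firstPage=6, lastPage=6]]
    }
}
```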
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index f1b2fd4..21d5ce7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
-import static org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
import static org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
import java.io.IOException;
@@ -49,10 +48,15 @@
private final ColumnProjectorType operation;
private final ColumnRanges columnRanges;
private final IPhysicalDrive drive;
+
private int numberOfContiguousPages;
private int pageCounter;
private InputStream gapStream;
+ // For debugging
+ private long streamOffset;
+ private long remainingStreamBytes;
+
CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges columnRanges, IPhysicalDrive drive) {
this.operation = operation;
this.columnRanges = columnRanges;
@@ -76,7 +80,7 @@
* up writing the bytes of this page in the position of another page. Therefore, we should skip the bytes
* for this particular page to avoid placing the bytes of this page into another page's position.
*/
- skipCloudBytes(cachedPage);
+ skipStreamIfOpened(cachedPage);
pageCounter++;
}
}
@@ -102,8 +106,7 @@
boolean empty = BufferCacheCloudReadContextUtil.isEmpty(header);
int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
boolean cloudOnly = columnRanges.isCloudOnly(pageId);
- ByteBuffer buffer;
- if (empty || cloudOnly || gapStream != null) {
+ if (empty || cloudOnly) {
boolean evictable = columnRanges.isEvictable(pageId);
/*
* Persist iff the following conditions are satisfied:
@@ -118,26 +121,30 @@
* 'cloudOnly' is true.
*/
boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.hasSpace();
- buffer = readFromStream(ioManager, fileHandle, header, cPage, persist);
- buffer.position(RESERVED_HEADER_BYTES);
+ readFromStream(ioManager, fileHandle, header, cPage, persist);
} else {
/*
- * Here we can find a page that is planned for eviction, but it has not being evicted yet
- * (i.e., empty = false). This could happen if the cursor is at a point the sweeper hasn't
- * reached yet (i.e., cloudOnly = false).
+ * Here we can find a page that is planned for eviction, but it has not been evicted yet
+ * (i.e., empty = false). This could happen if the cursor is at a point the sweeper hasn't
+ * reached yet (i.e., cloudOnly = false). Thus, whatever is read from the disk is valid.
*/
- buffer = DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
+ skipStreamIfOpened(cPage);
}
if (++pageCounter == numberOfContiguousPages) {
close();
}
- return buffer;
+ // Finally process the header
+ return DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
}
void close() throws HyracksDataException {
if (gapStream != null) {
+ if (remainingStreamBytes != 0) {
+ LOGGER.warn("Closed cloud stream with {} remaining bytes", remainingStreamBytes);
+ }
+
try {
gapStream.close();
gapStream = null;
@@ -147,13 +154,14 @@
}
}
- private ByteBuffer readFromStream(IOManager ioManager, BufferedFileHandle fileHandle,
- BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
+ private void readFromStream(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+ CachedPage cPage, boolean persist) throws HyracksDataException {
InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
ByteBuffer buffer = header.getBuffer();
buffer.position(0);
+
try {
- while (buffer.remaining() != 0) {
+ while (buffer.remaining() > 0) {
int length = stream.read(buffer.array(), buffer.position(), buffer.remaining());
if (length < 0) {
throw new IllegalStateException("Stream should not be empty!");
@@ -164,6 +172,7 @@
throw HyracksDataException.create(e);
}
+ // Flip the buffer after reading so it is ready to be read from the beginning
buffer.flip();
if (persist) {
@@ -172,7 +181,8 @@
BufferCacheCloudReadContextUtil.persist(cloudIOManager, fileHandle.getFileHandle(), buffer, offset);
}
- return buffer;
+ streamOffset += cPage.getCompressedPageSize();
+ remainingStreamBytes -= cPage.getCompressedPageSize();
}
private InputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage)
@@ -185,25 +195,31 @@
long offset = cPage.getCompressedPageOffset();
int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
+ remainingStreamBytes = length;
+ streamOffset = offset;
+ LOGGER.info(
+ "Cloud stream read for pageId={} starting from pageCounter={} out of "
+ + "numberOfContiguousPages={} (streamOffset = {}, remainingStreamBytes = {})",
+ pageId, pageCounter, numberOfContiguousPages, streamOffset, remainingStreamBytes);
- LOGGER.info("Cloud stream read for {} pages [{}, {}]", numberOfContiguousPages - pageCounter, pageId,
- pageId + requiredNumOfPages);
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), offset, length);
return gapStream;
}
- private void skipCloudBytes(CloudCachedPage cachedPage) throws HyracksDataException {
+ private void skipStreamIfOpened(CachedPage cPage) throws HyracksDataException {
if (gapStream == null) {
return;
}
try {
- long remaining = cachedPage.getCompressedPageSize();
+ long remaining = cPage.getCompressedPageSize();
while (remaining > 0) {
remaining -= gapStream.skip(remaining);
}
+ streamOffset += cPage.getCompressedPageSize();
+ remainingStreamBytes -= cPage.getCompressedPageSize();
} catch (IOException e) {
throw HyracksDataException.create(e);
}
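The skip loop above is needed because `InputStream.skip` is allowed to skip fewer bytes than requested. A standalone sketch of the same pattern, with an end-of-stream guard added as an assumption of the sketch:
```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hedged sketch: InputStream.skip may skip fewer bytes than requested,
// so callers must loop until the whole gap is consumed.
public final class StreamSkip {
    static void skipFully(InputStream in, long bytes) throws IOException {
        long remaining = bytes;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped <= 0) {
                // Fall back to a single-byte read to detect end-of-stream.
                if (in.read() < 0) {
                    throw new IOException("Stream ended with " + remaining + " bytes left to skip");
                }
                skipped = 1;
            }
            remaining -= skipped;
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[16]);
        skipFully(in, 10);
        System.out.println(in.read()); // byte at offset 10 -> 0
    }
}
```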
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
index 48633e7..d7b0764 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
@@ -19,7 +19,7 @@
package org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples;
import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnStartOffset;
-import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages;
+import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfRemainingPages;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
@@ -40,7 +40,7 @@
private final IColumnReadMultiPageOp multiPageOp;
private final Queue<ICachedPage> pages;
private final LongSet pinnedPages;
- private int numberOfPages;
+ private int numberOfRemainingPages;
private int startPage;
private int startOffset;
private int length;
@@ -55,7 +55,7 @@
@Override
public void reset(ColumnBTreeReadLeafFrame frame) throws HyracksDataException {
if (columnIndex >= frame.getNumberOfColumns()) {
- numberOfPages = 0;
+ numberOfRemainingPages = 0;
length = 0;
return;
}
@@ -70,8 +70,8 @@
length = ColumnUtil.readColumnLength(firstPage, startOffset, pageSize);
// Get the remaining length of the column
int remainingLength = length - firstPage.remaining();
- // Get the number of pages this column occupies
- numberOfPages = getNumberOfPages(remainingLength, pageSize);
+ // Get the number of remaining pages this column occupies
+ numberOfRemainingPages = getNumberOfRemainingPages(remainingLength, pageSize);
//+4-bytes after reading the length
startOffset += Integer.BYTES;
//-4-bytes after reading the length
@@ -84,12 +84,12 @@
buffer.clear();
buffer.position(startOffset);
buffers.add(buffer);
- for (int i = 0; i < numberOfPages; i++) {
+ for (int i = 0; i < numberOfRemainingPages; i++) {
buffer = readNext().duplicate();
buffer.clear();
buffers.add(buffer);
}
- numberOfPages = 0;
+ numberOfRemainingPages = 0;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
index 12bb64e..fc1e460 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
@@ -87,7 +87,7 @@
// Read the length of this column
int length = firstPage.getInt();
// Ensure the page limit to at most a full page
- firstPage.limit(Math.min(length, pageSize));
+ firstPage.limit(Math.min(startOffset + length, pageSize));
return length;
}
@@ -98,7 +98,7 @@
* @param pageSize disk buffer cache page size
* @return number of pages the column occupies
*/
- public static int getNumberOfPages(int remainingLength, int pageSize) {
+ public static int getNumberOfRemainingPages(int remainingLength, int pageSize) {
return (int) Math.ceil((double) remainingLength / pageSize);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
index 21cefb6..a0ad045 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -321,4 +321,9 @@
final String path = fileHandle.getFileReference().getAbsolutePath();
throw new IllegalStateException(String.format(ERROR_MESSAGE, op, expected, actual, path));
}
+
+ @Override
+ public String toString() {
+ return fileHandle != null ? fileHandle.getFileReference().getAbsolutePath() : "";
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index ab37532..271afad 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -203,12 +203,11 @@
}
}
- // Notify context page is going to be pinned
- context.onPin(cPage);
-
// Resolve race of multiple threads trying to read the page from
// disk.
synchronized (cPage) {
+ // Notify context page is going to be pinned
+ context.onPin(cPage);
if (!cPage.valid) {
try {
tryRead(cPage, context);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 6fd18ff..b9d4e5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -23,6 +23,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
/**
* @author yingyib
*/
@@ -209,4 +211,10 @@
public int getCompressedPageSize() {
return compressedSize;
}
+
+ @Override
+ public String toString() {
+ return "CachedPage:[page:" + BufferedFileHandle.getPageId(dpid) + ", compressedPageOffset:" + compressedOffset
+ + ", compressedSize:" + compressedSize + "]";
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
index 1e986b3..6ce985e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
@@ -24,7 +24,6 @@
/**
* A proxy to notify a disk-cache (a faster disk that is caching a slower resource) about resource lifecycle events.
* The notifier could block a resource from being operated on if the disk-cache manager is denying access to it
- * TODO Do we care about dataset?
*/
public interface IDiskResourceCacheLockNotifier {
/**
@@ -33,9 +32,8 @@
*
* @param localResource resource to be registered
* @param index of the resource
- * @param partition partition
*/
- void onRegister(LocalResource localResource, IIndex index, int partition);
+ void onRegister(LocalResource localResource, IIndex index);
/**
* Notify unregistering an existing resource
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
index b83c388..2c590cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
@@ -28,7 +28,7 @@
}
@Override
- public void onRegister(LocalResource localResource, IIndex index, int partition) {
+ public void onRegister(LocalResource localResource, IIndex index) {
// NoOp
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
index 970a0ae..ff2bd83 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
@@ -114,11 +114,13 @@
final boolean contiguousLargePages = getPageId(cPage.getDiskPageId()) + 1 == extraBlockPageId;
IFileHandle handle = getFileHandle();
long bytesWritten;
+ long offset;
try {
buf.limit(contiguousLargePages ? bufferCache.getPageSize() * totalPages : bufferCache.getPageSize());
buf.position(0);
ByteBuffer[] buffers = header.prepareWrite(cPage);
- bytesWritten = context.write(ioManager, handle, getFirstPageOffset(cPage), buffers);
+ offset = getFirstPageOffset(cPage);
+ bytesWritten = context.write(ioManager, handle, offset, buffers);
} finally {
returnHeaderHelper(header);
}
@@ -130,6 +132,9 @@
final int expectedWritten = bufferCache.getPageSizeWithHeader() + bufferCache.getPageSize() * (totalPages - 1);
verifyBytesWritten(expectedWritten, bytesWritten);
+
+ cPage.setCompressedPageOffset(offset);
+ cPage.setCompressedPageSize((int) bytesWritten);
}
@Override
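Recording the offset and byte count at write time is what makes the cloud read path work: compressed pages have variable sizes, so a gap read over several contiguous pages has to sum the recorded sizes rather than multiply by a fixed page size. A sketch of that bookkeeping (illustrative types, not the BufferedFileHandle API):
```java
// Hedged sketch: once pages are compressed their sizes vary, so the only way
// to read a run of pages from the cloud is to use offsets/sizes recorded at
// write time; fixed-size arithmetic (pageId * pageSize) no longer applies.
public final class CompressedPageIndex {
    private final long[] offsets; // recorded by the write path
    private final int[] sizes;

    CompressedPageIndex(long[] offsets, int[] sizes) {
        this.offsets = offsets;
        this.sizes = sizes;
    }

    // Total on-disk bytes of pages [firstPageId, firstPageId + count - 1].
    long totalSize(int firstPageId, int count) {
        long total = 0;
        for (int i = firstPageId; i < firstPageId + count; i++) {
            total += sizes[i];
        }
        return total;
    }

    public static void main(String[] args) {
        CompressedPageIndex index = new CompressedPageIndex(
                new long[] { 0, 3100, 5200 }, new int[] { 3100, 2100, 4000 });
        // One ranged read covering pages 0..2 spans 9200 bytes at offset 0.
        System.out.println(index.totalSize(0, 3));
    }
}
```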
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 6fe3846..cd882b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -114,15 +114,16 @@
} else {
uBuffer.position(0);
}
+ long offset;
if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {
cBuffer.position(0);
- final long offset = compressedFileManager.writePageInfo(pageId, cBuffer.remaining());
+ offset = compressedFileManager.writePageInfo(pageId, cBuffer.remaining());
expectedBytesWritten = cBuffer.limit();
bytesWritten = context.write(ioManager, handle, offset, cBuffer);
} else {
//Compression did not gain any savings
final ByteBuffer[] buffers = header.prepareWrite(cPage);
- final long offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
+ offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit();
bytesWritten = context.write(ioManager, handle, offset, buffers);
}
@@ -134,6 +135,9 @@
writeExtraCompressedPages(cPage, cBuffer, totalPages, extraBlockPageId);
}
+ cPage.setCompressedPageOffset(offset);
+ cPage.setCompressedPageSize((int) bytesWritten);
+
} finally {
returnHeaderHelper(header);
}