[MULTIPLE ISSUES][STO] Multiple fixes for cloud deployment.
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
ASTERIXDB-3436
- Make certain cloud requests uninterruptible
- Retry on request failure
ASTERIXDB-3443
- Fix MergedPageRanges out-of-bounds exception
Change-Id: Ia8c34d4ba7a3527fea22149e5065815095c39ab2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18378
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
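For reference, this is the retry pattern the change applies at each call site; a minimal
sketch, with cloudClient, bucket, path, offset, and data standing in for the context of
the individual call sites in the diff below:

    // Sketch of the pattern introduced by this change (names assumed from context).
    int position = data.position();
    // The request itself: a cloud read that fills 'data'.
    ICloudRequest request = () -> cloudClient.read(bucket, path, offset, data);
    // Pre-retry routine: rewind the buffer so a failed read can be reattempted safely.
    ICloudBeforeRetryRequest retry = () -> data.position(position);
    // Retries on failure; interrupts are deferred and the flag is re-asserted afterwards.
    CloudRetryableRequestUtil.run(request, retry);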
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageUnstableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageUnstableTest.java
new file mode 100644
index 0000000..a517ed4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageUnstableTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.api.common.LocalCloudUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs tests in a cloud deployment environment with a simulated unstable connection
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageUnstableTest {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private final TestCaseContext tcCtx;
+ private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+ private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+ private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf";
+ private static final String DELTA_RESULT_PATH = "results_cloud";
+ private static final String EXCLUDED_TESTS = "MP";
+
+ public CloudStorageUnstableTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.setProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE, "true");
+ LocalCloudUtil.startS3CloudEnvironment(true);
+ TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+ testExecutor.executorId = "cloud";
+ testExecutor.stripSubstring = "//DB:";
+ LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ System.clearProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE);
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "CloudStorageUnstableTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ long seed = System.nanoTime();
+ Random random = new Random(seed);
+ LOGGER.info("CloudStorageUnstableTest seed {}", seed);
+ Collection<Object[]> tests = LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+ List<Object[]> selected = new ArrayList<>();
+ for (Object[] test : tests) {
+ // Select 10% of the tests randomly
+ if (random.nextInt(10) == 0) {
+ selected.add(test);
+ }
+ }
+ return selected;
+ }
+
+ @Test
+ public void test() throws Exception {
+ List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit();
+ Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+ LangExecutionUtil.test(tcCtx);
+ }
+
+ private static String getText(Description description) {
+ return description == null ? "" : description.getValue();
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 35b0744..91c24e8 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -51,6 +51,10 @@
import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -173,12 +177,30 @@
@Override
public final void cloudRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
- cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+ int position = data.position();
+ ICloudRequest request =
+ () -> cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+ ICloudBeforeRetryRequest retry = () -> data.position(position);
+ CloudRetryableRequestUtil.run(request, retry);
}
@Override
- public final InputStream cloudRead(IFileHandle fHandle, long offset, long length) {
- return cloudClient.getObjectStream(bucket, fHandle.getFileReference().getRelativePath(), offset, length);
+ public final CloudInputStream cloudRead(IFileHandle fHandle, long offset, long length) throws HyracksDataException {
+ return CloudRetryableRequestUtil.run(() -> new CloudInputStream(this, fHandle,
+ cloudClient.getObjectStream(bucket, fHandle.getFileReference().getRelativePath(), offset, length),
+ offset, length));
+ }
+
+ @Override
+ public void restoreStream(CloudInputStream cloudStream) {
+ LOGGER.warn("Restoring stream from cloud, {}", cloudStream);
+ /*
+ * This cloud request should not be called using CloudRetryableRequestUtil, as it is the responsibility of the
+ * caller to wrap this request as an ICloudRequest or an ICloudBeforeRetryRequest.
+ */
+ InputStream stream = cloudClient.getObjectStream(bucket, cloudStream.getPath(), cloudStream.getOffset(),
+ cloudStream.getRemaining());
+ cloudStream.setInputStream(stream);
}
@Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 9e35020..cace898 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -25,6 +25,9 @@
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -141,7 +144,9 @@
*/
writeBuffer.flip();
try {
- bufferedWriter.uploadLast(this, writeBuffer);
+ ICloudRequest request = () -> bufferedWriter.uploadLast(this, writeBuffer);
+ ICloudBeforeRetryRequest retry = () -> writeBuffer.position(0);
+ CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request, retry);
} catch (Exception e) {
LOGGER.error(e);
throw HyracksDataException.create(e);
@@ -182,7 +187,10 @@
private void uploadAndWait() throws HyracksDataException {
writeBuffer.flip();
try {
- bufferedWriter.upload(this, writeBuffer.limit());
+ ICloudRequest request = () -> bufferedWriter.upload(this, writeBuffer.limit());
+ ICloudBeforeRetryRequest retry = () -> writeBuffer.position(0);
+ // If this request is interrupted, the interruption is followed by a halt; hence, no retry on interruption
+ CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request, retry);
} catch (Exception e) {
LOGGER.error(e);
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 49b919c..f8cf2c3 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -53,6 +53,7 @@
import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -174,7 +175,7 @@
@Override
public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
- return accessor.doList(dir, filter);
+ return CloudRetryableRequestUtil.run(() -> accessor.doList(dir, filter));
}
@Override
@@ -189,18 +190,18 @@
@Override
public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
- return accessor.doReadAllBytes(fileRef);
+ return CloudRetryableRequestUtil.run(() -> accessor.doReadAllBytes(fileRef));
}
@Override
public void delete(FileReference fileRef) throws HyracksDataException {
- accessor.doDelete(fileRef);
+ CloudRetryableRequestUtil.run(() -> accessor.doDelete(fileRef));
log("DELETE", fileRef);
}
@Override
public void overwrite(FileReference fileRef, byte[] bytes) throws HyracksDataException {
- accessor.doOverwrite(fileRef, bytes);
+ CloudRetryableRequestUtil.run(() -> accessor.doOverwrite(fileRef, bytes));
log("WRITE", fileRef);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
index 8d28d3a..fcb7bda 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -24,6 +24,7 @@
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
import org.apache.logging.log4j.LogManager;
@@ -57,7 +58,7 @@
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Bulk deleting: local: {}, cloud: {}", fileReferences, paths);
}
- cloudClient.deleteObjects(bucket, paths);
+ CloudRetryableRequestUtil.run(() -> cloudClient.deleteObjects(bucket, paths));
// Bulk delete locally as well
super.performOperation();
callBack.call(fileReferences);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index b0a1e0c..ee43a2c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -24,8 +24,10 @@
import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
public class CloudClientProvider {
+ private static final boolean UNSTABLE = isUnstable();
public static final String S3 = "s3";
public static final String GCS = "gs";
@@ -36,13 +38,21 @@
public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian)
throws HyracksDataException {
String storageScheme = cloudProperties.getStorageScheme();
+ ICloudClient cloudClient;
if (S3.equalsIgnoreCase(storageScheme)) {
S3ClientConfig config = S3ClientConfig.of(cloudProperties);
- return new S3CloudClient(config, guardian);
+ cloudClient = new S3CloudClient(config, guardian);
} else if (GCS.equalsIgnoreCase(storageScheme)) {
GCSClientConfig config = GCSClientConfig.of(cloudProperties);
- return new GCSCloudClient(config, guardian);
+ cloudClient = new GCSCloudClient(config, guardian);
+ } else {
+ throw new IllegalStateException("unsupported cloud storage scheme: " + storageScheme);
}
- throw new IllegalStateException("unsupported cloud storage scheme: " + storageScheme);
+
+ return UNSTABLE ? new UnstableCloudClient(cloudClient) : cloudClient;
+ }
+
+ private static boolean isUnstable() {
+ return Boolean.getBoolean(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE);
}
}
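Usage note: the failure-injecting client is opt-in via a system property, as
CloudStorageUnstableTest.setUp() above does; a minimal sketch (cloudProperties and
guardian assumed from context):

    // UNSTABLE is read once, at class-initialization time, so the property must be
    // set before CloudClientProvider is first loaded.
    System.setProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE, "true");
    ICloudClient client = CloudClientProvider.getClient(cloudProperties, guardian);
    // 'client' now wraps the real S3/GCS client in UnstableCloudClient, which
    // injects simulated IOExceptions into reads and uploads.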
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
new file mode 100644
index 0000000..2ec5fb5
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class UnstableCloudClient implements ICloudClient {
+ // 10% error rate
+ private static final double ERROR_RATE = 0.1d;
+ private static final Random RANDOM = new Random(0);
+ private final ICloudClient cloudClient;
+
+ public UnstableCloudClient(ICloudClient cloudClient) {
+ this.cloudClient = cloudClient;
+ }
+
+ @Override
+ public int getWriteBufferSize() {
+ return cloudClient.getWriteBufferSize();
+ }
+
+ @Override
+ public IRequestProfiler getProfiler() {
+ return cloudClient.getProfiler();
+ }
+
+ @Override
+ public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
+ if (cloudClient instanceof S3CloudClient) {
+ return createUnstableWriter((S3CloudClient) cloudClient, bucket, path, bufferProvider);
+ }
+ return cloudClient.createWriter(bucket, path, bufferProvider);
+ }
+
+ @Override
+ public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
+ return cloudClient.listObjects(bucket, path, filter);
+ }
+
+ @Override
+ public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+ fail();
+ return cloudClient.read(bucket, path, offset, buffer);
+ }
+
+ @Override
+ public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+ fail();
+ return cloudClient.readAllBytes(bucket, path);
+ }
+
+ @Override
+ public InputStream getObjectStream(String bucket, String path, long offset, long length) {
+ return cloudClient.getObjectStream(bucket, path, offset, length);
+ }
+
+ @Override
+ public void write(String bucket, String path, byte[] data) {
+ cloudClient.write(bucket, path, data);
+ }
+
+ @Override
+ public void copy(String bucket, String srcPath, FileReference destPath) {
+ cloudClient.copy(bucket, srcPath, destPath);
+ }
+
+ @Override
+ public void deleteObjects(String bucket, Collection<String> paths) {
+ cloudClient.deleteObjects(bucket, paths);
+ }
+
+ @Override
+ public long getObjectSize(String bucket, String path) throws HyracksDataException {
+ fail();
+ return cloudClient.getObjectSize(bucket, path);
+ }
+
+ @Override
+ public boolean exists(String bucket, String path) throws HyracksDataException {
+ fail();
+ return cloudClient.exists(bucket, path);
+ }
+
+ @Override
+ public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
+ fail();
+ return cloudClient.isEmptyPrefix(bucket, path);
+ }
+
+ @Override
+ public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager)
+ throws HyracksDataException {
+ return cloudClient.createParallelDownloader(bucket, ioManager);
+ }
+
+ @Override
+ public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+ return cloudClient.listAsJson(objectMapper, bucket);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ cloudClient.close();
+ }
+
+ private static void fail() throws HyracksDataException {
+ double prob = RANDOM.nextInt(100) / 100.0d;
+ if (prob < ERROR_RATE) { // fail with probability ERROR_RATE
+ throw HyracksDataException.create(new IOException("Simulated error"));
+ }
+ }
+
+ private static ICloudWriter createUnstableWriter(S3CloudClient cloudClient, String bucket, String path,
+ IWriteBufferProvider bufferProvider) {
+ ICloudBufferedWriter bufferedWriter =
+ new UnstableCloudBufferedWriter(cloudClient.createBufferedWriter(bucket, path));
+ return new CloudResettableInputStream(bufferedWriter, bufferProvider);
+ }
+
+ private static class UnstableCloudBufferedWriter implements ICloudBufferedWriter {
+ private final ICloudBufferedWriter bufferedWriter;
+
+ private UnstableCloudBufferedWriter(ICloudBufferedWriter bufferedWriter) {
+ this.bufferedWriter = bufferedWriter;
+ }
+
+ @Override
+ public void upload(InputStream stream, int length) throws HyracksDataException {
+ fail();
+ bufferedWriter.upload(stream, length);
+ }
+
+ @Override
+ public void uploadLast(InputStream stream, ByteBuffer buffer) throws HyracksDataException {
+ fail();
+ bufferedWriter.uploadLast(stream, buffer);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return bufferedWriter.isEmpty();
+ }
+
+ @Override
+ public void finish() throws HyracksDataException {
+ bufferedWriter.finish();
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ bufferedWriter.abort();
+ }
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index d0dda2a..05a9fc1 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -22,12 +22,12 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -84,7 +84,6 @@
if (uploadId == null) {
profiler.objectWrite();
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
- // TODO make retryable
s3Client.putObject(request, RequestBody.fromByteBuffer(buffer));
// Only set the uploadId if the putObject succeeds
uploadId = PUT_UPLOAD_ID;
@@ -111,28 +110,9 @@
CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder().parts(partQueue).build();
CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucket).key(path).uploadId(uploadId).multipartUpload(completedMultipartUpload).build();
- int retries = 0;
- while (true) {
- try {
- completeMultipartUpload(completeMultipartUploadRequest);
- break;
- } catch (Exception e) {
- retries++;
- if (retries == MAX_RETRIES) {
- throw HyracksDataException.create(e);
- }
- LOGGER.info(() -> "S3 storage write retry, encountered: " + e.getMessage());
-
- // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 2 ? 1 : 2));
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(ex);
- }
- }
- }
-
+ // If this request is interrupted, the interruption is followed by a halt; hence, no retry on interruption
+ CloudRetryableRequestUtil
+ .runWithNoRetryOnInterruption(() -> completeMultipartUpload(completeMultipartUploadRequest));
log("FINISHED");
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 5ce1f43..254bd03 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -288,6 +288,13 @@
s3Client.close();
}
+ /**
+ * FOR TESTING ONLY
+ */
+ public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+ return new S3BufferedWriter(s3Client, profiler, guardian, bucket, path);
+ }
+
private static S3Client buildClient(S3ClientConfig config) {
S3ClientBuilder builder = S3Client.builder();
builder.credentialsProvider(config.createCredentialsProvider());
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 0dca417..ab57139 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -18,11 +18,14 @@
*/
package org.apache.hyracks.cloud.io;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
/**
* Certain operations needed to be provided by {@link org.apache.hyracks.api.io.IIOManager} to support cloud
@@ -45,7 +48,17 @@
* @param offset starting offset
* @return input stream of the required data
*/
- InputStream cloudRead(IFileHandle fHandle, long offset, long length);
+ CloudInputStream cloudRead(IFileHandle fHandle, long offset, long length) throws HyracksDataException;
+
+ /**
+ * Tries to restore the stream created by {@link #cloudRead(IFileHandle, long, long)}
+ * NOTE: The implementer of this method should not use {@link CloudRetryableRequestUtil} when calling this method.
+ * It is the responsibility of the caller to either call this method as a
+ * {@link ICloudRequest} or as a {@link ICloudBeforeRetryRequest}.
+ *
+ * @param stream to restore
+ */
+ void restoreStream(CloudInputStream stream);
/**
* Write to local drive only
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
new file mode 100644
index 0000000..4dab80d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+
+/**
+ * Certain cloud requests require some cleanup (or restoring a state) before a retry is performed.
+ * An implementation of This interface should be provided if such clean is required when
+ * reattempting a request using {@link CloudRetryableRequestUtil}
+ */
+@FunctionalInterface
+public interface ICloudBeforeRetryRequest {
+ /**
+ * Run a pre-retry routine before reattempting an {@link ICloudRequest} or an {@link ICloudReturnableRequest}
+ */
+ void beforeRetry();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
new file mode 100644
index 0000000..4f25323
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.io.IOException;
+
+/**
+ * A cloud request that can be retried using {@link org.apache.hyracks.cloud.util.CloudRetryableRequestUtil}
+ */
+@FunctionalInterface
+public interface ICloudRequest {
+ /**
+ * Run the cloud request
+ */
+ void call() throws IOException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
new file mode 100644
index 0000000..748165a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.io.IOException;
+
+/**
+ * A returnable cloud request that can be retried using {@link org.apache.hyracks.cloud.util.CloudRetryableRequestUtil}
+ */
+@FunctionalInterface
+public interface ICloudReturnableRequest<T> {
+ /**
+ * Run the cloud request
+ *
+ * @return the value of the request
+ */
+ T call() throws IOException;
+}
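The returnable form is what LazyCloudIOManager.readAllBytes() above uses; condensed:

    // The lambda is an ICloudReturnableRequest<byte[]>; run() retries it until it
    // succeeds or the retry budget is exhausted, then returns its value.
    byte[] bytes = CloudRetryableRequestUtil.run(() -> accessor.doReadAllBytes(fileRef));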
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
new file mode 100644
index 0000000..df12003
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class CloudInputStream {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final ICloudIOManager cloudIOManager;
+ private final IFileHandle handle;
+ private InputStream in;
+ private long offset;
+ private long remaining;
+
+ public CloudInputStream(ICloudIOManager cloudIOManager, IFileHandle handle, InputStream in, long offset,
+ long length) {
+ this.cloudIOManager = cloudIOManager;
+ this.handle = handle;
+ this.in = in;
+ this.offset = offset;
+ this.remaining = length;
+ }
+
+ public String getPath() {
+ return handle.getFileReference().getRelativePath();
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getRemaining() {
+ return remaining;
+ }
+
+ public void read(ByteBuffer buffer) throws HyracksDataException {
+ int position = buffer.position();
+ ICloudRequest read = () -> {
+ while (buffer.remaining() > 0) {
+ int length = in.read(buffer.array(), buffer.position(), buffer.remaining());
+ if (length < 0) {
+ throw new IllegalStateException("Stream should not be empty!");
+ }
+ buffer.position(buffer.position() + length);
+ }
+ };
+
+ ICloudBeforeRetryRequest retry = () -> {
+ buffer.position(position);
+ cloudIOManager.restoreStream(this);
+ };
+
+ CloudRetryableRequestUtil.run(read, retry);
+
+ offset += buffer.limit();
+ remaining -= buffer.limit();
+ }
+
+ public void skipTo(long newOffset) throws HyracksDataException {
+ if (newOffset > offset) {
+ skip(newOffset - offset);
+ }
+ }
+
+ public void close() {
+ if (remaining != 0) {
+ LOGGER.warn("Closed cloud stream with nonzero bytes = {}", remaining);
+ }
+
+ try {
+ in.close();
+ } catch (IOException e) {
+ LOGGER.error("Failed to close stream", e);
+ }
+ }
+
+ public void setInputStream(InputStream in) {
+ this.in = in;
+ }
+
+ private void skip(long n) throws HyracksDataException {
+ /*
+ * Advance offset and reduce the remaining so that the streamRestore will start from where we want to skip
+ * in case the stream has to be restored.
+ */
+ offset += n;
+ remaining -= n;
+
+ try {
+ long remaining = n;
+ while (remaining > 0) {
+ remaining -= in.skip(remaining);
+ }
+ } catch (Throwable e) {
+ if (remaining > 0) {
+ // Only restore the stream if additional bytes are required
+ CloudRetryableRequestUtil.run(() -> cloudIOManager.restoreStream(this));
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "{file: " + handle.getFileReference() + ", streamOffset: " + offset + ", streamRemaining: " + remaining
+ + "}";
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
new file mode 100644
index 0000000..2d0afbe
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.util;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Run {@link ICloudRequest} and {@link ICloudReturnableRequest} with retries
+ */
+public class CloudRetryableRequestUtil {
+ /**
+ * Whether an unstable cloud environment is being simulated/tested. This value affects the number of retries.
+ * Set this system property to 'true' to indicate an unstable cloud environment.
+ *
+ * @see System#setProperty(String, String)
+ */
+ public static final String CLOUD_UNSTABLE_MODE = "cloud.unstable.mode";
+ private static final int STABLE_NUMBER_OF_RETRIES = 5;
+ private static final int UNSTABLE_NUMBER_OF_RETRIES = 100;
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final int NUMBER_OF_RETRIES = getNumberOfRetries();
+
+ private static final ICloudBeforeRetryRequest NO_OP_RETRY = () -> {
+ };
+
+ private CloudRetryableRequestUtil() {
+ }
+
+ /**
+ * Run an idempotent request, retrying if it fails or is interrupted
+ *
+ * @param request request to run
+ */
+ public static void run(ICloudRequest request) throws HyracksDataException {
+ run(request, NO_OP_RETRY);
+ }
+
+ /**
+ * Run a non-idempotent request, retrying if it fails or is interrupted.
+ * As the operation is not idempotent, the provided {@link ICloudBeforeRetryRequest} restores its state so that it can be safely reattempted
+ *
+ * @param request request to run
+ * @param retry a pre-retry routine to make the operation idempotent
+ */
+ public static void run(ICloudRequest request, ICloudBeforeRetryRequest retry) throws HyracksDataException {
+ boolean interrupted = Thread.interrupted();
+ try {
+ while (true) {
+ try {
+ doRun(request, retry);
+ break;
+ } catch (Throwable e) {
+ // Clear the interrupted flag and remember that an interruption occurred
+ interrupted |= Thread.interrupted();
+ if (!ExceptionUtils.causedByInterrupt(e)) {
+ // The cause isn't an interruption, rethrow
+ throw e;
+ }
+ retry.beforeRetry();
+ LOGGER.warn("Ignored interrupting ICloudReturnableRequest", e);
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Run an idempotent returnable request, retrying if it fails or is interrupted.
+ *
+ * @param request request to run
+ * @param <T> return type
+ * @return a value of return type
+ */
+ public static <T> T run(ICloudReturnableRequest<T> request) throws HyracksDataException {
+ return run(request, NO_OP_RETRY);
+ }
+
+ /**
+ * Run a non-idempotent returnable request, retrying if it fails or is interrupted.
+ * As the operation is not idempotent, the provided {@link ICloudBeforeRetryRequest} restores its state so that it can be safely reattempted
+ *
+ * @param request request to run
+ * @param <T> return type
+ * @param retry a pre-retry routine to make the operation idempotent
+ * @return a value of return type
+ */
+ public static <T> T run(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry)
+ throws HyracksDataException {
+ boolean interrupted = Thread.interrupted();
+ try {
+ while (true) {
+ try {
+ return doRun(request, retry);
+ } catch (Throwable e) {
+ // Clear the interrupted flag and remember that an interruption occurred
+ interrupted |= Thread.interrupted();
+ if (!ExceptionUtils.causedByInterrupt(e)) {
+ // The cause isn't an interruption, rethrow
+ throw e;
+ }
+ retry.beforeRetry();
+ LOGGER.warn("Ignored interrupting ICloudReturnableRequest", e);
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Run an idempotent request, retrying if it fails.
+ * No retry is performed if the thread is interrupted
+ *
+ * @param request request to run
+ */
+ public static void runWithNoRetryOnInterruption(ICloudRequest request) throws HyracksDataException {
+ doRun(request, NO_OP_RETRY);
+ }
+
+ /**
+ * Run a non-idempotent request, retrying if it fails.
+ * No retry is performed if the thread is interrupted.
+ * As the operation is not idempotent, the provided {@link ICloudBeforeRetryRequest} restores its state so that it can be safely reattempted
+ *
+ * @param request request to run
+ * @param retry a pre-retry routine to make the operation idempotent
+ */
+ public static void runWithNoRetryOnInterruption(ICloudRequest request, ICloudBeforeRetryRequest retry)
+ throws HyracksDataException {
+ doRun(request, retry);
+ }
+
+ private static <T> T doRun(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry)
+ throws HyracksDataException {
+ int attempt = 1;
+ while (true) {
+ try {
+ return request.call();
+ } catch (IOException e) {
+ if (attempt > NUMBER_OF_RETRIES) {
+ throw HyracksDataException.create(e);
+ }
+ attempt++;
+ retry.beforeRetry();
+ LOGGER.warn("Failed to perform ICloudReturnableRequest, performing {}/{}", attempt, NUMBER_OF_RETRIES,
+ e);
+ }
+ }
+ }
+
+ private static void doRun(ICloudRequest request, ICloudBeforeRetryRequest retry) throws HyracksDataException {
+ int attempt = 1;
+ while (true) {
+ try {
+ request.call();
+ break;
+ } catch (IOException e) {
+ if (attempt > NUMBER_OF_RETRIES) {
+ throw HyracksDataException.create(e);
+ }
+ attempt++;
+ retry.beforeRetry();
+ LOGGER.warn("Failed to perform ICloudRequest, performing {}/{}", attempt, NUMBER_OF_RETRIES, e);
+ }
+ }
+ }
+
+ private static int getNumberOfRetries() {
+ boolean unstable = Boolean.getBoolean(CLOUD_UNSTABLE_MODE);
+ return unstable ? UNSTABLE_NUMBER_OF_RETRIES : STABLE_NUMBER_OF_RETRIES;
+ }
+
+}
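In summary, the utility exposes two families of entry points; a sketch of when each
applies (request and retry assumed from context):

    // run(): tolerates interruption; the interrupt flag is cleared, the request is
    // reattempted, and the flag is re-asserted once the request finally completes.
    CloudRetryableRequestUtil.run(request, retry);

    // runWithNoRetryOnInterruption(): retries plain I/O failures only and leaves the
    // interrupt flag untouched; used where an interruption is followed by a halt.
    CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request, retry);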
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/AbstractPageRangesComputer.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/AbstractPageRangesComputer.java
new file mode 100644
index 0000000..76b9650
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/AbstractPageRangesComputer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+
+public abstract class AbstractPageRangesComputer {
+ protected static final int INITIAL_SIZE = 40;
+ // Indicates whether a page is requested
+ protected final BitSet requestedPages;
+
+ AbstractPageRangesComputer() {
+ requestedPages = new BitSet();
+ }
+
+ abstract int getMaxNumberOfRanges();
+
+ /**
+ * Clear ranges
+ */
+ abstract void clear();
+
+ /**
+ * Add a range
+ *
+ * @param start range start
+ * @param end range end
+ */
+ abstract void addRange(int start, int end);
+
+ /**
+ * Pin the calculated ranges
+ *
+ * @param ctx Column mega-page buffer cache read context
+ * @param bufferCache buffer cache
+ * @param fileId fileId
+ * @param pageZeroId page zero ID
+ */
+ abstract void pin(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+ throws HyracksDataException;
+
+ /**
+ * Creates a new range computer
+ *
+ * @param maxNumberOfRanges maximum number of ranges
+ * @return a new instance of {@link AbstractPageRangesComputer}
+ */
+ static AbstractPageRangesComputer create(int maxNumberOfRanges) {
+ if (maxNumberOfRanges == 1) {
+ return new SinglePageRangeComputer();
+ }
+
+ return new PageRangesComputer(maxNumberOfRanges);
+ }
+}
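CloudColumnReadContext (below) drives this API as a clear/addRange/pin cycle; condensed:

    // Condensed from CloudColumnReadContext.pinProjected() below.
    AbstractPageRangesComputer ranges = AbstractPageRangesComputer.create(MAX_RANGES_COUNT);
    ranges.clear();
    // Called once per contiguous stretch of requested pages; implementations may merge
    // ranges to stay within getMaxNumberOfRanges().
    ranges.addRange(firstPageIdx, lastPageIdx);
    ranges.pin(columnCtx, bufferCache, fileId, pageZeroId);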
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index fef150a..1ecc509 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -22,6 +22,7 @@
import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+import static org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudMegaPageReadContext.ALL_PAGES;
import java.nio.ByteBuffer;
import java.util.BitSet;
@@ -55,7 +56,7 @@
private final ColumnRanges columnRanges;
private final CloudMegaPageReadContext columnCtx;
private final BitSet projectedColumns;
- private final MergedPageRanges mergedPageRanges;
+ private final AbstractPageRangesComputer mergedPageRanges;
public CloudColumnReadContext(IColumnProjectionInfo projectionInfo, IPhysicalDrive drive, BitSet plan) {
this.operation = projectionInfo.getProjectorType();
@@ -65,7 +66,7 @@
cloudOnlyColumns = new BitSet();
columnCtx = new CloudMegaPageReadContext(operation, columnRanges, drive);
projectedColumns = new BitSet();
- mergedPageRanges = new MergedPageRanges(columnCtx, MAX_RANGES_COUNT);
+ mergedPageRanges = AbstractPageRangesComputer.create(MAX_RANGES_COUNT);
if (operation == QUERY || operation == MODIFY) {
for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
int columnIndex = projectionInfo.getColumnIndex(i);
@@ -113,10 +114,10 @@
public ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
throws HyracksDataException {
int nextLeaf = leafFrame.getNextLeaf();
- // Release the previous pages (including page0)
+ // Release the previous pages
release(bufferCache);
+ // Release page0
bufferCache.unpin(leafFrame.getPage(), this);
-
// pin the next page0
ICachedPage nextPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, nextLeaf), this);
leafFrame.setPage(nextPage);
@@ -142,11 +143,11 @@
private void pinAll(int fileId, int pageZeroId, int numberOfPages, IBufferCache bufferCache)
throws HyracksDataException {
- columnCtx.pin(bufferCache, fileId, pageZeroId, 1, numberOfPages, numberOfPages, MergedPageRanges.EMPTY);
+ columnCtx.pin(bufferCache, fileId, pageZeroId, 1, numberOfPages, ALL_PAGES);
}
private void pinProjected(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
- mergedPageRanges.reset();
+ mergedPageRanges.clear();
int[] columnsOrder = columnRanges.getColumnsOrder();
int i = 0;
int columnIndex = columnsOrder[i];
@@ -186,8 +187,9 @@
mergedPageRanges.addRange(firstPageIdx, lastPageIdx);
}
+
// pin the calculated pageRanges
- mergedPageRanges.pin(fileId, pageZeroId, bufferCache);
+ mergedPageRanges.pin(columnCtx, bufferCache, fileId, pageZeroId);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index ed6e83f..29c9467 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -22,7 +22,6 @@
import static org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
@@ -32,6 +31,7 @@
import org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
@@ -47,22 +47,20 @@
import org.apache.logging.log4j.Logger;
@NotThreadSafe
-final class CloudMegaPageReadContext implements IBufferCacheReadContext {
+public final class CloudMegaPageReadContext implements IBufferCacheReadContext {
private static final Logger LOGGER = LogManager.getLogger();
+ static final BitSet ALL_PAGES = new BitSet();
private final ColumnProjectorType operation;
private final ColumnRanges columnRanges;
private final IPhysicalDrive drive;
private final List<ICachedPage> pinnedPages;
private int numberOfContiguousPages;
- // For logging, to get actual number of wanted pages
- private int numberOfWantedPages;
private int pageCounter;
- private InputStream gapStream;
+ private CloudInputStream gapStream;
- // For debugging
- private long streamOffset;
- private long remainingStreamBytes;
+ // for debugging
+ int pageZeroId;
CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges columnRanges, IPhysicalDrive drive) {
this.operation = operation;
@@ -71,13 +69,13 @@
pinnedPages = new ArrayList<>();
}
- void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages,
- int numberOfWantedPages, BitSet unwantedPages) throws HyracksDataException {
+ void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages, BitSet requestedPages)
+ throws HyracksDataException {
closeStream();
this.numberOfContiguousPages = numberOfPages;
- this.numberOfWantedPages = numberOfWantedPages;
pageCounter = 0;
- doPin(bufferCache, fileId, pageZeroId, start, numberOfPages, numberOfWantedPages, unwantedPages);
+ this.pageZeroId = pageZeroId;
+ doPin(bufferCache, fileId, pageZeroId, start, numberOfPages, requestedPages);
}
@Override
@@ -86,10 +84,10 @@
if (cachedPage.skipCloudStream()) {
/*
* This page is requested but the buffer cache has a valid copy in memory. Also, the page itself was
- * requested to be read from the cloud. Since this page is valid, no buffer cache read() will be performed.
- * As the buffer cache read() is also responsible for persisting the bytes read from the cloud, we can end
- * up writing the bytes of this page in the position of another page. Therefore, we should skip the bytes
- * for this particular page to avoid placing the bytes of this page into another page's position.
+ * requested (via the gapStream) to be read from the cloud. Since this page is valid, no buffer cache read() will be
+ * performed. As the buffer cache read() is also responsible for persisting the bytes read from the cloud,
+ * we can end up writing the bytes of this page in the position of another page. Therefore, we should skip
+ * the bytes for this particular page to avoid placing the bytes of this page into another page's position.
*/
skipStreamIfOpened(cachedPage);
}
@@ -152,49 +150,28 @@
pinnedPages.clear();
}
- void closeStream() throws HyracksDataException {
+ void closeStream() {
if (gapStream != null) {
- if (remainingStreamBytes != 0) {
- LOGGER.warn("Closed cloud stream with nonzero bytes = {}", remainingStreamBytes);
- }
-
- try {
- gapStream.close();
- gapStream = null;
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
+ gapStream.close();
+ gapStream = null;
}
}
private void readFromStream(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
CachedPage cPage, boolean persist) throws HyracksDataException {
- InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
+ CloudInputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
ByteBuffer buffer = header.getBuffer();
buffer.position(0);
- // If the stream consists of the unwanted pages,
- // if the currentPage's offset is greater, this means
- // the streamOffset is pointing to a previous page.
+ /*
+ * The 'gapStream' could point to an offset of an unwanted page due to range merging. For example, if
+ * 'gapStream' is currently at the offset of pageId = 5 and the cPage is for pageId = 7, then, the stream
+ * must be advanced to the cPage's offset (i.e., offset of pageId = 7) -- skipping pages 5 and 6.
+ */
+ gapStream.skipTo(cPage.getCompressedPageOffset());
- // hence we should skip those many bytes.
- // eg: if pageId(cPage) = 7 and streamOffset is pointing at 5
- // then we need to jump 5,6 page worth of compressed size.
- if (cPage.getCompressedPageOffset() > streamOffset) {
- skipBytes(cPage.getCompressedPageOffset() - streamOffset);
- }
-
- try {
- while (buffer.remaining() > 0) {
- int length = stream.read(buffer.array(), buffer.position(), buffer.remaining());
- if (length < 0) {
- throw new IllegalStateException("Stream should not be empty!");
- }
- buffer.position(buffer.position() + length);
- }
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
+ // Get the page's data from the cloud
+ doStreamRead(stream, buffer);
// Flip the buffer after reading to restore the correct position
buffer.flip();
@@ -204,12 +181,9 @@
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
BufferCacheCloudReadContextUtil.persist(cloudIOManager, fileHandle.getFileHandle(), buffer, offset);
}
-
- streamOffset += cPage.getCompressedPageSize();
- remainingStreamBytes -= cPage.getCompressedPageSize();
}
- private InputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage)
+ private CloudInputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage)
throws HyracksDataException {
if (gapStream != null) {
return gapStream;
@@ -219,34 +193,24 @@
long offset = cPage.getCompressedPageOffset();
int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
- remainingStreamBytes = length;
- streamOffset = offset;
- LOGGER.info(
- "Cloud stream read for pageId={} starting from pageCounter={} out of "
- + "numberOfContiguousPages={} with numberOfWantedPages={}"
- + " (streamOffset = {}, remainingStreamBytes = {})",
- pageId, pageCounter, numberOfContiguousPages, numberOfWantedPages, streamOffset, remainingStreamBytes);
-
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), offset, length);
+ LOGGER.info(
+ "Cloud stream read for pageId={} starting from pageCounter={} out of "
+ + "numberOfContiguousPages={}. pageZeroId={} stream: {}",
+ pageId, pageCounter, numberOfContiguousPages, pageZeroId, gapStream);
+
return gapStream;
}
- private void skipBytes(long length) throws HyracksDataException {
- if (gapStream == null) {
- return;
- }
-
+ private void doStreamRead(CloudInputStream stream, ByteBuffer buffer) throws HyracksDataException {
+ int length = buffer.remaining();
try {
- long lengthToSkip = length;
- while (length > 0) {
- length -= gapStream.skip(length);
- }
- streamOffset += lengthToSkip;
- remainingStreamBytes -= lengthToSkip;
- } catch (IOException e) {
- throw HyracksDataException.create(e);
+ stream.read(buffer);
+ } catch (Throwable th) {
+ LOGGER.warn("Failed to READ {} bytes from stream {}", length, gapStream);
+ throw HyracksDataException.create(th);
}
}
@@ -255,34 +219,29 @@
return;
}
+ // Advance the stream past this page's content (its offset plus its compressed size)
+ long newOffset = cPage.getCompressedPageOffset() + cPage.getCompressedPageSize();
try {
- long remaining = cPage.getCompressedPageSize();
- while (remaining > 0) {
- remaining -= gapStream.skip(remaining);
- }
- streamOffset += cPage.getCompressedPageSize();
- remainingStreamBytes -= cPage.getCompressedPageSize();
+ gapStream.skipTo(newOffset);
} catch (IOException e) {
+ LOGGER.warn("Failed to SKIP to new offset {} from stream {}", newOffset, gapStream);
throw HyracksDataException.create(e);
}
}
private void doPin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages,
- int numberOfWantedPages, BitSet unwantedPages) throws HyracksDataException {
+ BitSet requestedPages) throws HyracksDataException {
for (int i = start; i < start + numberOfPages; i++) {
- int pageId = pageZeroId + i;
- long dpid = BufferedFileHandle.getDiskPageId(fileId, pageId);
try {
- if (!unwantedPages.get(pageId)) {
+ if (requestedPages == ALL_PAGES || requestedPages.get(i)) {
+ int pageId = pageZeroId + i;
+ long dpid = BufferedFileHandle.getDiskPageId(fileId, pageId);
pinnedPages.add(bufferCache.pin(dpid, this));
}
pageCounter++;
} catch (Throwable e) {
- LOGGER.error(
- "Error while pinning page number {} with number of pages streamed {}, "
- + "with actually wanted number of pages {}"
- + "(streamOffset:{}, remainingStreamBytes: {}) columnRanges:\n {}",
- i, numberOfPages, numberOfWantedPages, streamOffset, remainingStreamBytes, columnRanges);
+ LOGGER.error("Error while pinning page number {} with number of pages {}. "
+ + "stream: {}, columnRanges:\n {}", i, numberOfPages, gapStream, columnRanges);
throw e;
}
}
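
The change above replaces the hand-maintained streamOffset/remainingStreamBytes counters with CloudInputStream.skipTo(). As a rough illustration of that pattern (not the actual CloudInputStream implementation, which this diff does not show), here is a minimal sketch of a forward-only, offset-aware skipTo() built on java.io.InputStream; the class and field names are hypothetical.

import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch only: tracks an absolute offset and skips forward to a target.
final class OffsetTrackingStream {
    private final InputStream in;
    private long offset; // absolute offset of the next byte to be read

    OffsetTrackingStream(InputStream in, long startOffset) {
        this.in = in;
        this.offset = startOffset;
    }

    // Advance to an absolute offset, e.g., to skip unwanted pages inside a merged range.
    // Forward-only: merged-range reads never move backwards.
    void skipTo(long newOffset) throws IOException {
        long remaining = newOffset - offset;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped <= 0) {
                // skip() may legally return 0; a real implementation would fall back to read()
                throw new IOException("unexpected end of stream while skipping");
            }
            remaining -= skipped;
        }
        offset = Math.max(offset, newOffset);
    }
}
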
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
deleted file mode 100644
index c0c2fc9..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
-
-import java.util.BitSet;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
-import org.apache.hyracks.storage.common.buffercache.IBufferCache;
-
-import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.ints.IntList;
-import it.unimi.dsi.fastutil.longs.LongArrayPriorityQueue;
-import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
-
-/**
- * Merge the given ranges such that the maximum number of ranges <= N.
- * Merge should be greedy as the range having lower gaps should be given priority.
- */
-public class MergedPageRanges {
- public static final BitSet EMPTY = new BitSet();
- private final CloudMegaPageReadContext columnCtx;
- private final int numRequiredRanges;
- private final IntList pageRanges;
- private final LongPriorityQueue gapRanges;
- // indicates the index of the ranges which are merged
- private final BitSet mergedIndex = new BitSet();
- // indicates a page is requested or not
- private final BitSet unwantedPages = new BitSet();
- // indicates the extra pages got included while a merge
- private int currentIndex = 0;
- private int numRanges;
-
- MergedPageRanges(CloudMegaPageReadContext columnCtx, int numRequiredRanges) {
- this.numRequiredRanges = numRequiredRanges;
- this.pageRanges = new IntArrayList(40);
- this.gapRanges = new LongArrayPriorityQueue(IntPairUtil.FIRST_COMPARATOR);
- this.columnCtx = columnCtx;
- this.numRanges = 0;
- }
-
- public void reset() {
- mergedIndex.clear();
- pageRanges.clear();
- gapRanges.clear();
- numRanges = 0;
- currentIndex = 0;
- }
-
- public void addRange(int rangeStart, int rangeEnd) {
- pageRanges.add(rangeStart);
- pageRanges.add(rangeEnd);
- numRanges++;
- }
-
- public void mergeRanges() {
- // totalMerges = totalRanges - MAXIMUM_RESULTANT_RANGES
- int merges = numRanges - numRequiredRanges;
- for (int i = 2; i < pageRanges.size(); i += 2) {
- int previousRangeEnd = pageRanges.getInt(i - 1);
- int currentRangeStart = pageRanges.getInt(i);
- // this could be optimized to just enqueue "merges" ranges,
- // but won't be much diff as the number of ranges gonna be small
- long gap = IntPairUtil.of(currentRangeStart - previousRangeEnd, i / 2);
- gapRanges.enqueue(gap);
- }
-
- int count = 0;
- while (count < merges) {
- // extract the lower 32 bits for the index.
- int index = IntPairUtil.getSecond(gapRanges.dequeueLong());
- // set the bit from [index - 1, index] indicating
- // the index and index-1 are merged.
- mergedIndex.set(index - 1, index + 1);
- count++;
- }
- }
-
- public void pin(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
- // since the numRanges are already within set threshold
- if (numRanges <= numRequiredRanges) {
- pinWithoutMerge(fileId, pageZeroId, bufferCache);
- return;
- }
- pinWithMerge(fileId, pageZeroId, bufferCache);
- }
-
- private void pinWithoutMerge(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
- for (int pageIndex = 1; pageIndex < pageRanges.size(); pageIndex += 2) {
- int lastPageIndex = pageRanges.getInt(pageIndex);
- int firstPageIndex = pageRanges.getInt(pageIndex - 1);
- int numberOfPages = lastPageIndex - firstPageIndex + 1;
- columnCtx.pin(bufferCache, fileId, pageZeroId, firstPageIndex, numberOfPages, numberOfPages, EMPTY);
- }
- }
-
- private void pinWithMerge(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException {
- // merge the range based on the numRequiredRanges.
- mergeRanges();
- // go through page ranges and pin the required ranges.
- int rangeCnt = 0;
- while (rangeCnt < numRequiredRanges) {
- unwantedPages.clear();
- long mergedRange = getNextRange();
-
- int firstRangeIdx = IntPairUtil.getFirst(mergedRange);
- int lastRangeIdx = IntPairUtil.getSecond(mergedRange);
-
- // since the ranges are flattened out in the pageRanges.
- // hence ith index's element would be at [2*i, 2*i + 1]
- int firstRangeStart = pageRanges.getInt(2 * firstRangeIdx);
- int firstRangeEnd = pageRanges.getInt(2 * firstRangeIdx + 1);
- int lastRangeStart = pageRanges.getInt(2 * lastRangeIdx);
- int lastRangeEnd = pageRanges.getInt(2 * lastRangeIdx + 1);
-
- int numberOfPages = lastRangeEnd - firstRangeStart + 1;
- // Number of unwanted pages will be zero, when there is just a single range (i.e. no merge)
- boolean areUnwantedPages = firstRangeIdx != lastRangeIdx;
- // and when the there is no extra page being fetched. eg: [1 2] [3 4]
- // for: [ 1 2 ] [ 4 5 ] [ 7 8 ] -> [ 1 8 ] ( fromIndex = 0, toIndex = 2 )
- // numberOfUnwantedPages = (4 - 2 - 1) + (7 - 5 -1) = 2
- areUnwantedPages = areUnwantedPages && (lastRangeStart - firstRangeEnd > 1);
- int numberOfUnwantedPages = 0;
- if (areUnwantedPages) {
- // iterate through the index and mark the gaps
- for (int fromIndex = firstRangeIdx; fromIndex < lastRangeIdx; fromIndex++) {
- // Gap = V (2 * (fromIndex+1) ) - V(fromIndex * 2 + 1)
- // V(index) = value at the index
- int fromRangeEnd = pageRanges.getInt(2 * fromIndex + 1);
- int toRangeStart = pageRanges.getInt(2 * (fromIndex + 1));
- // fromRangeEnd != toRangeStart, as they would have been merged already
- int rangeGap = (fromRangeEnd == toRangeStart) ? 0 : toRangeStart - fromRangeEnd - 1;
- if (rangeGap > 0) {
- unwantedPages.set(fromRangeEnd + 1, toRangeStart);
- }
- numberOfUnwantedPages += rangeGap;
- }
- }
-
- columnCtx.pin(bufferCache, fileId, pageZeroId, firstRangeStart, numberOfPages,
- numberOfPages - numberOfUnwantedPages, unwantedPages);
- rangeCnt++;
- }
- }
-
- // making package-private for MergedPageRangesTest
- long getNextRange() {
- int fromIndex = currentIndex;
- int endIndex = currentIndex;
- int toIndex;
-
- // move till we have a set index, indicating all the indexes
- // are merged into one range.
- while (endIndex < numRanges && mergedIndex.get(endIndex)) {
- endIndex++;
- }
-
- if (fromIndex == endIndex) {
- currentIndex = endIndex + 1;
- toIndex = endIndex;
- } else {
- currentIndex = endIndex;
- toIndex = endIndex - 1;
- }
-
- return IntPairUtil.of(fromIndex, toIndex);
- }
-}
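
Both the deleted class and its replacement lean on IntPairUtil to pack a (gap, index) pair into a single long, so a LongPriorityQueue can order gaps without boxing. The exact bit layout is an assumption in the sketch below; all the callers need is that of/getFirst/getSecond agree with each other and that FIRST_COMPARATOR orders packed longs by their first component.

// Assumed layout: first int in the high 32 bits, second int in the low 32 bits.
final class IntPairSketch {
    static long of(int first, int second) {
        return ((long) first << 32) | (second & 0xFFFFFFFFL);
    }

    static int getFirst(long pair) {
        return (int) (pair >>> 32);
    }

    static int getSecond(long pair) {
        return (int) pair;
    }
}

With non-negative firsts in the high bits, natural long ordering already agrees with ordering by the first component; an explicit first-comparator documents the intent.
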
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/PageRangesComputer.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/PageRangesComputer.java
new file mode 100644
index 0000000..28612c4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/PageRangesComputer.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.longs.LongArrayPriorityQueue;
+import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
+
+/**
+ * Merges the given page ranges so that the number of resulting ranges is at most N.
+ * Merging is greedy: ranges separated by the smallest gaps are merged first.
+ */
+final class PageRangesComputer extends AbstractPageRangesComputer {
+ static final int SINGLE_RANGE = 0;
+ static final int EACH_RANGE = 1;
+ static final int BOUNDARIES = 2;
+
+ private final int maxNumberOfRanges;
+ private final LongPriorityQueue gapRanges;
+ final IntList pageRanges;
+ final int[] rangeBoundaries;
+
+ PageRangesComputer(int maxNumberOfRanges) {
+ this.maxNumberOfRanges = maxNumberOfRanges;
+ pageRanges = new IntArrayList(INITIAL_SIZE);
+ gapRanges = new LongArrayPriorityQueue(IntPairUtil.FIRST_COMPARATOR);
+ rangeBoundaries = new int[maxNumberOfRanges];
+ }
+
+ @Override
+ int getMaxNumberOfRanges() {
+ return maxNumberOfRanges;
+ }
+
+ @Override
+ void clear() {
+ pageRanges.clear();
+ gapRanges.clear();
+ requestedPages.clear();
+ }
+
+ @Override
+ void addRange(int rangeStart, int rangeEnd) {
+ int previousEnd = pageRanges.size() - 1;
+
+ pageRanges.add(rangeStart);
+ pageRanges.add(rangeEnd);
+ requestedPages.set(rangeStart, rangeEnd + 1);
+
+ if (previousEnd > 0) {
+ int maxNumberOfCuts = maxNumberOfRanges - 1;
+ int gapSize = rangeStart - pageRanges.getInt(previousEnd) - 1;
+ if (gapRanges.size() < maxNumberOfCuts) {
+ // Haven't reached the maximum number of cuts yet; add this gap
+ gapRanges.enqueue(IntPairUtil.of(gapSize, previousEnd));
+ } else if (IntPairUtil.getFirst(gapRanges.firstLong()) < gapSize) {
+ // Found a bigger gap. Remove the smallest and add this new bigger gap
+ gapRanges.dequeueLong();
+ gapRanges.enqueue(IntPairUtil.of(gapSize, previousEnd));
+ }
+ // Otherwise, this gap is smaller than the smallest gap in the queue; ignore it
+ }
+ }
+
+ @Override
+ void pin(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+ throws HyracksDataException {
+ int mergeResult = mergeRanges();
+ switch (mergeResult) {
+ case SINGLE_RANGE:
+ pinAsSingleRange(ctx, bufferCache, fileId, pageZeroId);
+ break;
+ case EACH_RANGE:
+ pinEachRange(ctx, bufferCache, fileId, pageZeroId);
+ break;
+ default:
+ pinMergedRanges(ctx, bufferCache, fileId, pageZeroId);
+ break;
+ }
+ }
+
+ int mergeRanges() {
+ int i = 0;
+ int maxGap = 0;
+ while (!gapRanges.isEmpty()) {
+ long pair = gapRanges.dequeueLong();
+ maxGap = Math.max(maxGap, IntPairUtil.getFirst(pair));
+ rangeBoundaries[i] = IntPairUtil.getSecond(pair);
+ i++;
+ }
+
+ if (maxGap == 1) {
+ // The biggest gap is 1; merge all ranges into a single range
+ return SINGLE_RANGE;
+ }
+
+ if (getNumberOfRanges() <= maxNumberOfRanges) {
+ // The number of ranges is within the limit; pin each range separately
+ return EACH_RANGE;
+ }
+
+ // Set the last boundary
+ rangeBoundaries[maxNumberOfRanges - 1] = pageRanges.size() - 1;
+
+ // Sort cuts smallest to largest
+ Arrays.sort(rangeBoundaries);
+
+ // Use the boundaries to cut the ranges into separate ones
+ return BOUNDARIES;
+ }
+
+ private int getNumberOfRanges() {
+ return pageRanges.size() / 2;
+ }
+
+ private void pinAsSingleRange(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+ throws HyracksDataException {
+ int start = pageRanges.getInt(0);
+ int end = pageRanges.getInt(pageRanges.size() - 1);
+ int numberOfPages = end - start + 1;
+ ctx.pin(bufferCache, fileId, pageZeroId, start, numberOfPages, requestedPages);
+ }
+
+ private void pinEachRange(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+ throws HyracksDataException {
+ // 'pageRanges' stores flattened (start, end) pairs, so step over it in twos
+ for (int i = 0; i < pageRanges.size(); i += 2) {
+ int start = pageRanges.getInt(i);
+ int end = pageRanges.getInt(i + 1);
+ int numberOfPages = end - start + 1;
+ ctx.pin(bufferCache, fileId, pageZeroId, start, numberOfPages, requestedPages);
+ }
+ }
+
+ private void pinMergedRanges(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+ throws HyracksDataException {
+ int startIndex = 0;
+ for (int i = 0; i < rangeBoundaries.length; i++) {
+ int endIndex = rangeBoundaries[i];
+ int rangeStart = pageRanges.getInt(startIndex);
+ int rangeEnd = pageRanges.getInt(endIndex);
+ int numberOfPages = rangeEnd - rangeStart + 1;
+ ctx.pin(bufferCache, fileId, pageZeroId, rangeStart, numberOfPages, requestedPages);
+
+ // Start from the next cut
+ startIndex = endIndex + 1;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "{rangeBoundaries: " + Arrays.toString(rangeBoundaries) + ", pageRanges: " + pageRanges
+ + ", requestedPages: " + requestedPages + "}";
+ }
+}
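
A worked example of the BOUNDARIES path, mirroring mergePageRanges1 in the updated test below. This is a hypothetical same-package snippet (the class and its methods are package-private, as the unit test relies on).

// Ranges {1-3, 5-7, 8-12, 16-19, 23-26}, at most 3 resulting ranges.
PageRangesComputer computer = new PageRangesComputer(3);
computer.addRange(1, 3);
computer.addRange(5, 7);
computer.addRange(8, 12);
computer.addRange(16, 19);
computer.addRange(23, 26);
// Gaps between consecutive ranges are 1, 0, 3, 3; the queue keeps the two
// largest (3 and 3), so the cuts fall after [8,12] and after [16,19],
// yielding the pinned ranges [1,12], [16,19], [23,26].
int result = computer.mergeRanges(); // == PageRangesComputer.BOUNDARIES
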
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/SinglePageRangeComputer.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/SinglePageRangeComputer.java
new file mode 100644
index 0000000..4c9536b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/SinglePageRangeComputer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+
+/**
+ * Computes a single page range covering all requested pages (i.e., the maximum number of ranges is 1).
+ */
+final class SinglePageRangeComputer extends AbstractPageRangesComputer {
+ int rangeStart;
+ int rangeEnd;
+ private int numberOfRanges;
+
+ @Override
+ int getMaxNumberOfRanges() {
+ return 1;
+ }
+
+ @Override
+ void clear() {
+ numberOfRanges = 0;
+ requestedPages.clear();
+ }
+
+ @Override
+ void addRange(int start, int end) {
+ if (numberOfRanges++ == 0) {
+ rangeStart = start;
+ }
+ rangeEnd = end;
+ requestedPages.set(start, end + 1);
+ }
+
+ @Override
+ void pin(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+ throws HyracksDataException {
+ int numberOfPages = rangeEnd - rangeStart + 1;
+ ctx.pin(bufferCache, fileId, pageZeroId, rangeStart, numberOfPages, requestedPages);
+ }
+}
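
For completeness, a sketch of how the single-range variant behaves (again a hypothetical same-package snippet): every addRange() widens one [rangeStart, rangeEnd] window, and requestedPages is what later lets doPin() skip the gap pages that the single cloud read streams over.

SinglePageRangeComputer computer = new SinglePageRangeComputer();
computer.addRange(1, 3);
computer.addRange(5, 7);
computer.addRange(9, 10);
// rangeStart = 1, rangeEnd = 10: pin() issues one request for pages 1..10,
// and pages 4 and 8 stay unpinned because their requestedPages bits are clear.
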
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
index 169c14d..c5210d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
@@ -18,40 +18,29 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
-import java.util.List;
-
-import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
import org.junit.Assert;
import org.junit.Test;
-import it.unimi.dsi.fastutil.Pair;
+import it.unimi.dsi.fastutil.ints.IntList;
public class MergedPageRagesTest {
- private MergedPageRanges mergedPageRanges;
- private final CloudMegaPageReadContext cloudMegaPageReadContext = null;
@Test
public void mergePageRanges1() {
int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
int requiredRangeCount = 3;
- mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext, requiredRangeCount);
+ AbstractPageRangesComputer mergedPageRanges = AbstractPageRangesComputer.create(requiredRangeCount);
for (int i = 0; i < pageRanges.length; i += 2) {
mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
}
- mergedPageRanges.mergeRanges();
// since the gaps are in following order
// ( 2, 1, 4, 4 )
// since we need 3 ranges, 5 - 3 = 2 merges should be done.
// hence the resultant ranges := ( 0, 2 ), ( 3, 3 ), ( 4, 4 )
- List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 2), Pair.of(3, 3), Pair.of(4, 4));
-
- for (int i = 0; i < requiredRangeCount; i++) {
- long nextRange = mergedPageRanges.getNextRange();
- Assert.assertEquals(ranges.get(i).first().intValue(), IntPairUtil.getFirst(nextRange));
- Assert.assertEquals(ranges.get(i).second().intValue(), IntPairUtil.getSecond(nextRange));
- }
+ int[] expectedRanges = new int[] { 1, 12, 16, 19, 23, 26 };
+ assertResult(mergedPageRanges, expectedRanges);
}
@Test
@@ -59,23 +48,17 @@
int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
int requiredRangeCount = 1;
- mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext, requiredRangeCount);
+ AbstractPageRangesComputer mergedPageRanges = AbstractPageRangesComputer.create(requiredRangeCount);
for (int i = 0; i < pageRanges.length; i += 2) {
mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
}
- mergedPageRanges.mergeRanges();
// since the gaps are in following order
// ( 2, 1, 4, 4 )
// since we need 1 ranges, 5 - 4 = 1 merge should be done.
// hence the resultant ranges := ( 0, 4)
- List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 4));
-
- for (int i = 0; i < requiredRangeCount; i++) {
- long nextRange = mergedPageRanges.getNextRange();
- Assert.assertEquals(ranges.get(i).first().intValue(), IntPairUtil.getFirst(nextRange));
- Assert.assertEquals(ranges.get(i).second().intValue(), IntPairUtil.getSecond(nextRange));
- }
+ int[] expectedRanges = new int[] { 1, 26 };
+ assertResult(mergedPageRanges, expectedRanges);
}
@Test
@@ -83,22 +66,92 @@
int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
int requiredRangeCount = 8;
- mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext, requiredRangeCount);
+ AbstractPageRangesComputer mergedPageRanges = AbstractPageRangesComputer.create(requiredRangeCount);
for (int i = 0; i < pageRanges.length; i += 2) {
mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
}
- mergedPageRanges.mergeRanges();
// since the gaps are in following order
// ( 2, 1, 4, 4 )
// since we need 8 ranges, no merge should be done.
- List<Pair<Integer, Integer>> ranges =
- List.of(Pair.of(0, 0), Pair.of(1, 1), Pair.of(2, 2), Pair.of(3, 3), Pair.of(4, 4));
+ int[] expectedRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
- for (int i = 0; i < ranges.size(); i++) {
- long nextRange = mergedPageRanges.getNextRange();
- Assert.assertEquals(ranges.get(i).first().intValue(), IntPairUtil.getFirst(nextRange));
- Assert.assertEquals(ranges.get(i).second().intValue(), IntPairUtil.getSecond(nextRange));
+ assertResult(mergedPageRanges, expectedRanges);
+ }
+
+ @Test
+ public void singlePageGap() {
+ int[] pageRanges = new int[] { 1, 3, 5, 7, 9, 10 };
+
+ int requiredRangeCount = 3;
+ AbstractPageRangesComputer mergedPageRanges = AbstractPageRangesComputer.create(requiredRangeCount);
+ for (int i = 0; i < pageRanges.length; i += 2) {
+ mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
+ }
+
+ // Since the gaps between the ranges are all 1, the result should be a single range
+ int[] expectedRanges = new int[] { 1, 10 };
+
+ assertResult(mergedPageRanges, expectedRanges);
+ }
+
+ private void assertResult(AbstractPageRangesComputer mergedPageRanges, int[] expectedRanges) {
+ if (mergedPageRanges.getMaxNumberOfRanges() == 1) {
+ assertSinglePageRangeComputer((SinglePageRangeComputer) mergedPageRanges, expectedRanges);
+ return;
+ }
+ PageRangesComputer pageRangesComputer = (PageRangesComputer) mergedPageRanges;
+ int mergeResult = pageRangesComputer.mergeRanges();
+ switch (mergeResult) {
+ case PageRangesComputer.SINGLE_RANGE:
+ assertSingleRange(pageRangesComputer, expectedRanges);
+ break;
+ case PageRangesComputer.EACH_RANGE:
+ assertEachRange(pageRangesComputer, expectedRanges);
+ break;
+ default:
+ assertBoundaries(pageRangesComputer, expectedRanges);
+ break;
+ }
+ }
+
+ private void assertSinglePageRangeComputer(SinglePageRangeComputer mergedPageRanges, int[] expectedRanges) {
+ Assert.assertEquals(2, expectedRanges.length);
+ Assert.assertEquals(expectedRanges[0], mergedPageRanges.rangeStart);
+ Assert.assertEquals(expectedRanges[1], mergedPageRanges.rangeEnd);
+ }
+
+ private void assertSingleRange(PageRangesComputer mergedPageRanges, int[] expectedRanges) {
+ Assert.assertEquals(2, expectedRanges.length);
+ IntList pageRanges = mergedPageRanges.pageRanges;
+ Assert.assertEquals(expectedRanges[0], pageRanges.getInt(0));
+ Assert.assertEquals(expectedRanges[1], pageRanges.getInt(pageRanges.size() - 1));
+ }
+
+ private void assertEachRange(PageRangesComputer mergedPageRanges, int[] expectedRanges) {
+ IntList pageRanges = mergedPageRanges.pageRanges;
+ Assert.assertEquals(expectedRanges.length, pageRanges.size());
+ for (int i = 0; i < expectedRanges.length; i++) {
+ Assert.assertEquals(expectedRanges[i], pageRanges.getInt(i));
+ }
+ }
+
+ private void assertBoundaries(PageRangesComputer mergedPageRanges, int[] expectedRanges) {
+ int[] rangeBoundaries = mergedPageRanges.rangeBoundaries;
+ Assert.assertEquals(expectedRanges.length / 2, rangeBoundaries.length);
+ IntList pageRanges = mergedPageRanges.pageRanges;
+ int startIndex = 0;
+ int expectedRange = 0;
+ for (int i = 0; i < rangeBoundaries.length; i++) {
+ int endIndex = rangeBoundaries[i];
+ int rangeStart = pageRanges.getInt(startIndex);
+ int rangeEnd = pageRanges.getInt(endIndex);
+
+ Assert.assertEquals(expectedRanges[expectedRange++], rangeStart);
+ Assert.assertEquals(expectedRanges[expectedRange++], rangeEnd);
+
+ // Start from the next cut
+ startIndex = endIndex + 1;
}
}
}
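
A quick trace of the new singlePageGap case: when every gap is exactly one page, PageRangesComputer short-circuits to SINGLE_RANGE instead of spending its range budget. Same-package access is assumed, as in the test above.

PageRangesComputer computer = new PageRangesComputer(3);
computer.addRange(1, 3);
computer.addRange(5, 7);
computer.addRange(9, 10);
// Gaps are 5 - 3 - 1 = 1 and 9 - 7 - 1 = 1, so maxGap == 1 and
// mergeRanges() returns SINGLE_RANGE covering pages [1, 10].
assert computer.mergeRanges() == PageRangesComputer.SINGLE_RANGE;
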