[ASTERIXDB-3375][STO][HYR] Introduce local disk caching APIs
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Introduce hyracks-cloud and local disk caching APIs
Change-Id: I8a1ce6616d1190f8b047570fac1d774297a3d7de
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18227
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
index c826ed7..3f91c75 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
@@ -28,16 +28,21 @@
* Only used for files which are stored inside an IO device.
*/
public class FileReference implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final File file;
private final IODeviceHandle dev;
private final String path;
private long registrationTime = 0L;
+ /**
+ * Used to determine if holes can exist in this file
+ */
+ private boolean holesAllowed;
public FileReference(IODeviceHandle dev, String path) {
file = new File(dev.getMount(), path);
this.dev = dev;
this.path = path;
+ holesAllowed = false;
}
public File getFile() {
@@ -132,4 +137,15 @@
String parentPath = path.substring(0, parentIndex);
return new FileReference(dev, parentPath);
}
+
+ public boolean areHolesAllowed() {
+ return holesAllowed;
+ }
+
+ /**
+ * Set the file to allow holes. Once set, it cannot be changed.
+ */
+ public void setHolesAllowed() {
+ this.holesAllowed = true;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IDiskSpaceMaker.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IDiskSpaceMaker.java
new file mode 100644
index 0000000..b5b912e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IDiskSpaceMaker.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.io;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IDiskSpaceMaker {
+ void makeSpaceOrThrow(IOException e) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/pom.xml b/hyracks-fullstack/hyracks/hyracks-cloud/pom.xml
new file mode 100644
index 0000000..6ff2ef6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/pom.xml
@@ -0,0 +1,91 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-cloud</artifactId>
+ <parent>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.3.10-SNAPSHOT</version>
+ </parent>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
+ <properties>
+ <root.dir>${basedir}/../..</root.dir>
+ <jnr-posix.version>3.1.19</jnr-posix.version>
+ <jnr-ffi.version>2.2.16</jnr-ffi.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.jnr</groupId>
+ <artifactId>jnr-posix</artifactId>
+ <version>${jnr-posix.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.jnr</groupId>
+ <artifactId>jnr-ffi</artifactId>
+ <version>${jnr-ffi.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/ContextUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/ContextUtil.java
new file mode 100644
index 0000000..dfcd6f1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/ContextUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.context;
+
+import static org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+
+public class ContextUtil {
+ private ContextUtil() {
+ }
+
+ public static boolean isEmpty(BufferCacheHeaderHelper header) {
+ ByteBuffer headerBuf = header.getBuffer();
+ // THIS IS ONLY VALID FOR COLUMNAR
+ return headerBuf.getInt(BufferCacheHeaderHelper.FRAME_MULTIPLIER_OFF) == 0;
+ }
+
+ public static void persist(ICloudIOManager cloudIOManager, IFileHandle fileHandle, ByteBuffer buffer, long offset)
+ throws HyracksDataException {
+ int originalLimit = buffer.limit();
+ buffer.position(RESERVED_HEADER_BYTES);
+
+ // First write the content of the page
+ cloudIOManager.localWriter(fileHandle, offset + RESERVED_HEADER_BYTES, buffer);
+
+ // If a failure happened before this position, we are sure the header still has 0 for
+ // the multiplier (i.e., a hole)
+
+ // Next, write the header. This is like a "commit" for the page
+ buffer.position(0);
+ buffer.limit(RESERVED_HEADER_BYTES);
+ // TODO what if this failed to write fully? (e.g., it wrote the first 3 bytes of the multiplier)
+ cloudIOManager.localWriter(fileHandle, offset, buffer);
+
+ // After this point the header is written. We are sure the page is valid and has the correct multiplier
+
+ // Restore the page's position and limit
+ buffer.position(0);
+ buffer.limit(originalLimit);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
new file mode 100644
index 0000000..c4ae6c0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.context;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheWriteContext;
+
+public final class DefaultCloudOnlyWriteContext implements IBufferCacheWriteContext {
+ public static final IBufferCacheWriteContext INSTANCE = new DefaultCloudOnlyWriteContext();
+
+ private DefaultCloudOnlyWriteContext() {
+ }
+
+ @Override
+ public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data)
+ throws HyracksDataException {
+ ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+ return cloudIOManager.cloudWrite(handle, offset, data);
+ }
+
+ @Override
+ public long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data)
+ throws HyracksDataException {
+ ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+ return cloudIOManager.cloudWrite(handle, offset, data);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudReadContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudReadContext.java
new file mode 100644
index 0000000..8dfd8ce
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudReadContext.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.context;
+
+import static org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper.EXTRA_BLOCK_PAGE_ID_OFF;
+import static org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper.FRAME_MULTIPLIER_OFF;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+/**
+ * Default context for {@link BufferCache#pin(long, IBufferCacheReadContext)}
+ * and {@link BufferCache#unpin(ICachedPage, IBufferCacheReadContext)} in a cloud deployment.
+ * <p>
+ * The default behavior of this context is persisting the pages read from cloud if {@link IPhysicalDrive}
+ * reports that the local drive(s) are not pressured.
+ */
+@ThreadSafe
+public class DefaultCloudReadContext implements IBufferCacheReadContext {
+ private final IPhysicalDrive drive;
+
+ public DefaultCloudReadContext(IPhysicalDrive drive) {
+ this.drive = drive;
+ }
+
+ @Override
+ public void onPin(ICachedPage page) {
+ // NoOp
+ }
+
+ @Override
+ public void onUnpin(ICachedPage page) {
+ // NoOp
+ }
+
+ @Override
+ public boolean isNewPage() {
+ return false;
+ }
+
+ @Override
+ public boolean incrementStats() {
+ return true;
+ }
+
+ @Override
+ public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+ CachedPage cPage) throws HyracksDataException {
+ return readAndPersistPage(ioManager, fileHandle, header, cPage, drive.hasSpace());
+ }
+
+ public static ByteBuffer readAndPersistPage(IOManager ioManager, BufferedFileHandle fileHandle,
+ BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
+ ByteBuffer headerBuf = readAndPersistIfEmpty(ioManager, fileHandle.getFileHandle(), header, cPage, persist);
+
+ cPage.setFrameSizeMultiplier(headerBuf.getInt(FRAME_MULTIPLIER_OFF));
+ cPage.setExtraBlockPageId(headerBuf.getInt(EXTRA_BLOCK_PAGE_ID_OFF));
+ headerBuf.position(BufferCache.RESERVED_HEADER_BYTES);
+ return headerBuf;
+ }
+
+ /**
+ * Note: {@link BufferCache} guarantees that no two reads on the same page can happen at the same time.
+ * This means persisting the page on disk here is guaranteed to be done by a single thread.
+ *
+ * @param ioManager ioManager (guaranteed to be {@link ICloudIOManager}
+ * @param fileHandle file
+ * @param header header buffer provider
+ * @param cPage cached page
+ * @return header buffer
+ */
+ private static ByteBuffer readAndPersistIfEmpty(IOManager ioManager, IFileHandle fileHandle,
+ BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
+ ByteBuffer headerBuf = header.getBuffer();
+ if (ContextUtil.isEmpty(header)) {
+ // header indicates the page is empty
+ // reset the buffer position to 0. Limit should be already set before the call of processHeader
+ headerBuf.position(0);
+ long offset = cPage.getCompressedPageOffset();
+ ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+ // Read pageZero from the cloud
+ cloudIOManager.cloudRead(fileHandle, offset, headerBuf);
+ headerBuf.flip();
+
+ if (persist) {
+ ContextUtil.persist(cloudIOManager, fileHandle, headerBuf, offset);
+ }
+ }
+
+ return headerBuf;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudCachedPage.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudCachedPage.java
new file mode 100644
index 0000000..76d937c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudCachedPage.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.page;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+
+public final class CloudCachedPage extends CachedPage {
+ private ISweepLockInfo lockInfo;
+
+ private CloudCachedPage() {
+ // Disable default constructor
+ }
+
+ CloudCachedPage(int cpid, ByteBuffer buffer, IPageReplacementStrategy replacementStrategy) {
+ super(cpid, buffer, replacementStrategy);
+ lockInfo = UnlockedSweepLockInfo.INSTANCE;
+ }
+
+ public ISweepLockInfo beforeRead() {
+ latch.readLock().lock();
+ return lockInfo;
+ }
+
+ public void afterRead() {
+ latch.readLock().unlock();
+ }
+
+ public boolean trySweepLock(ISweepLockInfo lockInfo) {
+ if (!latch.writeLock().tryLock()) {
+ return false;
+ }
+
+ try {
+ this.lockInfo = lockInfo;
+ } finally {
+ latch.writeLock().unlock();
+ }
+
+ return true;
+ }
+
+ public void sweepUnlock() {
+ latch.writeLock().lock();
+ try {
+ this.lockInfo = UnlockedSweepLockInfo.INSTANCE;
+ } finally {
+ latch.writeLock().unlock();
+ }
+ }
+
+ public boolean skipCloudStream() {
+ return valid;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudDiskCachedPageAllocator.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudDiskCachedPageAllocator.java
new file mode 100644
index 0000000..d9c91f3
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/CloudDiskCachedPageAllocator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.page;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator;
+import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+
+public final class CloudDiskCachedPageAllocator implements IDiskCachedPageAllocator {
+ public static final IDiskCachedPageAllocator INSTANCE = new CloudDiskCachedPageAllocator();
+
+ private CloudDiskCachedPageAllocator() {
+ }
+
+ @Override
+ public CachedPage allocate(int cpid, ByteBuffer buffer, IPageReplacementStrategy replacementStrategy) {
+ return new CloudCachedPage(cpid, buffer, replacementStrategy);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/ISweepLockInfo.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/ISweepLockInfo.java
new file mode 100644
index 0000000..a90c74d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/ISweepLockInfo.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.page;
+
+/**
+ * Sweep lock information holder of {@link CloudCachedPage}
+ */
+public interface ISweepLockInfo {
+ /**
+ * @return true if {@link CloudCachedPage} is locked for a sweep operation
+ */
+ boolean isLocked();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/UnlockedSweepLockInfo.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/UnlockedSweepLockInfo.java
new file mode 100644
index 0000000..e7c5f57
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/page/UnlockedSweepLockInfo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.buffercache.page;
+
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+/**
+ * The default information holder when unlocked
+ */
+@ThreadSafe
+final class UnlockedSweepLockInfo implements ISweepLockInfo {
+ public static final ISweepLockInfo INSTANCE = new UnlockedSweepLockInfo();
+
+ @Override
+ public boolean isLocked() {
+ return false;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
new file mode 100644
index 0000000..381d5eb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.service;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.prefetch.AbstractPrefetchRequest;
+import org.apache.hyracks.storage.common.disk.prefetch.IPrefetchHandler;
+
+public final class CloudDiskCacheMonitoringAndPrefetchingService
+ implements IDiskCacheMonitoringService, IPrefetchHandler {
+ private final ExecutorService executor;
+ private final DiskCacheSweeperThread monitorThread;
+ private final IPhysicalDrive drive;
+
+ public CloudDiskCacheMonitoringAndPrefetchingService(ExecutorService executor, IPhysicalDrive drive,
+ DiskCacheSweeperThread monitorThread) {
+ this.executor = executor;
+ this.drive = drive;
+ this.monitorThread = monitorThread;
+ }
+
+ @Override
+ public void start() {
+ executor.execute(monitorThread);
+ }
+
+ @Override
+ public void stop() {
+ executor.shutdown();
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return true;
+ }
+
+ @Override
+ public IPhysicalDrive getPhysicalDrive() {
+ return drive;
+ }
+
+ @Override
+ public void request(AbstractPrefetchRequest request) throws HyracksDataException {
+ // TODO implement
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
new file mode 100644
index 0000000..004c5f5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.service;
+
+import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+// TODO locking should be revised
+public final class CloudDiskResourceCacheLockNotifier implements IDiskResourceCacheLockNotifier {
+ private final Int2ObjectMap<DatasetUnit> datasets;
+
+ public CloudDiskResourceCacheLockNotifier() {
+ datasets = Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());
+ }
+
+ @Override
+ public void onRegister(int datasetId, LocalResource localResource, IIndex index) {
+ ILSMIndex lsmIndex = (ILSMIndex) index;
+ if (lsmIndex.getDiskCacheManager().isSweepable()) {
+ DatasetUnit datasetUnit = datasets.computeIfAbsent(datasetId, DatasetUnit::new);
+ datasetUnit.addIndex(localResource.getId(), lsmIndex);
+ }
+ }
+
+ @Override
+ public void onUnregister(int datasetId, long resourceId) {
+ DatasetUnit datasetUnit = datasets.get(datasetId);
+ if (datasetUnit != null && datasetUnit.dropIndex(resourceId)) {
+ datasets.remove(datasetId);
+ }
+ }
+
+ @Override
+ public void onOpen(int datasetId, long resourceId) {
+ DatasetUnit datasetUnit = datasets.get(datasetId);
+ if (datasetUnit != null) {
+ IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
+ if (indexUnit != null) {
+ indexUnit.readLock();
+ }
+ }
+ }
+
+ @Override
+ public void onClose(int datasetId, long resourceId) {
+ DatasetUnit datasetUnit = datasets.get(datasetId);
+ if (datasetUnit != null) {
+ IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
+ if (indexUnit != null) {
+ indexUnit.readUnlock();
+ }
+ }
+ }
+
+ Int2ObjectMap<DatasetUnit> getDatasets() {
+ return datasets;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
new file mode 100644
index 0000000..79b978e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
+import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.sweeper.ISweeper;
+import org.apache.hyracks.cloud.sweeper.Sweeper;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.CriticalPath;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class DiskCacheSweeperThread implements Runnable, IDiskSpaceMaker {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final long waitTime;
+ private final CloudDiskResourceCacheLockNotifier resourceManager;
+ private final IPhysicalDrive physicalDrive;
+ private final List<IndexUnit> indexes;
+ private final ISweeper sweeper;
+
+ public DiskCacheSweeperThread(ExecutorService executorService, long waitTime,
+ CloudDiskResourceCacheLockNotifier resourceManager, ICloudIOManager cloudIOManager, int numOfSweepThreads,
+ int sweepQueueSize, IPhysicalDrive physicalDrive, BufferCache bufferCache,
+ Map<Integer, BufferedFileHandle> fileInfoMap) {
+ this.waitTime = TimeUnit.SECONDS.toMillis(waitTime);
+ this.resourceManager = resourceManager;
+ this.physicalDrive = physicalDrive;
+ indexes = new ArrayList<>();
+ sweeper = new Sweeper(executorService, cloudIOManager, bufferCache, fileInfoMap, numOfSweepThreads,
+ sweepQueueSize);
+ }
+
+ @Override
+ public void makeSpaceOrThrow(IOException ioException) throws HyracksDataException {
+ if (ioException.getMessage().contains("no space")) {
+ synchronized (this) {
+ // Notify the sweeper thread
+ notify();
+ try {
+ // Wait for the sweep to finish
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } else {
+ throw HyracksDataException.create(ioException);
+ }
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName(this.getClass().getSimpleName());
+ while (true) {
+ synchronized (this) {
+ try {
+ sweep();
+ wait(waitTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ notifyAll();
+ }
+ }
+ }
+ }
+
+ private void sweep() {
+ if (physicalDrive.computeAndCheckIsPressured()) {
+ for (DatasetUnit dataset : resourceManager.getDatasets().values()) {
+ indexes.clear();
+ dataset.getIndexes(indexes);
+ sweepIndexes(sweeper, indexes);
+ }
+ }
+ }
+
+ @CriticalPath
+ private static void sweepIndexes(ISweeper sweeper, List<IndexUnit> indexes) {
+ for (int i = 0; i < indexes.size(); i++) {
+ IndexUnit index = indexes.get(i);
+ if (!index.isSweeping()) {
+ try {
+ sweeper.sweep(index);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Sweeping thread interrupted", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
new file mode 100644
index 0000000..34b37fb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.unit;
+
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+
+public final class DatasetUnit {
+ private final int id;
+ private final ReentrantReadWriteLock lock;
+ /**
+ * Maps resourceId to {@link IndexUnit}
+ */
+ private final Long2ObjectMap<IndexUnit> indexes;
+
+ public DatasetUnit(int datasetId) {
+ id = datasetId;
+ lock = new ReentrantReadWriteLock();
+ indexes = new Long2ObjectOpenHashMap<>();
+
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public IndexUnit addIndex(long resourceId, ILSMIndex index) {
+ writeLock();
+ try {
+ IndexUnit indexUnit = new IndexUnit(resourceId, index);
+ indexes.put(resourceId, indexUnit);
+ return indexUnit;
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public boolean dropIndex(long resourceId) {
+ IndexUnit indexUnit = indexes.remove(resourceId);
+ // Signal that the index is being dropped so a sweeper thread does not sweep this index or stops sweeping
+ indexUnit.setDropped();
+ // Wait for the sweep operation (if running) before allowing the index to be dropped
+ indexUnit.waitForSweep();
+ return indexUnit.getIndex().isPrimaryIndex();
+ }
+
+ public IndexUnit getIndex(long resourceId) {
+ readLock();
+ try {
+ return indexes.get(resourceId);
+ } finally {
+ readUnlock();
+ }
+ }
+
+ /**
+ * Return the current indexes
+ *
+ * @param indexUnits container used to return the current indexes
+ */
+ public void getIndexes(List<IndexUnit> indexUnits) {
+ readLock();
+ try {
+ indexUnits.addAll(indexes.values());
+ } finally {
+ readUnlock();
+ }
+ }
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
new file mode 100644
index 0000000..8f8b412
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.unit;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+// TODO allow evicting an index entirely
+public final class IndexUnit {
+ private final long id;
+ private final ILSMIndex index;
+ private final AtomicBoolean dropped;
+ private final AtomicBoolean sweeping;
+ private final AtomicInteger readCounter;
+
+ public IndexUnit(long resourceId, ILSMIndex index) {
+ this.id = resourceId;
+ this.index = index;
+ dropped = new AtomicBoolean(false);
+ sweeping = new AtomicBoolean(false);
+ readCounter = new AtomicInteger(0);
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public ILSMIndex getIndex() {
+ return index;
+ }
+
+ public void setDropped() {
+ dropped.set(false);
+ }
+
+ public boolean isDropped() {
+ return dropped.get();
+ }
+
+ public boolean isSweeping() {
+ return sweeping.get();
+ }
+
+ public void startSweeping() {
+ sweeping.set(true);
+ }
+
+ public void waitForSweep() {
+ synchronized (sweeping) {
+ while (sweeping.get()) {
+ // This should not be interrupted until we get a notification the sweep is done
+ InvokeUtil.doUninterruptibly(sweeping::wait);
+ }
+ }
+ }
+
+ public void finishedSweeping() {
+ sweeping.set(false);
+ synchronized (sweeping) {
+ sweeping.notifyAll();
+ }
+ }
+
+ public void readLock() {
+ readCounter.incrementAndGet();
+ }
+
+ public void readUnlock() {
+ readCounter.decrementAndGet();
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/DefaultFileSystemOperator.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/DefaultFileSystemOperator.java
new file mode 100644
index 0000000..e389bf0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/DefaultFileSystemOperator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.nio.channels.FileChannel;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+class DefaultFileSystemOperator implements IFileSystemOperator {
+ @Override
+ public int getFileDescriptor(FileChannel fileChannel) {
+ return -1;
+ }
+
+ @Override
+ public int getBlockSize(int fileDescriptor) throws HyracksDataException {
+ return -1;
+ }
+
+ @Override
+ public long punchHole(int fileDescriptor, long offset, long length, int blockSize) {
+ return 0;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/FileSystemOperationDispatcherUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/FileSystemOperationDispatcherUtil.java
new file mode 100644
index 0000000..6708644
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/FileSystemOperationDispatcherUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.nio.channels.FileChannel;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FileSystemOperationDispatcherUtil {
+ private static final IFileSystemOperator FILE_SYSTEM_OPERATOR = createFileSystemOperator();
+
+ public static int getFileDescriptor(FileChannel fileChannel) throws HyracksDataException {
+ return FILE_SYSTEM_OPERATOR.getFileDescriptor(fileChannel);
+ }
+
+ public static int getBlockSize(int fileDescriptor) throws HyracksDataException {
+ return FILE_SYSTEM_OPERATOR.getBlockSize(fileDescriptor);
+ }
+
+ public static int punchHole(int fileDescriptor, long offset, long length, int blockSize)
+ throws HyracksDataException {
+ return (int) FILE_SYSTEM_OPERATOR.punchHole(fileDescriptor, offset, length, blockSize);
+ }
+
+ private static IFileSystemOperator createFileSystemOperator() {
+ if (isLinux()) {
+ return new LinuxFileSystemOperator();
+ }
+
+ return new DefaultFileSystemOperator();
+ }
+
+ public static String getOSName() {
+ return System.getProperty("os.name");
+ }
+
+ public static boolean isLinux() {
+ String os = getOSName();
+ return os.contains("linux");
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/IFileSystemOperator.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/IFileSystemOperator.java
new file mode 100644
index 0000000..3b6200e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/IFileSystemOperator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.nio.channels.FileChannel;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An interface for native file system operations
+ */
+interface IFileSystemOperator {
+ /**
+ * Returns the file descriptor of a file
+ *
+ * @param fileChannel opened file channel
+ * @return file descriptor as reported by the OS
+ */
+ int getFileDescriptor(FileChannel fileChannel) throws HyracksDataException;
+
+ /**
+ * Block size of a file (OS dependent)
+ *
+ * @param fileDescriptor of the file
+ * @return block size
+ */
+ int getBlockSize(int fileDescriptor) throws HyracksDataException;
+
+ /**
+ * Punches a hole in a file
+ *
+ * @param fileDescriptor of the file
+ * @param offset starting offset
+ * @param length length
+ * @param blockSize block size
+ * @return length of the hole
+ */
+ long punchHole(int fileDescriptor, long offset, long length, int blockSize) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/LinuxFileSystemOperator.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/LinuxFileSystemOperator.java
new file mode 100644
index 0000000..c8e7ee9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/LinuxFileSystemOperator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.io.FileDescriptor;
+import java.lang.reflect.Field;
+import java.nio.channels.FileChannel;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import jnr.ffi.LibraryLoader;
+import jnr.posix.FileStat;
+import jnr.posix.POSIX;
+import jnr.posix.POSIXFactory;
+
+class LinuxFileSystemOperator implements IFileSystemOperator {
+ /**
+ * Load native library
+ */
+ private static final LinuxNativeLibC libc = LibraryLoader.create(LinuxNativeLibC.class).failImmediately().load("c");
+ /**
+ * Load POSIX
+ */
+ private static final POSIX posix = POSIXFactory.getPOSIX();
+
+ /**
+ * default is extend size
+ */
+ public static final int FALLOC_FL_KEEP_SIZE = 0x01;
+
+ /**
+ * de-allocates range
+ */
+ public static final int FALLOC_FL_PUNCH_HOLE = 0x02;
+
+ @Override
+ public int getFileDescriptor(FileChannel fileChannel) throws HyracksDataException {
+ FileDescriptor fd = getField(fileChannel, "fd", FileDescriptor.class);
+ return getField(fd, "fd", int.class);
+ }
+
+ @Override
+ public int getBlockSize(int fileDescriptor) throws HyracksDataException {
+ FileStat stat = posix.fstat(fileDescriptor);
+ return Math.toIntExact(stat.blockSize());
+ }
+
+ /**
+ * @param fileDescriptor of the file
+ * @param offset starting offset
+ * @param length length
+ * @param blockSize block size
+ * @return length of the hole
+ * @see <a href="https://github.com/apache/ignite/tree/master/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress">Apache Ignite Internal</a>
+ */
+ @Override
+ public long punchHole(int fileDescriptor, long offset, long length, int blockSize) throws HyracksDataException {
+ assert offset >= 0;
+ assert length > 0;
+
+ if (length < blockSize) {
+ return 0;
+ }
+
+ long off = offset;
+ long len = length;
+ // TODO maybe optimize for power of 2
+ if (off % blockSize != 0) {
+ long end = off + len;
+ off = (off / blockSize + 1) * blockSize;
+ len = end - off;
+
+ if (len <= 0) {
+ return 0;
+ }
+ }
+
+ len = len / blockSize * blockSize;
+
+ if (len > 0) {
+ int res = libc.fallocate(fileDescriptor, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, off, len);
+ if (res != 0) {
+ throw new HyracksDataException("error");
+ }
+ }
+
+ return len;
+ }
+
+ private <T> T getField(Object object, String name, Class<T> clazz) throws HyracksDataException {
+ try {
+ Field field = object.getClass().getDeclaredField(name);
+ field.setAccessible(true);
+ return clazz.cast(field.get(field));
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public interface LinuxNativeLibC {
+ /**
+ * Allows the caller to directly manipulate the allocated
+ * disk space for the file referred to by fd for the byte range starting
+ * at {@code off} offset and continuing for {@code len} bytes.
+ *
+ * @param fd file descriptor.
+ * @param mode determines the operation to be performed on the given range.
+ * @param off required position offset.
+ * @param len required length.
+ * @return On success, fallocate() returns zero. On error, -1 is returned and
+ * {@code errno} is set to indicate the error.
+ */
+ int fallocate(int fd, int mode, long off, long len);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
new file mode 100644
index 0000000..72996b6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@ThreadSafe
+public final class PhysicalDrive implements IPhysicalDrive {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final List<File> drivePaths;
+ private final long pressureSize;
+ private final AtomicBoolean pressured;
+
+ public PhysicalDrive(List<IODeviceHandle> deviceHandles, double pressureThreshold, double storagePercentage,
+ long pressureDebugSize) {
+ drivePaths = getDrivePaths(deviceHandles);
+ pressureSize = getPressureSize(drivePaths, pressureThreshold, storagePercentage, pressureDebugSize);
+ pressured = new AtomicBoolean(getUsedSpace() <= pressureSize);
+ }
+
+ @Override
+ public boolean computeAndCheckIsPressured() {
+ long usedSpace = getUsedSpace();
+ boolean isPressured = usedSpace > pressureSize;
+ pressured.set(isPressured);
+ LOGGER.info("Used space: {}, pressureCapacity: {} (isPressured: {})",
+ StorageUtil.toHumanReadableSize(usedSpace), StorageUtil.toHumanReadableSize(pressureSize), isPressured);
+ return isPressured;
+ }
+
+ @Override
+ public boolean hasSpace() {
+ return !pressured.get();
+ }
+
+ private long getUsedSpace() {
+ long totalUsedSpace = 0;
+ for (int i = 0; i < drivePaths.size(); i++) {
+ File device = drivePaths.get(i);
+ totalUsedSpace += device.getTotalSpace() - device.getFreeSpace();
+ }
+ return totalUsedSpace;
+ }
+
+ private static long getPressureSize(List<File> drivePaths, double pressureThreshold, double storagePercentage,
+ long pressureDebugSize) {
+
+ long totalCapacity = 0;
+ long totalUsedSpace = 0;
+ for (File drive : drivePaths) {
+ totalCapacity += drive.getTotalSpace();
+ totalUsedSpace += drive.getTotalSpace() - drive.getFreeSpace();
+ }
+
+ long allocatedCapacity = (long) (totalCapacity * storagePercentage);
+ long pressureCapacity = pressureDebugSize > 0 ? totalUsedSpace + pressureDebugSize
+ : (long) (allocatedCapacity * pressureThreshold);
+
+ LOGGER.info(
+ "PhysicalDrive configured with diskCapacity: {}, allocatedCapacity: {}, and pressureCapacity: {} (used space: {})",
+ StorageUtil.toHumanReadableSize(totalCapacity), StorageUtil.toHumanReadableSize(allocatedCapacity),
+ StorageUtil.toHumanReadableSize(pressureCapacity), StorageUtil.toHumanReadableSize(totalUsedSpace));
+
+ return pressureCapacity;
+ }
+
+ private static List<File> getDrivePaths(List<IODeviceHandle> deviceHandles) {
+ File[] roots = File.listRoots();
+ Set<File> distinctUsedRoots = new HashSet<>();
+ for (IODeviceHandle handle : deviceHandles) {
+ File handlePath = handle.getMount();
+ for (File root : roots) {
+ if (handlePath.getAbsolutePath().startsWith(root.getAbsolutePath())
+ && !distinctUsedRoots.contains(root)) {
+ distinctUsedRoots.add(root);
+ break;
+ }
+ }
+ }
+ return new ArrayList<>(distinctUsedRoots);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
new file mode 100644
index 0000000..430aac7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+
+/**
+ * Certain operations needed to be provided by {@link org.apache.hyracks.api.io.IIOManager} to support cloud
+ * file operations in a cloud deployment.
+ */
+public interface ICloudIOManager {
+ /**
+ * Read from the cloud
+ *
+ * @param fHandle file handle
+ * @param offset starting offset
+ * @param data buffer to read to
+ */
+ void cloudRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+
+ /**
+ * Read from the cloud
+ *
+ * @param fHandle file handle
+ * @param offset starting offset
+ * @return input stream of the required data
+ */
+ InputStream cloudRead(IFileHandle fHandle, long offset, long length);
+
+ /**
+ * Write to local drive only
+ *
+ * @param fHandle file handle
+ * @param offset starting offset
+ * @param data to write
+ */
+
+ int localWriter(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+
+ /**
+ * Write to cloud only
+ *
+ * @param fHandle file handle
+ * @param offset starting offset
+ * @param data to write
+ * @return number of written bytes
+ */
+ int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+
+ /**
+ * Write to cloud only
+ *
+ * @param fHandle file handle
+ * @param offset starting offset
+ * @param data to write
+ * @return number of written bytes
+ */
+ long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException;
+
+ /**
+ * Punch a hole in a file
+ *
+ * @param fHandle file handle
+ * @param offset starting offset
+ * @param length length
+ */
+ int punchHole(IFileHandle fHandle, long offset, long length) throws HyracksDataException;
+
+ /**
+ * Evict a directory from the local disk cache
+ *
+ * @param directory to evict
+ */
+ void evict(FileReference directory) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
new file mode 100644
index 0000000..d064fda
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.sweeper;
+
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+
+/**
+ * Sweeps an index to relieve the pressure on a local {@link IPhysicalDrive}
+ */
+@FunctionalInterface
+public interface ISweeper {
+ /**
+ * Sweep an index
+ *
+ * @param indexUnit to sweep
+ */
+ void sweep(IndexUnit indexUnit) throws InterruptedException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
new file mode 100644
index 0000000..0410e3b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.sweeper;
+
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+
+public final class NoOpSweeper implements ISweeper {
+ public static final ISweeper INSTANCE = new NoOpSweeper();
+
+ private NoOpSweeper() {
+ }
+
+ @Override
+ public void sweep(IndexUnit indexUnit) {
+ // NoOp
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
new file mode 100644
index 0000000..773b307
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.sweeper;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public final class SweepContext implements ISweepContext {
+ private final ICloudIOManager cloudIOManager;
+ private final BufferCache bufferCache;
+ private final Map<Integer, BufferedFileHandle> fileInfoMap;
+ private final AtomicBoolean shutdown;
+ private IndexUnit indexUnit;
+ private BufferedFileHandle handle;
+
+ public SweepContext(ICloudIOManager cloudIOManager, BufferCache bufferCache,
+ Map<Integer, BufferedFileHandle> fileInfoMap, AtomicBoolean shutdown) {
+ this.cloudIOManager = cloudIOManager;
+ this.bufferCache = bufferCache;
+ this.fileInfoMap = fileInfoMap;
+ this.shutdown = shutdown;
+ }
+
+ @Override
+ public void open(int fileId) throws HyracksDataException {
+ close();
+ bufferCache.openFile(fileId);
+ synchronized (fileInfoMap) {
+ this.handle = fileInfoMap.get(fileId);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (handle != null) {
+ bufferCache.closeFile(handle.getFileId());
+ handle = null;
+ }
+ }
+
+ @Override
+ public CloudCachedPage pin(long dpid, IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
+ return (CloudCachedPage) bufferCache.pin(dpid, bcOpCtx);
+ }
+
+ @Override
+ public void unpin(ICachedPage page, IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
+ bufferCache.unpin(page, bcOpCtx);
+ }
+
+ public void setIndexUnit(IndexUnit indexUnit) {
+ this.indexUnit = indexUnit;
+ }
+
+ public IndexUnit getIndexUnit() {
+ return indexUnit;
+ }
+
+ public int punchHole(int startPageId, int numberOfPage) throws HyracksDataException {
+ long offset = handle.getStartPageOffset(startPageId);
+ long length = handle.getPagesTotalSize(startPageId, numberOfPage);
+ return cloudIOManager.punchHole(handle.getFileHandle(), offset, length);
+ }
+
+ /**
+ * Whether the sweep operation should stop or proceed
+ * Stopping condition:
+ * 1- Either the sweeper thread is shutting down
+ * 2- OR the index was dropped
+ *
+ * @return true if it should stop, false otherwise
+ */
+ public boolean stopSweeping() {
+ return shutdown.get() || indexUnit.isDropped();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
new file mode 100644
index 0000000..36c4a12
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.sweeper;
+
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class Sweeper implements ISweeper {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final SweepRequest POISON = new SweepRequest();
+ private final BlockingQueue<SweepRequest> requests;
+ private final BlockingQueue<SweepRequest> freeRequests;
+ private final AtomicBoolean shutdown;
+
+ public Sweeper(ExecutorService executor, ICloudIOManager cloudIOManager, BufferCache bufferCache,
+ Map<Integer, BufferedFileHandle> fileInfoMap, int numOfSweepThreads, int queueSize) {
+ requests = new ArrayBlockingQueue<>(queueSize);
+ freeRequests = new ArrayBlockingQueue<>(queueSize);
+ shutdown = new AtomicBoolean(false);
+ for (int i = 0; i < queueSize; i++) {
+ freeRequests.add(new SweepRequest(new SweepContext(cloudIOManager, bufferCache, fileInfoMap, shutdown)));
+ }
+ for (int i = 0; i < numOfSweepThreads; i++) {
+ executor.execute(new SweepThread(requests, freeRequests, i));
+ }
+ }
+
+ @Override
+ public void sweep(IndexUnit indexUnit) throws InterruptedException {
+ SweepRequest request = freeRequests.take();
+ request.reset(indexUnit);
+ requests.put(request);
+ }
+
+ public void shutdown() {
+ shutdown.set(true);
+ requests.clear();
+ freeRequests.clear();
+ requests.offer(POISON);
+ // TODO wait for threads to terminate
+ }
+
+ private static class SweepThread implements Runnable {
+ private final BlockingQueue<SweepRequest> requests;
+ private final BlockingQueue<SweepRequest> freeRequests;
+ private final int threadNumber;
+
+ private SweepThread(BlockingQueue<SweepRequest> requests, BlockingQueue<SweepRequest> freeRequests,
+ int threadNumber) {
+ this.requests = requests;
+ this.freeRequests = freeRequests;
+ this.threadNumber = threadNumber;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName(getClass().getSimpleName() + "-" + threadNumber);
+ while (true) {
+ SweepRequest request = null;
+ try {
+ request = requests.take();
+ if (isPoison(request)) {
+ break;
+ }
+ request.handle();
+ } catch (InterruptedException e) {
+ LOGGER.warn("Ignoring interrupt. Sweep threads should never be interrupted.");
+ } catch (Throwable t) {
+ LOGGER.error("Sweep failed", t);
+ } finally {
+ if (request != null && request != POISON) {
+ freeRequests.add(request);
+ }
+ }
+
+ }
+ }
+
+ private boolean isPoison(SweepRequest request) {
+ if (request == POISON) {
+ LOGGER.info("Exiting");
+ InvokeUtil.doUninterruptibly(() -> requests.put(POISON));
+ if (Thread.interrupted()) {
+ LOGGER.error("Ignoring interrupt. Sweep threads should never be interrupted.");
+ }
+ return true;
+ }
+
+ return false;
+ }
+ }
+
+ private static class SweepRequest {
+ private final SweepContext context;
+
+ SweepRequest() {
+ this(null);
+ }
+
+ SweepRequest(SweepContext context) {
+ this.context = context;
+ }
+
+ void reset(IndexUnit indexUnit) {
+ context.setIndexUnit(indexUnit);
+ }
+
+ void handle() throws HyracksDataException {
+ if (context.stopSweeping()) {
+ /*
+ * This could happen as the sweeper gets a copy of a list of all indexUnits at a certain point
+ * of time. However, after acquiring the list and the start of handling this sweep request, we found
+ * that the index of this request was dropped.
+ *
+ * To illustrate:
+ * 1- The Sweeper got a list of all indexUnits (say index_1 and index_2)
+ * 2a- The Sweeper started sweeping index_1
+ * 2b- index_2 was dropped
+ * 3- The sweeper finished sweeping index_1 and started sweeping index_2. However, index_2 was
+ * dropped at 2b.
+ */
+ return;
+ }
+ IndexUnit indexUnit = context.getIndexUnit();
+ indexUnit.startSweeping();
+ try {
+ ILSMIndex index = indexUnit.getIndex();
+ IIndexDiskCacheManager diskCacheManager = index.getDiskCacheManager();
+ if (!diskCacheManager.isActive()) {
+ return;
+ }
+
+ if (diskCacheManager.isSweepRequirePlanning()) {
+ // Manager require planning
+ diskCacheManager.prepareSweepPlan();
+ }
+ // Currently, we always sweep.
+ // But we in the future we can only do a planning and sweep with another request
+ diskCacheManager.sweep(context);
+ } finally {
+ indexUnit.finishedSweeping();
+ }
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/test/java/org/apache/hyracks/cloud/filesystem/PhysicalDriveTest.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/test/java/org/apache/hyracks/cloud/filesystem/PhysicalDriveTest.java
new file mode 100644
index 0000000..9e9c782
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/test/java/org/apache/hyracks/cloud/filesystem/PhysicalDriveTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.filesystem;
+
+import java.io.File;
+
+import javax.swing.filechooser.FileSystemView;
+
+import org.junit.Test;
+
+public class PhysicalDriveTest {
+
+ @Test
+ public void test() {
+ FileSystemView fileSystemView = FileSystemView.getFileSystemView();
+ File[] roots = File.listRoots();
+ File file = new File(".");
+
+ for (File root : roots) {
+ if (file.getAbsolutePath().startsWith(root.getAbsolutePath())) {
+ System.err.println(root.getAbsolutePath() + " is drive: " + fileSystemView.getSystemDisplayName(root));
+ }
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index 69fc6d1..bdac239 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
@@ -44,7 +45,9 @@
import org.apache.hyracks.storage.common.file.ResourceIdFactory;
import org.apache.hyracks.storage.common.file.ResourceIdFactoryProvider;
import org.apache.hyracks.storage.common.file.TransientLocalResourceRepositoryFactory;
+import org.apache.hyracks.util.annotations.TestOnly;
+@TestOnly
public class RuntimeContext {
private final IIOManager ioManager;
private final IBufferCache bufferCache;
@@ -56,7 +59,8 @@
public RuntimeContext(INCServiceContext appCtx) throws HyracksDataException {
fileMapManager = new FileMapManager();
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
- IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, 32768, 50);
+ IPageReplacementStrategy prs =
+ new ClockPageReplacementStrategy(allocator, DefaultDiskCachedPageAllocator.INSTANCE, 32768, 50);
ThreadFactory threadFactory = Thread::new;
this.ioManager = appCtx.getIoManager();
bufferCache = new BufferCache(ioManager, prs, new DelayPageCleanerPolicy(1000), fileMapManager, 100, 10,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index 932ad6c..ddd8dfd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -32,10 +32,12 @@
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.TestOnly;
/**
* @deprecated This class must not be used. Instead, use {@link AppendOnlyLinkedMetadataPageManager}
*/
+@TestOnly
@Deprecated
public class LinkedMetaDataPageManager implements IMetadataPageManager {
private final IBufferCache bufferCache;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 4756921..0869fed 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMHarness;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -99,12 +100,8 @@
* Create a flush operation.
* This is an atomic operation. If an exception is thrown, no partial effect is left
*
+ * @param ctx the operation context
* @return the flush operation
- *
- * @param ctx
- * the operation context
- * @param callback
- * the IO callback
* @throws HyracksDataException
*/
ILSMIOOperation createFlushOperation(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -115,10 +112,7 @@
* Create a merge operation.
* This is an atomic operation. If an exception is thrown, no partial effect is left
*
- * @param ctx
- * the operation context
- * @param callback
- * the IO callback
+ * @param ctx the operation context
* @throws HyracksDataException
*/
ILSMIOOperation createMergeOperation(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -141,8 +135,7 @@
/**
* Populates the context's component holder with a snapshot of the components involved in the operation.
*
- * @param ctx
- * - the operation's context
+ * @param ctx - the operation's context
* @throws HyracksDataException
*/
void getOperationalComponents(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -231,4 +224,9 @@
*/
void resetCurrentComponentIndex();
+ /**
+ * @return disk cache manager
+ */
+ IIndexDiskCacheManager getDiskCacheManager();
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/DefaultIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/DefaultIndexDiskCacheManager.java
new file mode 100644
index 0000000..fd0c983
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/DefaultIndexDiskCacheManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.cloud;
+
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+
+public final class DefaultIndexDiskCacheManager implements IIndexDiskCacheManager {
+ public static final IIndexDiskCacheManager INSTANCE = new DefaultIndexDiskCacheManager();
+ private static final String NOT_SWEEPABLE_ERR_MSG = "Index is not sweepable";
+
+ @Override
+ public boolean isActive() {
+ return false;
+ }
+
+ @Override
+ public boolean isSweepable() {
+ return false;
+ }
+
+ @Override
+ public boolean isSweepRequirePlanning() {
+ throw new IllegalStateException(NOT_SWEEPABLE_ERR_MSG);
+ }
+
+ @Override
+ public void prepareSweepPlan() {
+ throw new IllegalStateException(NOT_SWEEPABLE_ERR_MSG);
+ }
+
+ @Override
+ public long sweep(ISweepContext context) {
+ throw new IllegalStateException(NOT_SWEEPABLE_ERR_MSG);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/IIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/IIndexDiskCacheManager.java
new file mode 100644
index 0000000..ee393af
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/cloud/IIndexDiskCacheManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.cloud;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+
+/**
+ * Disk cache manager for an index
+ */
+public interface IIndexDiskCacheManager {
+ /**
+ * @return the manager is active
+ * @see ILSMIndex#activate()
+ */
+ boolean isActive();
+
+ /**
+ * @return whether an index can be swept to make space in {@link IPhysicalDrive}
+ */
+ boolean isSweepable();
+
+ /**
+ * @return whether a sweep operation requires planning
+ */
+ boolean isSweepRequirePlanning();
+
+ /**
+ * Prepare a sweep plan
+ */
+ void prepareSweepPlan();
+
+ /**
+ * Sweep an index to make space in {@link IPhysicalDrive}
+ *
+ * @param context sweep context
+ * @return freed space in bytes
+ */
+ long sweep(ISweepContext context) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index dfce00f..0534754 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -68,6 +68,8 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.cloud.DefaultIndexDiskCacheManager;
+import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
@@ -957,4 +959,8 @@
return mergePolicy;
}
+ @Override
+ public IIndexDiskCacheManager getDiskCacheManager() {
+ return DefaultIndexDiskCacheManager.INSTANCE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 0c442a7..a559a8c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -297,13 +297,20 @@
protected FileReference getCompressedFileReferenceIfAny(String name) {
final ICompressorDecompressor compDecomp = compressorDecompressorFactory.createInstance();
+ FileReference treeFileRef;
//Avoid creating LAF file for NoOpCompressorDecompressor
if (compDecomp != NoOpCompressorDecompressor.INSTANCE && isCompressible(name)) {
final String path = baseDir.getChildPath(name);
- return new CompressedFileReference(baseDir.getDeviceHandle(), compDecomp, path, path + LAF_SUFFIX);
+ treeFileRef = new CompressedFileReference(baseDir.getDeviceHandle(), compDecomp, path, path + LAF_SUFFIX);
+ } else {
+ treeFileRef = baseDir.getChild(name);
}
- return baseDir.getChild(name);
+ if (areHolesAllowed()) {
+ treeFileRef.setHolesAllowed();
+ }
+
+ return treeFileRef;
}
protected void cleanLookAsideFiles(Set<String> groundTruth, IBufferCache bufferCache) throws HyracksDataException {
@@ -321,6 +328,10 @@
}
}
+ protected boolean areHolesAllowed() {
+ return false;
+ }
+
private boolean isCompressible(String fileName) {
return !fileName.endsWith(BLOOM_FILTER_SUFFIX) && !fileName.endsWith(DELETE_TREE_SUFFIX);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
index 1bceea3..1091040 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -27,6 +27,8 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheWriteContext;
import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.util.annotations.NotThreadSafe;
@@ -65,42 +67,37 @@
/**
* Read the CachedPage from disk
*
- * @param cPage
- * CachedPage in {@link BufferCache}
- * @throws HyracksDataException
+ * @param cPage CachedPage in {@link BufferCache}
+ * @param context read context
*/
- public abstract void read(CachedPage cPage) throws HyracksDataException;
+ public abstract void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException;
/**
* Write the CachedPage into disk
*
- * @param cPage
- * CachedPage in {@link BufferCache}
- * @throws HyracksDataException
+ * @param cPage CachedPage in {@link BufferCache}
+ * @param context write context
*/
- public void write(CachedPage cPage) throws HyracksDataException {
+ public void write(CachedPage cPage, IBufferCacheWriteContext context) throws HyracksDataException {
final int totalPages = cPage.getFrameSizeMultiplier();
final int extraBlockPageId = cPage.getExtraBlockPageId();
final BufferCacheHeaderHelper header = checkoutHeaderHelper();
- write(cPage, header, totalPages, extraBlockPageId);
+ write(cPage, header, totalPages, extraBlockPageId, context);
}
/**
- * Write the CachedPage into disk called by {@link AbstractBufferedFileIOManager#write(CachedPage)}
+ * Write the CachedPage into disk called by
+ * {@link AbstractBufferedFileIOManager#write(CachedPage, IBufferCacheWriteContext)}
* Note: It is the responsibility of the caller to return {@link BufferCacheHeaderHelper}
*
- * @param cPage
- * CachedPage that will be written
- * @param header
- * HeaderHelper to add into the written page
- * @param totalPages
- * Number of pages to be written
- * @param extraBlockPageId
- * Extra page ID in case it has more than one page
- * @throws HyracksDataException
+ * @param cPage CachedPage that will be written
+ * @param header HeaderHelper to add into the written page
+ * @param totalPages Number of pages to be written
+ * @param extraBlockPageId Extra page ID in case it has more than one page
+ * @param context write context
*/
protected abstract void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages,
- int extraBlockPageId) throws HyracksDataException;
+ int extraBlockPageId, IBufferCacheWriteContext context) throws HyracksDataException;
/* ********************************
* File operations' methods
@@ -133,11 +130,14 @@
ioManager.close(fileHandle);
}
+ public IFileHandle getFileHandle() {
+ return fileHandle;
+ }
+
/**
* Force the file into disk
*
- * @param metadata
- * see {@link java.nio.channels.FileChannel#force(boolean)}
+ * @param metadata see {@link java.nio.channels.FileChannel#force(boolean)}
* @throws HyracksDataException
*/
public void force(boolean metadata) throws HyracksDataException {
@@ -145,6 +145,14 @@
}
/**
+ * Return start offset of a page
+ *
+ * @param pageId page ID
+ * @return offset
+ */
+ public abstract long getStartPageOffset(int pageId) throws HyracksDataException;
+
+ /**
* Get the number of pages in the file
*
* @throws HyracksDataException
@@ -158,8 +166,7 @@
/**
* Check whether the file has been deleted
*
- * @return
- * true if has been deleted, false o.w
+ * @return true if has been deleted, false o.w
*/
public boolean hasBeenDeleted() {
return fileHandle == null;
@@ -168,8 +175,7 @@
/**
* Check whether the file has ever been opened
*
- * @return
- * true if has ever been opened, false otherwise
+ * @return true if has ever been opened, false otherwise
*/
public final boolean hasBeenOpened() {
return hasOpen;
@@ -235,6 +241,15 @@
public abstract ICompressedPageWriter getCompressedPageWriter();
+ /**
+ * Compute the total size of pages
+ *
+ * @param startPageId page ID to start from
+ * @param numberOfPages the number of pages
+ * @return total size of pages in bytes
+ */
+ public abstract long getPagesTotalSize(int startPageId, int numberOfPages) throws HyracksDataException;
+
/* ********************************
* Common helper methods
* ********************************
@@ -243,20 +258,16 @@
/**
* Get the offset for the first page
*
- * @param cPage
- * CachedPage for which the offset is needed
- * @return
- * page offset in the file
+ * @param cPage CachedPage for which the offset is needed
+ * @return page offset in the file
*/
protected abstract long getFirstPageOffset(CachedPage cPage);
/**
* Get the offset for the extra page
*
- * @param cPage
- * CachedPage for which the offset is needed
- * @return
- * page offset in the file
+ * @param cPage CachedPage for which the offset is needed
+ * @return page offset in the file
*/
protected abstract long getExtraPageOffset(CachedPage cPage);
@@ -276,11 +287,11 @@
return ioManager.syncRead(fileHandle, offset, buf);
}
- protected final long writeToFile(ByteBuffer buf, long offset) throws HyracksDataException {
+ protected final long writeExtraToFile(ByteBuffer buf, long offset) throws HyracksDataException {
return ioManager.doSyncWrite(fileHandle, offset, buf);
}
- protected final long writeToFile(ByteBuffer[] buf, long offset) throws HyracksDataException {
+ protected final long writeExtraToFile(ByteBuffer[] buf, long offset) throws HyracksDataException {
return ioManager.doSyncWrite(fileHandle, offset, buf);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 1660b7e..5d25685 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -568,7 +568,7 @@
private void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException {
BufferedFileHandle fInfo = getFileHandle(cPage);
cPage.buffer.clear();
- fInfo.read(cPage);
+ fInfo.read(cPage, context);
final IThreadStats threadStats = statsSubscribers.get(Thread.currentThread());
if (threadStats != null && context.incrementStats()) {
threadStats.coldRead();
@@ -588,7 +588,7 @@
if (fInfo.hasBeenDeleted()) {
return;
}
- fInfo.write(cPage);
+ fInfo.write(cPage, context);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
index a105e60..8f9054a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
@@ -23,17 +23,19 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.control.nc.io.IOManager;
public class BufferCacheHeaderHelper {
public static final int FRAME_MULTIPLIER_OFF = 0;
public static final int EXTRA_BLOCK_PAGE_ID_OFF = FRAME_MULTIPLIER_OFF + 4; // 4
private final ByteBuffer[] array;
- private final int pageSizeWithHeader;
private ByteBuffer buf;
public BufferCacheHeaderHelper(int pageSize) {
- this.pageSizeWithHeader = RESERVED_HEADER_BYTES + pageSize;
+ int pageSizeWithHeader = RESERVED_HEADER_BYTES + pageSize;
buf = ByteBuffer.allocate(pageSizeWithHeader);
array = new ByteBuffer[] { buf, null };
}
@@ -54,34 +56,47 @@
return buf;
}
+ public int writeToFile(IOManager ioManager, IFileHandle handle, ByteBuffer buffer, long offset)
+ throws HyracksDataException {
+ // TODO include CRC32?
+ return ioManager.doSyncWrite(handle, offset, buffer);
+ }
+
+ public long writeToFile(IOManager ioManager, IFileHandle handle, ByteBuffer[] buffers, long offset)
+ throws HyracksDataException {
+ // TODO include CRC32?
+ return ioManager.doSyncWrite(handle, offset, buffers);
+ }
+
+ public int readFromFile(IOManager ioManager, IFileHandle handle, long offset, int size)
+ throws HyracksDataException {
+ buf.position(0);
+ buf.limit(size);
+ // TODO check whether the CRC32 is valid or not
+ return ioManager.syncRead(handle, offset, buf);
+ }
+
public ByteBuffer prepareRead(int size) {
buf.position(0);
buf.limit(size);
return buf;
}
- public ByteBuffer processHeader(CachedPage cPage) {
- cPage.setFrameSizeMultiplier(buf.getInt(FRAME_MULTIPLIER_OFF));
- cPage.setExtraBlockPageId(buf.getInt(EXTRA_BLOCK_PAGE_ID_OFF));
- buf.position(RESERVED_HEADER_BYTES);
- return buf;
- }
-
public ByteBuffer getBuffer() {
return buf;
}
private void setPageInfo(CachedPage cPage) {
buf.putInt(FRAME_MULTIPLIER_OFF, cPage.getFrameSizeMultiplier());
+ // TODO EXTRA_BLOCK_PAGE_ID_OFF is always going to be the following page, use it for CRC32 instead?
buf.putInt(EXTRA_BLOCK_PAGE_ID_OFF, cPage.getExtraBlockPageId());
}
/**
- * {@link ICompressorDecompressor#compress(byte[], int, int, byte[], int)} may require additional
- * space to do the compression. see {@link ICompressorDecompressor#computeCompressedBufferSize(int)}.
+ * {@link ICompressorDecompressor} may require additional space to do the compression.
*
- * @param compressor
- * @param size
+ * @param size required size
+ * @see ICompressorDecompressor#computeCompressedBufferSize(int)
*/
private void ensureBufferCapacity(int size) {
final int requiredSize = size + RESERVED_HEADER_BYTES;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 1d4c789..6fd18ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -31,12 +31,12 @@
ByteBuffer buffer;
public final AtomicInteger pinCount;
final AtomicBoolean dirty;
- final ReentrantReadWriteLock latch;
+ protected final ReentrantReadWriteLock latch;
private final Object replacementStrategyObject;
private final IPageReplacementStrategy pageReplacementStrategy;
volatile long dpid; // disk page id (composed of file id and page id)
CachedPage next;
- volatile boolean valid;
+ protected volatile boolean valid;
final AtomicBoolean confiscated;
private int multiplier;
private int extraBlockPageId;
@@ -190,6 +190,7 @@
return multiplier > 1;
}
+ @Override
public void setCompressedPageOffset(long offset) {
this.compressedOffset = offset;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
index 87a15d3..7707ac8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
@@ -32,16 +32,19 @@
private static final int MAX_UNSUCCESSFUL_CYCLE_COUNT = 3;
private IBufferCacheInternal bufferCache;
- private AtomicInteger clockPtr;
- private ICacheMemoryAllocator allocator;
- private AtomicInteger numPages;
- private AtomicInteger cpIdCounter;
+ private final AtomicInteger clockPtr;
+ private final ICacheMemoryAllocator allocator;
+ private final AtomicInteger numPages;
+ private final AtomicInteger cpIdCounter;
+ private final IDiskCachedPageAllocator pageAllocator;
private final int pageSize;
private final int maxAllowedNumPages;
private final ConcurrentLinkedQueue<Integer> cpIdFreeList;
- public ClockPageReplacementStrategy(ICacheMemoryAllocator allocator, int pageSize, int maxAllowedNumPages) {
+ public ClockPageReplacementStrategy(ICacheMemoryAllocator allocator, IDiskCachedPageAllocator pageAllocator,
+ int pageSize, int maxAllowedNumPages) {
this.allocator = allocator;
+ this.pageAllocator = pageAllocator;
this.pageSize = pageSize;
this.maxAllowedNumPages = maxAllowedNumPages;
this.clockPtr = new AtomicInteger(0);
@@ -154,7 +157,7 @@
if (cpId == null) {
cpId = cpIdCounter.getAndIncrement();
}
- CachedPage cPage = new CachedPage(cpId, allocator.allocate(pageSize * multiplier, 1)[0], this);
+ CachedPage cPage = pageAllocator.allocate(cpId, allocator.allocate(pageSize * multiplier, 1)[0], this);
cPage.setFrameSizeMultiplier(multiplier);
bufferCache.addPage(cPage);
numPages.getAndAdd(multiplier);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/DefaultBufferCacheReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/DefaultBufferCacheReadContext.java
index 2a67a06..b44ae25 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/DefaultBufferCacheReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/DefaultBufferCacheReadContext.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.storage.common.buffercache.context.page;
-public final class DefaultBufferCacheReadContext extends AbstractBufferCacheReadContext {
+final class DefaultBufferCacheReadContext extends AbstractBufferCacheReadContext {
@Override
public boolean isNewPage() {
return false;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheReadContext.java
index 216ebfb..f63ab76 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheReadContext.java
@@ -38,7 +38,7 @@
*
* @param page that will be pinned
*/
- void onPin(ICachedPage page);
+ void onPin(ICachedPage page) throws HyracksDataException;
/**
* Signals that a page will be unpinned
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheWriteContext.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheWriteContext.java
index 7afb4ce..9b3761a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheWriteContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/page/IBufferCacheWriteContext.java
@@ -23,9 +23,33 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageWriter;
+/**
+ * This context allows for providing different implementations on {@link IFIFOPageWriter#write(ICachedPage)}
+ */
public interface IBufferCacheWriteContext {
+ /**
+ * Write to a file managed by {@link IBufferCache}
+ *
+ * @param ioManager ioManager
+ * @param handle file handle
+ * @param offset starting offset
+ * @param data to be written
+ * @return number of written bytes
+ */
int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data) throws HyracksDataException;
+ /**
+ * Write to a file managed by {@link IBufferCache}
+ *
+ * @param ioManager ioManager
+ * @param handle file handle
+ * @param offset starting offset
+ * @param data to be written
+ * @return number of written bytes
+ */
long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java
new file mode 100644
index 0000000..af4162b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+public final class DummyPhysicalDrive implements IPhysicalDrive {
+ public static final IPhysicalDrive INSTANCE = new DummyPhysicalDrive();
+
+ private DummyPhysicalDrive() {
+ }
+
+ @Override
+ public boolean computeAndCheckIsPressured() {
+ return false;
+ }
+
+ @Override
+ public boolean hasSpace() {
+ return true;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
new file mode 100644
index 0000000..5fb7088
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+/**
+ * Disk cache monitoring service is responsible for monitor the local drives
+ */
+public interface IDiskCacheMonitoringService {
+ /**
+ * Start the monitoring service
+ */
+ void start();
+
+ /**
+ * Stop the monitoring service
+ */
+ void stop();
+
+ /**
+ * @return whether the monitoring service is enabled
+ */
+ boolean isEnabled();
+
+ /**
+ * @return physical drive
+ */
+ IPhysicalDrive getPhysicalDrive();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
new file mode 100644
index 0000000..fc4645d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.LocalResource;
+
+/**
+ * A proxy to notify a disk-cache (a faster disk that is caching a slower resource) about resource lifecycle events.
+ * The notifier could block a resource from being operated on if the disk-cache manager denying access to a resource
+ * TODO Do we care about dataset?
+ */
+public interface IDiskResourceCacheLockNotifier {
+ /**
+ * Notify registering a new resource
+ * Note: this method is not thread-safe outside {@link org.apache.hyracks.storage.common.IResourceLifecycleManager}
+ *
+ * @param datasetId dataset ID
+ * @param localResource resource to be registered
+ * @param index of the resource
+ */
+ void onRegister(int datasetId, LocalResource localResource, IIndex index);
+
+ /**
+ * Notify unregistering an existing resource
+ * Note: this method is not thread-safe outside {@link org.apache.hyracks.storage.common.IResourceLifecycleManager}
+ *
+ * @param datasetId dataset ID
+ * @param resourceId resource ID
+ */
+ void onUnregister(int datasetId, long resourceId);
+
+ /**
+ * Notify opening a resource
+ *
+ * @param datasetId dataset ID
+ * @param resourceId resource ID
+ */
+ void onOpen(int datasetId, long resourceId);
+
+ /**
+ * Notify closing a resource
+ *
+ * @param datasetId dataset ID
+ * @param resourceId resource ID
+ */
+ void onClose(int datasetId, long resourceId);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java
new file mode 100644
index 0000000..d2aaec1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+/**
+ * Monitors the pressure of the local drive
+ */
+@ThreadSafe
+public interface IPhysicalDrive {
+ /**
+ * Computes the actual pressure on the drive
+ *
+ * @return true if pressured, false otherwise
+ */
+ boolean computeAndCheckIsPressured();
+
+ /**
+ * @return if drive is still has free space (i.e., not pressured)
+ */
+ boolean hasSpace();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/ISweepContext.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/ISweepContext.java
new file mode 100644
index 0000000..1646116
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/ISweepContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+
+/**
+ * Provides the necessary {@link IBufferCache} functionalities for a sweep operation
+ */
+public interface ISweepContext {
+
+ /**
+ * Open a file for sweeping
+ *
+ * @param fileId to open
+ */
+ void open(int fileId) throws HyracksDataException;
+
+ /**
+ * Close the opened file
+ */
+ void close() throws HyracksDataException;
+
+ /**
+ * Pin a page
+ *
+ * @param dpid page unique ID
+ * @param bcOpCtx read context
+ * @return pinned page
+ */
+ ICachedPage pin(long dpid, IBufferCacheReadContext bcOpCtx) throws HyracksDataException;
+
+ /**
+ * Unpin a page
+ *
+ * @param page to unpin
+ * @param bcOpCtx read context
+ */
+ void unpin(ICachedPage page, IBufferCacheReadContext bcOpCtx) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
new file mode 100644
index 0000000..9a7a5d6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+public final class NoOpDiskCacheMonitoringService implements IDiskCacheMonitoringService {
+ public static final IDiskCacheMonitoringService INSTANCE = new NoOpDiskCacheMonitoringService();
+
+ private NoOpDiskCacheMonitoringService() {
+ }
+
+ @Override
+ public void start() {
+ // NoOp
+ }
+
+ @Override
+ public void stop() {
+ // NoOp
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return false;
+ }
+
+ @Override
+ public IPhysicalDrive getPhysicalDrive() {
+ return DummyPhysicalDrive.INSTANCE;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
new file mode 100644
index 0000000..3c0d6c9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk;
+
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.LocalResource;
+
+public final class NoOpDiskResourceCacheLockNotifier implements IDiskResourceCacheLockNotifier {
+ public static final IDiskResourceCacheLockNotifier INSTANCE = new NoOpDiskResourceCacheLockNotifier();
+
+ private NoOpDiskResourceCacheLockNotifier() {
+ }
+
+ @Override
+ public void onRegister(int datasetId, LocalResource localResource, IIndex index) {
+ // NoOp
+ }
+
+ @Override
+ public void onUnregister(int datasetId, long resourceId) {
+ // NoOp
+ }
+
+ @Override
+ public void onOpen(int datasetId, long resourceId) {
+ // NoOp
+ }
+
+ @Override
+ public void onClose(int datasetId, long resourceId) {
+ // NoOp
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/AbstractPrefetchRequest.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/AbstractPrefetchRequest.java
new file mode 100644
index 0000000..e9cef95
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/AbstractPrefetchRequest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk.prefetch;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractPrefetchRequest implements Runnable {
+ private final ReentrantLock lock;
+ private volatile PrefetchState state;
+
+ protected AbstractPrefetchRequest() {
+ lock = new ReentrantLock();
+ state = PrefetchState.QUEUED;
+ }
+
+ public final void reset() {
+ state = PrefetchState.QUEUED;
+ }
+
+ @Override
+ public final void run() {
+ try {
+ if (state == PrefetchState.QUEUED) {
+ // Acquire a lock so the prefetching op. won't get aborted
+ lock.lock();
+ doPrefetch();
+ state = PrefetchState.FINISHED;
+ }
+ } catch (HyracksDataException e) {
+ onException(e);
+ state = PrefetchState.FAILED;
+ } catch (Throwable e) {
+ onFailure(e);
+ state = PrefetchState.FAILED;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @return true if aborted or failed, false otherwise
+ */
+ public final boolean waitOrAbort() {
+ // Acquire lock to ensure the prefetching op. is not running
+ lock.lock();
+ try {
+ switch (state) {
+ case QUEUED:
+ // The request is still queued, abort
+ state = PrefetchState.ABORTED;
+ case FAILED:
+ // The request failed
+ return true;
+ default:
+ // The request finished
+ return false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ protected abstract void doPrefetch() throws HyracksDataException;
+
+ protected abstract void onException(HyracksDataException e);
+
+ protected abstract void onFailure(Throwable e);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/IPrefetchHandler.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/IPrefetchHandler.java
new file mode 100644
index 0000000..1e5af97
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/IPrefetchHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk.prefetch;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IPrefetchHandler {
+ void request(AbstractPrefetchRequest request) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/PrefetchState.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/PrefetchState.java
new file mode 100644
index 0000000..29491a9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/prefetch/PrefetchState.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.disk.prefetch;
+
+enum PrefetchState {
+ QUEUED,
+ ABORTED,
+ FAILED,
+ FINISHED
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
index 3d4d80b..3edec4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
@@ -26,12 +26,15 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.common.buffercache.AbstractBufferedFileIOManager;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
import org.apache.hyracks.storage.common.buffercache.CachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheWriteContext;
import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.storage.common.compression.file.NoOpLAFWriter;
@@ -68,17 +71,19 @@
}
@Override
- public void read(CachedPage cPage) throws HyracksDataException {
+ public void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException {
final BufferCacheHeaderHelper header = checkoutHeaderHelper();
try {
- long bytesRead =
- readToBuffer(header.prepareRead(bufferCache.getPageSizeWithHeader()), getFirstPageOffset(cPage));
+ setPageInfo(cPage);
+ IFileHandle handle = getFileHandle();
+ int pageSize = bufferCache.getPageSizeWithHeader();
+ long bytesRead = header.readFromFile(ioManager, handle, getFirstPageOffset(cPage), pageSize);
if (!verifyBytesRead(bufferCache.getPageSizeWithHeader(), bytesRead)) {
return;
}
- final ByteBuffer buf = header.processHeader(cPage);
+ final ByteBuffer buf = context.processHeader(ioManager, this, header, cPage);
cPage.getBuffer().put(buf);
} finally {
returnHeaderHelper(header);
@@ -87,6 +92,11 @@
readExtraPages(cPage);
}
+ private void setPageInfo(CachedPage cPage) {
+ cPage.setCompressedPageOffset(getFirstPageOffset(cPage));
+ cPage.setCompressedPageSize(bufferCache.getPageSize());
+ }
+
private void readExtraPages(CachedPage cPage) throws HyracksDataException {
final int totalPages = cPage.getFrameSizeMultiplier();
if (totalPages > 1) {
@@ -98,22 +108,24 @@
}
@Override
- protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId)
- throws HyracksDataException {
+ protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId,
+ IBufferCacheWriteContext context) throws HyracksDataException {
final ByteBuffer buf = cPage.getBuffer();
final boolean contiguousLargePages = getPageId(cPage.getDiskPageId()) + 1 == extraBlockPageId;
+ IFileHandle handle = getFileHandle();
long bytesWritten;
try {
buf.limit(contiguousLargePages ? bufferCache.getPageSize() * totalPages : bufferCache.getPageSize());
buf.position(0);
- bytesWritten = writeToFile(header.prepareWrite(cPage), getFirstPageOffset(cPage));
+ ByteBuffer[] buffers = header.prepareWrite(cPage);
+ bytesWritten = context.write(ioManager, handle, getFirstPageOffset(cPage), buffers);
} finally {
returnHeaderHelper(header);
}
if (totalPages > 1 && !contiguousLargePages) {
buf.limit(totalPages * bufferCache.getPageSize());
- bytesWritten += writeToFile(buf, getExtraPageOffset(cPage));
+ bytesWritten += writeExtraToFile(buf, getExtraPageOffset(cPage));
}
final int expectedWritten = bufferCache.getPageSizeWithHeader() + bufferCache.getPageSize() * (totalPages - 1);
@@ -121,6 +133,11 @@
}
@Override
+ public long getStartPageOffset(int pageId) throws HyracksDataException {
+ return (long) pageId * bufferCache.getPageSizeWithHeader();
+ }
+
+ @Override
public int getNumberOfPages() throws HyracksDataException {
if (DEBUG) {
assert getFileSize() % bufferCache.getPageSizeWithHeader() == 0;
@@ -134,6 +151,12 @@
}
@Override
+ public long getPagesTotalSize(int startPageId, int numberOfPages) throws HyracksDataException {
+ // This could be an overestimate as we cannot determine for sure as extra pages do not have a header
+ return (long) numberOfPages * bufferCache.getPageSizeWithHeader();
+ }
+
+ @Override
protected long getFirstPageOffset(CachedPage cPage) {
return getPageOffset(getPageId(cPage.getDiskPageId()));
}
@@ -143,10 +166,6 @@
return getPageOffset(cPage.getExtraBlockPageId());
}
- private long getPageOffset(long pageId) {
- return pageId * bufferCache.getPageSizeWithHeader();
- }
-
public static long getDiskPageId(int fileId, int pageId) {
return (((long) fileId) << 32) + pageId;
}
@@ -169,4 +188,8 @@
}
return new BufferedFileHandle(fileId, bufferCache, ioManager, headerPageCache, pageReplacementStrategy);
}
+
+ private long getPageOffset(int pageId) {
+ return (long) pageId * bufferCache.getPageSizeWithHeader();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 01811c7..ed71a94 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -24,11 +24,14 @@
import org.apache.hyracks.api.compression.ICompressorDecompressor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
import org.apache.hyracks.storage.common.buffercache.CachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.buffercache.context.page.IBufferCacheWriteContext;
import org.apache.hyracks.storage.common.compression.file.CompressedFileManager;
import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
@@ -45,16 +48,18 @@
}
@Override
- public void read(CachedPage cPage) throws HyracksDataException {
+ public void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException {
final BufferCacheHeaderHelper header = checkoutHeaderHelper();
try {
compressedFileManager.setCompressedPageInfo(cPage);
- long bytesRead = readToBuffer(header.prepareRead(cPage.getCompressedPageSize()), getFirstPageOffset(cPage));
-
+ IFileHandle handle = getFileHandle();
+ int size = cPage.getCompressedPageSize();
+ long bytesRead = header.readFromFile(ioManager, handle, getFirstPageOffset(cPage), size);
if (!verifyBytesRead(cPage.getCompressedPageSize(), bytesRead)) {
return;
}
- final ByteBuffer cBuffer = header.processHeader(cPage);
+
+ final ByteBuffer cBuffer = context.processHeader(ioManager, this, header, cPage);
final ByteBuffer uBuffer = cPage.getBuffer();
fixBufferPointers(uBuffer, 0);
if (cPage.getCompressedPageSize() < bufferCache.getPageSizeWithHeader()) {
@@ -93,9 +98,10 @@
}
@Override
- protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId)
- throws HyracksDataException {
+ protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId,
+ IBufferCacheWriteContext context) throws HyracksDataException {
try {
+ IFileHandle handle = getFileHandle();
final ByteBuffer cBuffer = header.prepareWrite(cPage, getRequiredBufferSize());
final ByteBuffer uBuffer = cPage.getBuffer();
final long pageId = cPage.getDiskPageId();
@@ -103,18 +109,22 @@
final long bytesWritten;
final long expectedBytesWritten;
- fixBufferPointers(uBuffer, 0);
+ if (cPage.isLargePage()) {
+ fixBufferPointers(uBuffer, 0);
+ } else {
+ uBuffer.position(0);
+ }
if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {
cBuffer.position(0);
final long offset = compressedFileManager.writePageInfo(pageId, cBuffer.remaining());
expectedBytesWritten = cBuffer.limit();
- bytesWritten = writeToFile(cBuffer, offset);
+ bytesWritten = context.write(ioManager, handle, offset, cBuffer);
} else {
//Compression did not gain any savings
final ByteBuffer[] buffers = header.prepareWrite(cPage);
final long offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit();
- bytesWritten = writeToFile(buffers, offset);
+ bytesWritten = context.write(ioManager, handle, offset, buffers);
}
verifyBytesWritten(expectedBytesWritten, bytesWritten);
@@ -147,7 +157,7 @@
final int length = writeBuffer.remaining();
final long offset = compressedFileManager.writeExtraPageInfo(extraBlockPageId, length, i - 1);
expectedBytesWritten += length;
- bytesWritten += writeToFile(writeBuffer, offset);
+ bytesWritten += writeExtraToFile(writeBuffer, offset);
}
verifyBytesWritten(expectedBytesWritten, bytesWritten);
@@ -199,6 +209,11 @@
}
@Override
+ public long getStartPageOffset(int pageId) throws HyracksDataException {
+ return compressedFileManager.getPageOffset(pageId);
+ }
+
+ @Override
public int getNumberOfPages() {
return compressedFileManager.getNumberOfPages();
}
@@ -218,6 +233,11 @@
return compressedFileManager.getCompressedPageWriter();
}
+ @Override
+ public long getPagesTotalSize(int startPageId, int numberOfPages) throws HyracksDataException {
+ return compressedFileManager.getTotalCompressedSize(startPageId, numberOfPages);
+ }
+
/* ********************************
* Compression methods
* ********************************
@@ -233,7 +253,6 @@
private void uncompressToPageBuffer(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
compDecomp.uncompress(cBuffer, uBuffer);
- verifyUncompressionSize(bufferCache.getPageSize(), uBuffer.remaining());
}
private int compressToWriteBuffer(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
@@ -246,11 +265,4 @@
final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
return compDecomp.computeCompressedBufferSize(bufferCache.getPageSize());
}
-
- private void verifyUncompressionSize(int expected, int actual) {
- if (expected != actual) {
- throwException("Uncompressed", expected, actual);
- }
- }
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 00a041f..8e002b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -39,6 +39,7 @@
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
@@ -146,7 +147,8 @@
return bufferCache;
}
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
- IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, pageSize, numPages);
+ IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator,
+ DefaultDiskCachedPageAllocator.INSTANCE, pageSize, numPages);
IFileMapProvider fileMapProvider = getFileMapProvider();
bufferCache = new BufferCache(ioManager, prs, new DelayPageCleanerPolicy(1000),
(IFileMapManager) fileMapProvider, maxOpenFiles, 10, threadFactory, new HashMap<>(),
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/TestOnly.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/TestOnly.java
new file mode 100644
index 0000000..458898e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/TestOnly.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.SOURCE)
+public @interface TestOnly {
+}
diff --git a/hyracks-fullstack/hyracks/pom.xml b/hyracks-fullstack/hyracks/pom.xml
index 96bf131..ff390a9 100644
--- a/hyracks-fullstack/hyracks/pom.xml
+++ b/hyracks-fullstack/hyracks/pom.xml
@@ -101,5 +101,6 @@
<module>hyracks-hdfs</module>
<module>hyracks-dist</module>
<module>hyracks-http</module>
+ <module>hyracks-cloud</module>
</modules>
</project>