[ASTERIXDB-3453][STO] Incompressible pages are not written as full pages
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Incompressible pages must be written entirely as full pages (i.e.,
page + header) to ensure that the positions of the cloud files
stay in sync with those of the local files.
Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 91c24e8..033f135 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -210,10 +210,11 @@
}
@Override
- public final int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException {
+ public final int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
int writtenBytes;
try {
+ ensurePosition(fHandle, cloudWriter.position(), offset);
writtenBytes = cloudWriter.write(data);
} catch (HyracksDataException e) {
cloudWriter.abort();
@@ -223,10 +224,11 @@
}
@Override
- public final long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException {
+ public final long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException {
ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
int writtenBytes;
try {
+ ensurePosition(fHandle, cloudWriter.position(), offset);
writtenBytes = cloudWriter.write(data[0], data[1]);
} catch (HyracksDataException e) {
cloudWriter.abort();
@@ -265,18 +267,33 @@
@Override
public final long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray)
throws HyracksDataException {
+ // Remember where each buffer starts; only positions are saved (limits are not touched here)
+ ByteBuffer buffer1 = dataArray[0];
+ int position1 = buffer1.position();
+
+ ByteBuffer buffer2 = dataArray[1];
+ int position2 = buffer2.position();
+
long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray);
- dataArray[0].flip();
- dataArray[1].flip();
- cloudWrite(fHandle, dataArray);
+
+ // Rewind both buffers so the cloud write below starts from the positions the local write was given
+ buffer1.position(position1);
+ buffer2.position(position2);
+
+ cloudWrite(fHandle, offset, dataArray);
return writtenBytes;
}
@Override
public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ // Remember the starting position; only the position is saved (the limit is not touched here)
+ int position = data.position();
+
int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, data);
- data.flip();
- cloudWrite(fHandle, data);
+
+ // Rewind so the cloud write below starts from the same position the local write was given
+ data.position(position);
+ cloudWrite(fHandle, offset, data);
return writtenBytes;
}
@@ -390,4 +407,11 @@
performBulkOperation(deleteBulkOperation);
}
}
+
+ private void ensurePosition(IFileHandle fileHandle, long cloudOffset, long requestedWriteOffset) {
+ if (cloudOffset != requestedWriteOffset) {
+ throw new IllegalStateException("Misaligned positions in " + fileHandle.getFileReference()
+ + ", cloudOffset: " + cloudOffset + " != requestedWriteOffset: " + requestedWriteOffset);
+ }
+ }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index cace898..a233ca5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -36,10 +36,12 @@
private final IWriteBufferProvider bufferProvider;
private final ICloudBufferedWriter bufferedWriter;
private ByteBuffer writeBuffer;
+ private long writtenBytes;
public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, IWriteBufferProvider bufferProvider) {
this.bufferedWriter = bufferedWriter;
this.bufferProvider = bufferProvider;
+ writtenBytes = 0;
}
/* ************************************************************
@@ -75,7 +77,7 @@
@Override
public int write(ByteBuffer page) throws HyracksDataException {
open();
- return write(page.array(), 0, page.limit());
+ return write(page.array(), page.arrayOffset() + page.position(), page.remaining());
}
@Override
@@ -84,6 +86,7 @@
uploadAndWait();
}
writeBuffer.put((byte) b);
+ writtenBytes += 1;
}
@Override
@@ -102,7 +105,7 @@
// enough to write all
if (writeBuffer.remaining() > pageRemaining) {
writeBuffer.put(b, offset, pageRemaining);
- return len;
+ break;
}
int remaining = writeBuffer.remaining();
@@ -112,10 +115,16 @@
uploadAndWait();
}
+ writtenBytes += len;
return len;
}
@Override
+ public long position() {
+ return writtenBytes;
+ }
+
+ @Override
public int read(byte[] b, int off, int len) throws IOException {
if (writeBuffer.remaining() == 0) {
return -1;
@@ -173,6 +182,7 @@
if (writeBuffer == null) {
writeBuffer = bufferProvider.getBuffer();
writeBuffer.clear();
+ writtenBytes = 0;
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
index 15822c4..920be9c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
@@ -61,6 +61,11 @@
int write(byte[] b, int off, int len) throws HyracksDataException;
/**
+ * @return the current position of the writer
+ */
+ long position();
+
+ /**
* Finish the write operation
* Note: this should be called upon successful write
*/
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index a6dade5..d9119a5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -41,12 +41,14 @@
private final IRequestProfiler profiler;
private final Storage gcsClient;
private WriteChannel writer = null;
+ private long writtenBytes;
public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfiler profiler) {
this.bucket = bucket;
this.path = path;
this.profiler = profiler;
this.gcsClient = gcsClient;
+ writtenBytes = 0;
}
@Override
@@ -67,17 +69,26 @@
throw HyracksDataException.create(e);
}
+ writtenBytes += written;
return written;
}
@Override
public int write(byte[] b, int off, int len) throws HyracksDataException {
- return write(ByteBuffer.wrap(b, off, len));
+ // write(ByteBuffer) already advances writtenBytes by what it wrote; adding it
+ // again here would make position() report twice the real offset.
+ return write(ByteBuffer.wrap(b, off, len));
+ }
+
+ @Override
+ public long position() {
+ return writtenBytes;
}
@Override
public void write(int b) throws HyracksDataException {
write(ByteBuffer.wrap(new byte[] { (byte) b }));
+ // No extra increment: the write(ByteBuffer) call above already advanced writtenBytes by one.
}
@Override
@@ -105,6 +116,7 @@
if (writer == null) {
writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, path)).build());
writer.setChunkSize(WRITE_BUFFER_SIZE);
+ writtenBytes = 0;
log("STARTED");
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
index c75e83a..69d33e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
@@ -36,13 +36,13 @@
public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data)
throws HyracksDataException {
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
- return cloudIOManager.cloudWrite(handle, data);
+ return cloudIOManager.cloudWrite(handle, offset, data);
}
@Override
public long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data)
throws HyracksDataException {
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
- return cloudIOManager.cloudWrite(handle, data);
+ return cloudIOManager.cloudWrite(handle, offset, data);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index ab57139..9b6a2ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -74,19 +74,21 @@
* Write to cloud only
*
* @param fHandle file handle
+ * @param offset position to write from
* @param data to write
* @return number of written bytes
*/
- int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException;
+ int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
/**
* Write to cloud only
*
* @param fHandle file handle
+ * @param offset position to write from
* @param data to write
* @return number of written bytes
*/
- long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException;
+ long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException;
/**
* Punch a hole in a file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 6ad4d27..6bc85ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -123,8 +123,10 @@
expectedBytesWritten = cBuffer.limit();
bytesWritten = context.write(ioManager, handle, offset, cBuffer);
} else {
- //Compression did not gain any savings
+ // Compression did not gain any savings
final ByteBuffer[] buffers = header.prepareWrite(cPage);
+ // Incompressible pages should be written entirely
+ fixBufferPointers(buffers[1], 0);
offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit();
bytesWritten = context.write(ioManager, handle, offset, buffers);
@@ -152,7 +154,7 @@
long bytesWritten = 0;
for (int i = 1; i < totalPages; i++) {
fixBufferPointers(uBuffer, i);
- cBuffer.position(0);
+ cBuffer.clear();
final ByteBuffer writeBuffer;
if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {