[ASTERIXDB-3453][STO] Incompressible pages are not written as full pages

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Incompressible pages must be written as full pages (i.e., as
page + header) entirely to ensure the position of the cloud
files are in-sync with local files.

Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 91c24e8..033f135 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -210,10 +210,11 @@
     }
 
     @Override
-    public final int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException {
+    public final int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
         ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
         int writtenBytes;
         try {
+            ensurePosition(fHandle, cloudWriter.position(), offset);
             writtenBytes = cloudWriter.write(data);
         } catch (HyracksDataException e) {
             cloudWriter.abort();
@@ -223,10 +224,11 @@
     }
 
     @Override
-    public final long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException {
+    public final long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException {
         ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
         int writtenBytes;
         try {
+            ensurePosition(fHandle, cloudWriter.position(), offset);
             writtenBytes = cloudWriter.write(data[0], data[1]);
         } catch (HyracksDataException e) {
             cloudWriter.abort();
@@ -265,18 +267,33 @@
     @Override
     public final long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray)
             throws HyracksDataException {
+        // Save original position and limit
+        ByteBuffer buffer1 = dataArray[0];
+        int position1 = buffer1.position();
+
+        ByteBuffer buffer2 = dataArray[1];
+        int position2 = buffer2.position();
+
         long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray);
-        dataArray[0].flip();
-        dataArray[1].flip();
-        cloudWrite(fHandle, dataArray);
+
+        // Restore original position
+        buffer1.position(position1);
+        buffer2.position(position2);
+
+        cloudWrite(fHandle, offset, dataArray);
         return writtenBytes;
     }
 
     @Override
     public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        // Save original position and limit
+        int position = data.position();
+
         int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, data);
-        data.flip();
-        cloudWrite(fHandle, data);
+
+        // Restore original position
+        data.position(position);
+        cloudWrite(fHandle, offset, data);
         return writtenBytes;
     }
 
@@ -390,4 +407,11 @@
             performBulkOperation(deleteBulkOperation);
         }
     }
+
+    private void ensurePosition(IFileHandle fileHandle, long cloudOffset, long requestedWriteOffset) {
+        if (cloudOffset != requestedWriteOffset) {
+            throw new IllegalStateException("Misaligned positions in " + fileHandle.getFileReference()
+                    + ", cloudOffset: " + cloudOffset + " != requestedWriteOffset: " + requestedWriteOffset);
+        }
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index cace898..a233ca5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -36,10 +36,12 @@
     private final IWriteBufferProvider bufferProvider;
     private final ICloudBufferedWriter bufferedWriter;
     private ByteBuffer writeBuffer;
+    private long writtenBytes;
 
     public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, IWriteBufferProvider bufferProvider) {
         this.bufferedWriter = bufferedWriter;
         this.bufferProvider = bufferProvider;
+        writtenBytes = 0;
     }
 
     /* ************************************************************
@@ -75,7 +77,7 @@
     @Override
     public int write(ByteBuffer page) throws HyracksDataException {
         open();
-        return write(page.array(), 0, page.limit());
+        return write(page.array(), page.position(), page.remaining());
     }
 
     @Override
@@ -84,6 +86,7 @@
             uploadAndWait();
         }
         writeBuffer.put((byte) b);
+        writtenBytes += 1;
     }
 
     @Override
@@ -102,7 +105,7 @@
             // enough to write all
             if (writeBuffer.remaining() > pageRemaining) {
                 writeBuffer.put(b, offset, pageRemaining);
-                return len;
+                break;
             }
 
             int remaining = writeBuffer.remaining();
@@ -112,10 +115,16 @@
             uploadAndWait();
         }
 
+        writtenBytes += len;
         return len;
     }
 
     @Override
+    public long position() {
+        return writtenBytes;
+    }
+
+    @Override
     public int read(byte[] b, int off, int len) throws IOException {
         if (writeBuffer.remaining() == 0) {
             return -1;
@@ -173,6 +182,7 @@
         if (writeBuffer == null) {
             writeBuffer = bufferProvider.getBuffer();
             writeBuffer.clear();
+            writtenBytes = 0;
         }
     }
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
index 15822c4..920be9c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
@@ -61,6 +61,11 @@
     int write(byte[] b, int off, int len) throws HyracksDataException;
 
     /**
+     * @return the current position of the writer
+     */
+    long position();
+
+    /**
      * Finish the write operation
      * Note: this should be called upon successful write
      */
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index a6dade5..d9119a5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -41,12 +41,14 @@
     private final IRequestProfiler profiler;
     private final Storage gcsClient;
     private WriteChannel writer = null;
+    private long writtenBytes;
 
     public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfiler profiler) {
         this.bucket = bucket;
         this.path = path;
         this.profiler = profiler;
         this.gcsClient = gcsClient;
+        writtenBytes = 0;
     }
 
     @Override
@@ -67,17 +69,26 @@
             throw HyracksDataException.create(e);
         }
 
+        writtenBytes += written;
         return written;
     }
 
     @Override
     public int write(byte[] b, int off, int len) throws HyracksDataException {
-        return write(ByteBuffer.wrap(b, off, len));
+        int written = write(ByteBuffer.wrap(b, off, len));
+        writtenBytes += written;
+        return written;
+    }
+
+    @Override
+    public long position() {
+        return writtenBytes;
     }
 
     @Override
     public void write(int b) throws HyracksDataException {
         write(ByteBuffer.wrap(new byte[] { (byte) b }));
+        writtenBytes += 1;
     }
 
     @Override
@@ -105,6 +116,7 @@
         if (writer == null) {
             writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, path)).build());
             writer.setChunkSize(WRITE_BUFFER_SIZE);
+            writtenBytes = 0;
             log("STARTED");
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
index c75e83a..69d33e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
@@ -36,13 +36,13 @@
     public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data)
             throws HyracksDataException {
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
-        return cloudIOManager.cloudWrite(handle, data);
+        return cloudIOManager.cloudWrite(handle, offset, data);
     }
 
     @Override
     public long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data)
             throws HyracksDataException {
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
-        return cloudIOManager.cloudWrite(handle, data);
+        return cloudIOManager.cloudWrite(handle, offset, data);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index ab57139..9b6a2ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -74,19 +74,21 @@
      * Write to cloud only
      *
      * @param fHandle file handle
+     * @param offset  position to write from
      * @param data    to write
      * @return number of written bytes
      */
-    int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException;
+    int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
 
     /**
      * Write to cloud only
      *
      * @param fHandle file handle
+     * @param offset  position to write from
      * @param data    to write
      * @return number of written bytes
      */
-    long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException;
+    long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException;
 
     /**
      * Punch a hole in a file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 6ad4d27..6bc85ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -123,8 +123,10 @@
                 expectedBytesWritten = cBuffer.limit();
                 bytesWritten = context.write(ioManager, handle, offset, cBuffer);
             } else {
-                //Compression did not gain any savings
+                // Compression did not gain any savings
                 final ByteBuffer[] buffers = header.prepareWrite(cPage);
+                // Incompressible pages should be written entirely
+                fixBufferPointers(buffers[1], 0);
                 offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
                 expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit();
                 bytesWritten = context.write(ioManager, handle, offset, buffers);
@@ -152,7 +154,7 @@
         long bytesWritten = 0;
         for (int i = 1; i < totalPages; i++) {
             fixBufferPointers(uBuffer, i);
-            cBuffer.position(0);
+            cBuffer.clear();
 
             final ByteBuffer writeBuffer;
             if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {