[ASTERIXDB-3129][STO][RT] Add columnn encoders/decoders
- user mode changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add a modified version of Parquet's encoders/decoders
that fits our needs and avoids object creation. Also,
accepts Hyracks values (i.e., IValueReference)
- Add column streams (in/out) for reading/writing
encoded column values
Interface changes:
Add close() to the interface for IColumnTupleIterator
to log the number of filtered pages.
Change-Id: Ib185ba5da37b4c88523a028e7cc4108aefc0145a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17413
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-column/pom.xml b/asterixdb/asterix-column/pom.xml
new file mode 100644
index 0000000..2ee75c8
--- /dev/null
+++ b/asterixdb/asterix-column/pom.xml
@@ -0,0 +1,151 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>apache-asterixdb</artifactId>
+ <groupId>org.apache.asterix</groupId>
+ <version>0.9.8-SNAPSHOT</version>
+ </parent>
+ <artifactId>asterix-column</artifactId>
+
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
+
+ <properties>
+ <root.dir>${basedir}/..</root.dir>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ <configuration>
+ <licenses>
+ <license implementation="org.apache.rat.analysis.license.ApacheSoftwareLicense20"/>
+ </licenses>
+ <excludes combine.children="append">
+ <exclude>src/test/resources/result/**</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-om</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-runtime</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-external-data</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-data-std</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-btree-column</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>algebricks-data</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-encoding</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/ParquetDeltaBinaryPackingConfig.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/ParquetDeltaBinaryPackingConfig.java
new file mode 100644
index 0000000..f591d57
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/ParquetDeltaBinaryPackingConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+
+/**
+ * Copy of {@link org.apache.parquet.column.values.delta.DeltaBinaryPackingConfig}
+ */
+public class ParquetDeltaBinaryPackingConfig {
+ private int blockSizeInValues;
+ private int miniBlockNumInABlock;
+ private int miniBlockSizeInValues;
+
+ public ParquetDeltaBinaryPackingConfig(int blockSizeInValues, int miniBlockNumInABlock) {
+ reset(blockSizeInValues, miniBlockNumInABlock);
+ }
+
+ private void reset(int blockSizeInValues, int miniBlockNumInABlock) {
+ this.blockSizeInValues = blockSizeInValues;
+ this.miniBlockNumInABlock = miniBlockNumInABlock;
+ double miniSize = (double) blockSizeInValues / miniBlockNumInABlock;
+ Preconditions.checkArgument(miniSize % 8 == 0, "miniBlockSize must be multiple of 8, but it's " + miniSize);
+ this.miniBlockSizeInValues = (int) miniSize;
+ }
+
+ public static ParquetDeltaBinaryPackingConfig readConfig(InputStream in, ParquetDeltaBinaryPackingConfig config)
+ throws IOException {
+ final int blockSizeInValues = BytesUtils.readUnsignedVarInt(in);
+ final int miniBlockNumInABlock = BytesUtils.readUnsignedVarInt(in);
+ if (config == null) {
+ return new ParquetDeltaBinaryPackingConfig(blockSizeInValues, miniBlockNumInABlock);
+ }
+ config.reset(blockSizeInValues, miniBlockNumInABlock);
+ return config;
+ }
+
+ public BytesInput toBytesInput() {
+ return BytesInput.concat(BytesInput.fromUnsignedVarInt(blockSizeInValues),
+ BytesInput.fromUnsignedVarInt(miniBlockNumInABlock));
+ }
+
+ public int getBlockSizeInValues() {
+ return blockSizeInValues;
+ }
+
+ public int getMiniBlockNumInABlock() {
+ return miniBlockNumInABlock;
+ }
+
+ public int getMiniBlockSizeInValues() {
+ return miniBlockSizeInValues;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/AbstractParquetValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/AbstractParquetValuesReader.java
new file mode 100644
index 0000000..5f5b88c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/AbstractParquetValuesReader.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.column.values.ValuesReader;
+
+/**
+ * Replaces {@link ValuesReader}
+ */
+public abstract class AbstractParquetValuesReader {
+ public abstract void initFromPage(AbstractBytesInputStream stream) throws IOException;
+
+ public abstract void skip();
+
+ public int readInteger() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ public long readLong() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ public double readDouble() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ public IValueReference readBytes() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaBinaryPackingValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaBinaryPackingValuesReader.java
new file mode 100644
index 0000000..9aafa0f
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaBinaryPackingValuesReader.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.asterix.column.bytes.ParquetDeltaBinaryPackingConfig;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Re-implementation of {@link DeltaBinaryPackingValuesReader}
+ */
+public class ParquetDeltaBinaryPackingValuesReader extends AbstractParquetValuesReader {
+ private int totalValueCount;
+ /**
+ * values read by the caller
+ */
+ private int valuesBufferedRead;
+ private int valuesRead;
+
+ /**
+ * stores the decoded values including the first value which is written to the header
+ */
+ private long[] valuesBuffer;
+ /**
+ * values loaded to the buffer, it could be bigger than the totalValueCount
+ * when data is not aligned to mini block, which means padding 0s are in the buffer
+ */
+ private int valuesBuffered;
+ private AbstractBytesInputStream in;
+ private ParquetDeltaBinaryPackingConfig config;
+ private int[] bitWidths;
+ private ByteBuffer bitWidthBuffer;
+ private long lastElement;
+
+ /**
+ * Loads one block at a time instead of eagerly loading all blocks in {@link DeltaBinaryPackingValuesReader}.
+ * This is to fix the {@link #valuesBuffer} size
+ */
+ @Override
+ public void initFromPage(AbstractBytesInputStream stream) throws IOException {
+ this.in = stream;
+ this.config = ParquetDeltaBinaryPackingConfig.readConfig(in, this.config);
+ this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
+ allocateValuesBuffer();
+ bitWidths = allocate(bitWidths, config.getMiniBlockNumInABlock());
+ valuesBuffered = 0;
+
+ valuesBufferedRead = 0;
+ valuesRead = 0;
+
+ //read first value from header
+ valuesBuffer[valuesBuffered++] = BytesUtils.readZigZagVarLong(in);
+ lastElement = valuesBuffer[0];
+
+ if (valuesBuffered < totalValueCount) {
+ loadNewBlockToBuffer();
+ }
+ }
+
+ /**
+ * the value buffer is allocated so that the size of it is multiple of mini block
+ * because when writing, data is flushed on a mini block basis
+ */
+ private void allocateValuesBuffer() {
+ //+ 1 because first value written to header is also stored in values buffer
+ final int bufferSize = config.getMiniBlockSizeInValues() * config.getMiniBlockNumInABlock() + 1;
+ if (valuesBuffer == null || valuesBuffer.length < bufferSize) {
+ valuesBuffer = new long[bufferSize];
+ } else {
+ Arrays.fill(valuesBuffer, 0);
+ }
+ }
+
+ private int[] allocate(int[] array, int size) {
+ if (array == null || array.length < size) {
+ return new int[size];
+ }
+ return array;
+ }
+
+ @Override
+ public void skip() {
+ checkRead();
+ valuesRead++;
+ }
+
+ @Override
+ public int readInteger() {
+ // TODO: probably implement it separately
+ return (int) readLong();
+ }
+
+ @Override
+ public long readLong() {
+ checkRead();
+ valuesRead++;
+ return valuesBuffer[valuesBufferedRead++];
+ }
+
+ private void checkRead() {
+ if (valuesRead >= totalValueCount) {
+ throw new ParquetDecodingException("no more value to read, total value count is " + totalValueCount);
+ }
+ if (valuesBufferedRead >= valuesBuffered) {
+ //Set the last value buffered as the first
+ lastElement = valuesBuffer[valuesBufferedRead - 1];
+ valuesBufferedRead = 0;
+ valuesBuffered = 0;
+ Arrays.fill(valuesBuffer, 0);
+ try {
+ loadNewBlockToBuffer();
+ } catch (IOException e) {
+ throw new ParquetDecodingException("can not load next block", e);
+ }
+
+ }
+ }
+
+ private void loadNewBlockToBuffer() throws IOException {
+ long minDeltaInCurrentBlock;
+ try {
+ minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("can not read min delta in current block", e);
+ }
+
+ readBitWidthsForMiniBlocks();
+
+ // mini block is atomic for reading, we read a mini block when there are more values left
+ int i;
+ for (i = 0; i < config.getMiniBlockNumInABlock() && valuesRead + valuesBuffered < totalValueCount; i++) {
+ BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidths[i]);
+ unpackMiniBlock(packer);
+ }
+
+ //calculate values from deltas unpacked for current block
+ int valueUnpacked = i * config.getMiniBlockSizeInValues();
+ long prev = lastElement;
+ for (int j = valuesBuffered - valueUnpacked; j < valuesBuffered; j++) {
+ valuesBuffer[j] += minDeltaInCurrentBlock + prev;
+ prev = valuesBuffer[j];
+ }
+ }
+
+ /**
+ * mini block has a size of 8*n, unpack 8 value each time
+ *
+ * @param packer the packer created from bitwidth of current mini block
+ */
+ private void unpackMiniBlock(BytePackerForLong packer) throws IOException {
+ for (int j = 0; j < config.getMiniBlockSizeInValues(); j += 8) {
+ unpack8Values(packer);
+ }
+ }
+
+ private void unpack8Values(BytePackerForLong packer) throws IOException {
+ // get a single buffer of 8 values. most of the time, this won't require a copy
+ ByteBuffer buffer = readBitWidth(packer.getBitWidth());
+ packer.unpack8Values(buffer, buffer.position(), valuesBuffer, valuesBuffered);
+ this.valuesBuffered += 8;
+ }
+
+ private void readBitWidthsForMiniBlocks() {
+ for (int i = 0; i < config.getMiniBlockNumInABlock(); i++) {
+ try {
+ bitWidths[i] = BytesUtils.readIntLittleEndianOnOneByte(in);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Can not decode bit width in block header", e);
+ }
+ }
+ }
+
+ private ByteBuffer prepareBitWidthBuffer(int length) {
+ if (bitWidthBuffer == null || bitWidthBuffer.capacity() < length) {
+ bitWidthBuffer = ByteBuffer.allocate(length);
+ }
+ bitWidthBuffer.clear();
+ bitWidthBuffer.limit(length);
+ return bitWidthBuffer;
+ }
+
+ private ByteBuffer readBitWidth(int length) throws IOException {
+ ByteBuffer buffer = prepareBitWidthBuffer(length);
+ int read = in.read(buffer);
+ if (read != length) {
+ throw new EOFException("Reached end of stream");
+ }
+ buffer.position(0);
+ return buffer;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaByteArrayReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaByteArrayReader.java
new file mode 100644
index 0000000..70c25b8
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaByteArrayReader.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Re-implementation of {@link DeltaByteArrayReader}
+ */
+public class ParquetDeltaByteArrayReader extends AbstractParquetValuesReader {
+ private final AbstractParquetValuesReader prefixLengthReader;
+ private final ParquetDeltaLengthByteArrayValuesReader suffixReader;
+ private final byte[] lengthBytes;
+
+ private final ArrayBackedValueStorage temp;
+ private final ArrayBackedValueStorage previous;
+ boolean newPage;
+
+ public ParquetDeltaByteArrayReader(boolean containsLength) {
+ this.prefixLengthReader = new ParquetDeltaBinaryPackingValuesReader();
+ this.suffixReader = new ParquetDeltaLengthByteArrayValuesReader();
+ this.temp = new ArrayBackedValueStorage();
+ this.previous = new ArrayBackedValueStorage();
+ lengthBytes = containsLength ? new byte[4] : new byte[0];
+ }
+
+ @Override
+ public void initFromPage(AbstractBytesInputStream stream) throws IOException {
+ AbstractBytesInputStream prefixStream = stream.sliceStream(BytesUtils.readUnsignedVarInt(stream));
+ prefixLengthReader.initFromPage(prefixStream);
+ suffixReader.initFromPage(stream);
+ previous.reset();
+ temp.reset();
+ newPage = true;
+ }
+
+ @Override
+ public void skip() {
+ // read the next value to skip so that previous is correct.
+ this.readBytes();
+ }
+
+ @Override
+ public IValueReference readBytes() {
+ int prefixLength = prefixLengthReader.readInteger();
+ // This does not copy bytes
+ IValueReference suffix = suffixReader.readBytes();
+
+ // NOTE: due to PARQUET-246, it is important that we
+ // respect prefixLength which was read from prefixLengthReader,
+ // even for the *first* value of a page. Even though the first
+ // value of the page should have an empty prefix, it may not
+ // because of PARQUET-246.
+
+ // We have to do this to materialize the output
+ try {
+ int lengthSize;
+ if (prefixLength != 0) {
+ lengthSize = appendLength(prefixLength + suffix.getLength());
+ temp.append(previous.getByteArray(), previous.getStartOffset(), prefixLength);
+ } else {
+ lengthSize = appendLength(suffix.getLength());
+ }
+ temp.append(suffix);
+ /*
+ * Adding length after appending prefix and suffix is important as we do not overwrite the original
+ * previous bytes
+ * */
+ System.arraycopy(lengthBytes, 0, temp.getByteArray(), 0, lengthSize);
+ previous.set(temp.getByteArray(), temp.getStartOffset() + lengthSize, temp.getLength() - lengthSize);
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ newPage = false;
+ return temp;
+ }
+
+ private int appendLength(int length) {
+ if (lengthBytes.length > 0) {
+ int numOfBytes = UTF8StringUtil.encodeUTF8Length(length, lengthBytes, 0);
+ temp.setSize(numOfBytes);
+ return numOfBytes;
+ }
+ temp.setSize(0);
+ return 0;
+ }
+
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaLengthByteArrayValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaLengthByteArrayValuesReader.java
new file mode 100644
index 0000000..9913269
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaLengthByteArrayValuesReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.io.ParquetDecodingException;
+
+public class ParquetDeltaLengthByteArrayValuesReader extends AbstractParquetValuesReader {
+
+ private final VoidPointable value;
+ private final AbstractParquetValuesReader lengthReader;
+ private AbstractBytesInputStream in;
+
+ public ParquetDeltaLengthByteArrayValuesReader() {
+ this.lengthReader = new ParquetDeltaBinaryPackingValuesReader();
+ value = new VoidPointable();
+ }
+
+ @Override
+ public void initFromPage(AbstractBytesInputStream stream) throws IOException {
+ AbstractBytesInputStream lengthStream = stream.sliceStream(BytesUtils.readUnsignedVarInt(stream));
+ lengthReader.initFromPage(lengthStream);
+ this.in = stream;
+ }
+
+ @Override
+ public void skip() {
+ int length = lengthReader.readInteger();
+ try {
+ in.skipFully(length);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to skip " + length + " bytes");
+ }
+ }
+
+ @Override
+ public IValueReference readBytes() {
+ int length = lengthReader.readInteger();
+ try {
+ in.read(value, length);
+ return value;
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read " + length + " bytes");
+ }
+ }
+
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java
new file mode 100644
index 0000000..196bec2
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.parquet.bytes.LittleEndianDataInputStream;
+import org.apache.parquet.io.ParquetDecodingException;
+
+public class ParquetDoublePlainValuesReader extends AbstractParquetValuesReader {
+ private LittleEndianDataInputStream in;
+
+ @Override
+ public void initFromPage(AbstractBytesInputStream stream) throws IOException {
+ this.in = new LittleEndianDataInputStream(stream.remainingStream());
+ }
+
+ @Override
+ public void skip() {
+ try {
+ in.skipBytes(8);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not skip double", e);
+ }
+ }
+
+ @Override
+ public double readDouble() {
+ try {
+ return in.readDouble();
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read double", e);
+ }
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetRunLengthBitPackingHybridDecoder.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetRunLengthBitPackingHybridDecoder.java
new file mode 100644
index 0000000..4607dc2
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetRunLengthBitPackingHybridDecoder.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Re-implementation of {@link RunLengthBitPackingHybridDecoder}
+ */
+public class ParquetRunLengthBitPackingHybridDecoder {
+ private enum MODE {
+ RLE,
+ PACKED
+ }
+
+ private final int bitWidth;
+ private final BytePacker packer;
+ private InputStream in;
+
+ private MODE mode;
+ private int currentCount;
+ private int currentValue;
+ private int currentBufferLength;
+ private int[] currentBuffer;
+ private byte[] bytes;
+
+ public ParquetRunLengthBitPackingHybridDecoder(int bitWidth) {
+ Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+ this.bitWidth = bitWidth;
+ this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+ }
+
+ public void reset(InputStream in) {
+ this.in = in;
+ currentCount = 0;
+ currentBufferLength = 0;
+ }
+
+ public int readInt() throws HyracksDataException {
+ try {
+ return nextInt();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private int nextInt() throws IOException {
+ if (currentCount == 0) {
+ readNext();
+ }
+ --currentCount;
+ int result;
+ switch (mode) {
+ case RLE:
+ result = currentValue;
+ break;
+ case PACKED:
+ result = currentBuffer[currentBufferLength - 1 - currentCount];
+ break;
+ default:
+ throw new ParquetDecodingException("not a valid mode " + mode);
+ }
+ return result;
+ }
+
+ private void readNext() throws IOException {
+ Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
+ final int header = BytesUtils.readUnsignedVarInt(in);
+ mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+ switch (mode) {
+ case RLE:
+ currentCount = header >>> 1;
+ currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth);
+ break;
+ case PACKED:
+ int numGroups = header >>> 1;
+ currentCount = numGroups * 8;
+ allocateBuffers(currentCount, numGroups * bitWidth);
+ // At the end of the file RLE data though, there might not be that many bytes left.
+ int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0);
+ bytesToRead = Math.min(bytesToRead, in.available());
+ readFully(bytes, bytesToRead);
+ for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex +=
+ bitWidth) {
+ packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex);
+ }
+ break;
+ default:
+ throw new ParquetDecodingException("not a valid mode " + mode);
+ }
+ }
+
+ private void allocateBuffers(int intBufferLength, int byteBufferLength) {
+ if (currentBuffer == null || currentBuffer.length < intBufferLength) {
+ currentBuffer = new int[intBufferLength];
+ } else {
+ Arrays.fill(currentBuffer, 0);
+ }
+ currentBufferLength = intBufferLength;
+
+ if (bytes == null || bytes.length < byteBufferLength) {
+ bytes = new byte[byteBufferLength];
+ } else {
+ Arrays.fill(bytes, (byte) 0);
+ }
+ }
+
+ private void readFully(byte[] b, int len) throws IOException {
+ if (len < 0)
+ throw new IndexOutOfBoundsException();
+ int n = 0;
+ while (n < len) {
+ int count = in.read(b, n, len - n);
+ if (count < 0)
+ throw new EOFException();
+ n += count;
+ }
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java
new file mode 100644
index 0000000..3102063
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.ParquetDeltaBinaryPackingConfig;
+import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Re-implementation of {@link DeltaBinaryPackingValuesWriter}
+ */
+public abstract class AbstractParquetDeltaBinaryPackingValuesWriter extends AbstractParquetValuesWriter {
+
+ public static final int DEFAULT_NUM_BLOCK_VALUES = 128;
+
+ public static final int DEFAULT_NUM_MINIBLOCKS = 4;
+
+ protected final MultiTemporaryBufferBytesOutputStream outputStream;
+
+ /**
+ * stores blockSizeInValues, miniBlockNumInABlock and miniBlockSizeInValues
+ */
+ protected final ParquetDeltaBinaryPackingConfig config;
+
+ /**
+ * bit width for each mini block, reused between flushes
+ */
+ protected final int[] bitWidths;
+
+ protected int totalValueCount = 0;
+
+ /**
+ * a pointer to deltaBlockBuffer indicating the end of deltaBlockBuffer
+ * the number of values in the deltaBlockBuffer that haven't flushed to baos
+ * it will be reset after each flush
+ */
+ protected int deltaValuesToFlush = 0;
+
+ /**
+ * bytes buffer for a mini block, it is reused for each mini block.
+ * Therefore the size of biggest miniblock with bitwith of MAX_BITWITH is allocated
+ */
+ protected byte[] miniBlockByteBuffer;
+
+ /**
+ * Estimated element size after encoding
+ */
+ protected int estimatedElementSize = 0;
+ /**
+ * Estimated size for all non-flushed elements
+ */
+ protected int estimatedSize = 0;
+
+ protected AbstractParquetDeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum,
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ this.config = new ParquetDeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum);
+ bitWidths = new int[config.getMiniBlockNumInABlock()];
+ outputStream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
+ }
+
+ protected void writeBitWidthForMiniBlock(int i) {
+ try {
+ BytesUtils.writeIntLittleEndianOnOneByte(outputStream, bitWidths[i]);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("can not write bit width for mini-block", e);
+ }
+ }
+
+ protected int getMiniBlockCountToFlush(double numberCount) {
+ return (int) Math.ceil(numberCount / config.getMiniBlockSizeInValues());
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ this.totalValueCount = 0;
+ this.outputStream.reset();
+ this.deltaValuesToFlush = 0;
+ }
+
+ @Override
+ public void close() {
+ this.totalValueCount = 0;
+ this.deltaValuesToFlush = 0;
+ outputStream.finish();
+ }
+
+ @Override
+ public int getEstimatedSize() {
+ return outputStream.size() + estimatedSize;
+ }
+
+ @Override
+ public int getAllocatedSize() {
+ return outputStream.capacity();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java
new file mode 100644
index 0000000..b53ded2
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.values.ValuesWriter;
+
+/**
+ * Replaces {@link ValuesWriter}
+ */
+public abstract class AbstractParquetValuesWriter {
+
+ public abstract BytesInput getBytes();
+
+ /**
+ * called after getBytes() to reset the current buffer and start writing the next page
+ */
+ public abstract void reset() throws HyracksDataException;
+
+ /**
+ * Called to close the values writer. Any output stream is closed and can no longer be used.
+ * All resources are released.
+ */
+ public abstract void close();
+
+ public abstract int getEstimatedSize();
+
+ /**
+ * @return the allocated size of the buffer
+ */
+ public abstract int getAllocatedSize();
+
+ /**
+ * @param v the value to encode
+ */
+ public void writeBoolean(boolean v) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /**
+ * @param v the value to encode
+ * @param skipLengthBytes whether to skip the length bytes of {@link UTF8StringPointable} or not
+ */
+ public void writeBytes(IValueReference v, boolean skipLengthBytes) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /**
+ * @param v the value to encode
+ */
+ public void writeInteger(int v) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /**
+ * @param v the value to encode
+ */
+ public void writeLong(long v) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /**
+ * @param v the value to encode
+ */
+ public void writeDouble(double v) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForInteger.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForInteger.java
new file mode 100644
index 0000000..1c474fc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForInteger.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Re-implementation of {@link DeltaBinaryPackingValuesWriterForInteger}
+ */
+public class ParquetDeltaBinaryPackingValuesWriterForInteger extends AbstractParquetDeltaBinaryPackingValuesWriter {
+ /**
+ * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
+ * reused between flushes.
+ */
+ private static final int MAX_BITWIDTH = 32;
+
+ private final int blockSizeInValues;
+ private final int miniBlockNumInABlock;
+ private final int miniBlockSizeInValues;
+
+ /**
+ * stores delta values starting from the 2nd value written(1st value is stored in header).
+ * It's reused between flushes
+ */
+ private final int[] deltaBlockBuffer;
+
+ /**
+ * firstValue is written to the header of the page
+ */
+ private int firstValue = 0;
+
+ /**
+ * cache previous written value for calculating delta
+ */
+ private int previousValue = 0;
+
+ /**
+ * min delta is written to the beginning of each block.
+ * it's zig-zag encoded. The deltas stored in each block is actually the difference to min delta,
+ * therefore are all positive
+ * it will be reset after each flush
+ */
+ private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
+ private int maxDeltaInCurrentBlock = Integer.MIN_VALUE;
+ private int estimatedSize = 0;
+
+ public ParquetDeltaBinaryPackingValuesWriterForInteger(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, multiPageOpRef);
+ }
+
+ public ParquetDeltaBinaryPackingValuesWriterForInteger(int blockSizeInValues, int miniBlockNum,
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ super(blockSizeInValues, miniBlockNum, multiPageOpRef);
+ this.blockSizeInValues = blockSizeInValues;
+ this.miniBlockNumInABlock = miniBlockNum;
+ double miniSize = (double) blockSizeInValues / miniBlockNumInABlock;
+ Preconditions.checkArgument(miniSize % 8 == 0, "miniBlockSize must be multiple of 8, but it's " + miniSize);
+ this.miniBlockSizeInValues = (int) miniSize;
+
+ deltaBlockBuffer = new int[blockSizeInValues];
+ miniBlockByteBuffer = new byte[miniBlockSizeInValues * MAX_BITWIDTH];
+ }
+
+ @Override
+ public void writeInteger(int v) {
+ totalValueCount++;
+
+ if (totalValueCount == 1) {
+ firstValue = v;
+ previousValue = firstValue;
+ return;
+ }
+
+ // Calculate delta. The possible overflow is accounted for. The algorithm is correct because
+ // Java int is working as a modalar ring with base 2^32 and because of the plus and minus
+ // properties of a ring. http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+ int delta = v - previousValue;
+ previousValue = v;
+
+ deltaBlockBuffer[deltaValuesToFlush++] = delta;
+
+ if (delta < minDeltaInCurrentBlock) {
+ minDeltaInCurrentBlock = delta;
+ }
+
+ if (blockSizeInValues == deltaValuesToFlush) {
+ flushBlockBuffer();
+ } else {
+ //Recalibrate the estimated size
+ if (delta > maxDeltaInCurrentBlock) {
+ maxDeltaInCurrentBlock = delta;
+ estimatedElementSize =
+ (64 - Long.numberOfLeadingZeros(maxDeltaInCurrentBlock - minDeltaInCurrentBlock));
+ estimatedSize = estimatedElementSize * deltaValuesToFlush;
+ } else {
+ estimatedSize += estimatedElementSize;
+ }
+ }
+ }
+
+ private void flushBlockBuffer() {
+ // since we store the min delta, the deltas will be converted to be the difference to min delta
+ // and all positive
+ for (int i = 0; i < deltaValuesToFlush; i++) {
+ deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
+ }
+
+ writeMinDelta();
+ int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
+
+ calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
+ for (int i = 0; i < miniBlockNumInABlock; i++) {
+ writeBitWidthForMiniBlock(i);
+ }
+
+ for (int i = 0; i < miniBlocksToFlush; i++) {
+ // writing i th miniblock
+ int currentBitWidth = bitWidths[i];
+ int blockOffset = 0;
+ BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth);
+ int miniBlockStart = i * miniBlockSizeInValues;
+ for (int j = miniBlockStart; j < (i + 1) * miniBlockSizeInValues; j += 8) {//8 values per pack
+ // mini block is atomic in terms of flushing
+ // This may write more values when reach to the end of data writing to last mini block,
+ // since it may not be aligned to miniblock,
+ // but doesn't matter. The reader uses total count to see if reached the end.
+ packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset);
+ blockOffset += currentBitWidth;
+ }
+ try {
+ outputStream.write(miniBlockByteBuffer, 0, blockOffset);
+ } catch (IOException e) {
+ throw new ParquetEncodingException(e);
+ }
+ }
+
+ minDeltaInCurrentBlock = Integer.MAX_VALUE;
+ deltaValuesToFlush = 0;
+ estimatedSize = 0;
+ maxDeltaInCurrentBlock = Integer.MIN_VALUE;
+ }
+
+ private void writeMinDelta() {
+ try {
+ BytesUtils.writeZigZagVarInt(minDeltaInCurrentBlock, outputStream);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("can not write min delta for block", e);
+ }
+ }
+
+ /**
+ * iterate through values in each mini block and calculate the bitWidths of max values.
+ *
+ * @param miniBlocksToFlush number of miniblocks
+ */
+ private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
+ for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
+ int mask = 0;
+ int miniStart = miniBlockIndex * miniBlockSizeInValues;
+
+ /*
+ * The end of current mini block could be the end of current block(deltaValuesToFlush) buffer
+ * when data is not aligned to mini block
+ */
+ int miniEnd = Math.min((miniBlockIndex + 1) * miniBlockSizeInValues, deltaValuesToFlush);
+
+ for (int i = miniStart; i < miniEnd; i++) {
+ mask |= deltaBlockBuffer[i];
+ }
+ bitWidths[miniBlockIndex] = 32 - Integer.numberOfLeadingZeros(mask);
+ }
+ }
+
+ /**
+ * getBytes will trigger flushing block buffer, DO NOT write after getBytes() is called without calling reset()
+ *
+ * @return a BytesInput that contains the encoded page data
+ */
+ @Override
+ public BytesInput getBytes() {
+ // The Page Header should include: blockSizeInValues, numberOfMiniBlocks, totalValueCount
+ if (deltaValuesToFlush != 0) {
+ flushBlockBuffer();
+ }
+ BytesInput configBytes = BytesInput.concat(BytesInput.fromUnsignedVarInt(blockSizeInValues),
+ BytesInput.fromUnsignedVarInt(miniBlockNumInABlock));
+ return BytesInput.concat(configBytes, BytesInput.fromUnsignedVarInt(totalValueCount),
+ BytesInput.fromZigZagVarInt(firstValue), outputStream.asBytesInput());
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ super.reset();
+ this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
+ estimatedSize = 0;
+ maxDeltaInCurrentBlock = Integer.MIN_VALUE;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
+ estimatedSize = 0;
+ maxDeltaInCurrentBlock = Integer.MIN_VALUE;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForLong.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForLong.java
new file mode 100644
index 0000000..6ba40c1
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForLong.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Re-implementation of {@link DeltaBinaryPackingValuesWriterForLong}
+ */
+public class ParquetDeltaBinaryPackingValuesWriterForLong extends AbstractParquetDeltaBinaryPackingValuesWriter {
+ /**
+ * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
+ * reused between flushes.
+ */
+ private static final int MAX_BITWIDTH = 64;
+
+ private final int blockSizeInValues;
+ private final int miniBlockNumInABlock;
+ private final int miniBlockSizeInValues;
+
+ /**
+ * stores delta values starting from the 2nd value written(1st value is stored in header).
+ * It's reused between flushes
+ */
+ private final long[] deltaBlockBuffer;
+
+ /**
+ * firstValue is written to the header of the page
+ */
+ private long firstValue = 0;
+
+ /**
+ * cache previous written value for calculating delta
+ */
+ private long previousValue = 0;
+
+ /**
+ * min delta is written to the beginning of each block.
+ * it's zig-zag encoded. The deltas stored in each block is actually the difference to min delta,
+ * therefore are all positive
+ * it will be reset after each flush
+ */
+ private long minDeltaInCurrentBlock = Long.MAX_VALUE;
+ private long maxDeltaInCurrentBlock = Long.MIN_VALUE;
+
+ public ParquetDeltaBinaryPackingValuesWriterForLong(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, multiPageOpRef);
+ }
+
+ public ParquetDeltaBinaryPackingValuesWriterForLong(int blockSizeInValues, int miniBlockNum,
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ super(blockSizeInValues, miniBlockNum, multiPageOpRef);
+ this.blockSizeInValues = blockSizeInValues;
+ this.miniBlockNumInABlock = miniBlockNum;
+ double miniSize = (double) blockSizeInValues / miniBlockNumInABlock;
+ Preconditions.checkArgument(miniSize % 8 == 0, "miniBlockSize must be multiple of 8, but it's " + miniSize);
+ this.miniBlockSizeInValues = (int) miniSize;
+ deltaBlockBuffer = new long[blockSizeInValues];
+ miniBlockByteBuffer = new byte[miniBlockSizeInValues * MAX_BITWIDTH];
+ }
+
+ @Override
+ public void writeLong(long v) {
+ totalValueCount++;
+
+ if (totalValueCount == 1) {
+ firstValue = v;
+ previousValue = firstValue;
+ return;
+ }
+
+ // Calculate delta. The possible overflow is accounted for. The algorithm is correct because
+ // Java long is working as a modalar ring with base 2^64 and because of the plus and minus
+ // properties of a ring. http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+ long delta = v - previousValue;
+ previousValue = v;
+
+ deltaBlockBuffer[deltaValuesToFlush++] = delta;
+
+ if (delta < minDeltaInCurrentBlock) {
+ minDeltaInCurrentBlock = delta;
+ }
+
+ if (blockSizeInValues == deltaValuesToFlush) {
+ flushBlockBuffer();
+ } else {
+ //Recalibrate the estimated size
+ if (delta > maxDeltaInCurrentBlock) {
+ maxDeltaInCurrentBlock = delta;
+ estimatedElementSize =
+ (64 - Long.numberOfLeadingZeros(maxDeltaInCurrentBlock - minDeltaInCurrentBlock));
+ estimatedSize = estimatedElementSize * deltaValuesToFlush;
+ } else {
+ estimatedSize += estimatedElementSize;
+ }
+ }
+ }
+
+ private void flushBlockBuffer() {
+ // since we store the min delta, the deltas will be converted to be the difference to min delta
+ // and all positive
+ for (int i = 0; i < deltaValuesToFlush; i++) {
+ deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
+ }
+
+ writeMinDelta();
+ int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
+
+ calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
+ int minBitWidth = Integer.MAX_VALUE;
+ for (int i = 0; i < miniBlockNumInABlock; i++) {
+ writeBitWidthForMiniBlock(i);
+ minBitWidth = Math.min(bitWidths[i], minBitWidth);
+ }
+
+ for (int i = 0; i < miniBlocksToFlush; i++) {
+ // writing i th miniblock
+ int currentBitWidth = bitWidths[i];
+ int blockOffset = 0;
+ // TODO: should this cache the packer?
+ BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(currentBitWidth);
+ int miniBlockStart = i * miniBlockSizeInValues;
+ // pack values into the miniblock buffer, 8 at a time to get exactly currentBitWidth bytes
+ for (int j = miniBlockStart; j < (i + 1) * miniBlockSizeInValues; j += 8) {
+ // mini block is atomic in terms of flushing
+ // This may write more values when reach to the end of data writing to last mini block,
+ // since it may not be aligned to miniblock,
+ // but doesn't matter. The reader uses total count to see if reached the end.
+ packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset);
+ blockOffset += currentBitWidth;
+ }
+ try {
+ outputStream.write(miniBlockByteBuffer, 0, blockOffset);
+ } catch (IOException e) {
+ throw new ParquetEncodingException(e);
+ }
+ }
+
+ minDeltaInCurrentBlock = Long.MAX_VALUE;
+ maxDeltaInCurrentBlock = Long.MIN_VALUE;
+ deltaValuesToFlush = 0;
+ estimatedElementSize = 0;
+ estimatedSize = 0;
+ }
+
+ private void writeMinDelta() {
+ try {
+ BytesUtils.writeZigZagVarLong(minDeltaInCurrentBlock, outputStream);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("can not write min delta for block", e);
+ }
+ }
+
+ /**
+ * iterate through values in each mini block and calculate the bitWidths of max values.
+ *
+ * @param miniBlocksToFlush number of miniblocks
+ */
+ private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
+ for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
+ long mask = 0;
+ int miniStart = miniBlockIndex * miniBlockSizeInValues;
+
+ //The end of current mini block could be the end of current block(deltaValuesToFlush) buffer
+ //when data is not aligned to mini block
+ int miniEnd = Math.min((miniBlockIndex + 1) * miniBlockSizeInValues, deltaValuesToFlush);
+
+ for (int i = miniStart; i < miniEnd; i++) {
+ mask |= deltaBlockBuffer[i];
+ }
+ bitWidths[miniBlockIndex] = 64 - Long.numberOfLeadingZeros(mask);
+ }
+ }
+
+ /**
+ * getBytes will trigger flushing block buffer, DO NOT write after getBytes() is called without calling reset()
+ *
+ * @return a BytesInput that contains the encoded page data
+ */
+ @Override
+ public BytesInput getBytes() {
+ // The Page Header should include: blockSizeInValues, numberOfMiniBlocks, totalValueCount
+ if (deltaValuesToFlush != 0) {
+ flushBlockBuffer();
+ }
+ BytesInput configBytes = BytesInput.concat(BytesInput.fromUnsignedVarInt(blockSizeInValues),
+ BytesInput.fromUnsignedVarInt(miniBlockNumInABlock));
+ return BytesInput.concat(configBytes, BytesInput.fromUnsignedVarInt(totalValueCount),
+ BytesInput.fromZigZagVarLong(firstValue), outputStream.asBytesInput());
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ super.reset();
+ this.minDeltaInCurrentBlock = Long.MAX_VALUE;
+ this.maxDeltaInCurrentBlock = Long.MIN_VALUE;
+ previousValue = 0;
+ estimatedElementSize = 0;
+ estimatedSize = 0;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ this.minDeltaInCurrentBlock = Long.MAX_VALUE;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
new file mode 100644
index 0000000..1b46116
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
+
+/**
+ * Re-implementation of {@link DeltaByteArrayWriter}
+ */
+public class ParquetDeltaByteArrayWriter extends AbstractParquetValuesWriter {
+ private static final IValueReference EMPTY_VALUE;
+ private final ParquetDeltaBinaryPackingValuesWriterForInteger prefixLengthWriter;
+ private final ParquetDeltaLengthByteArrayValuesWriter suffixWriter;
+ private final VoidPointable suffix;
+ private final ArrayBackedValueStorage previous = new ArrayBackedValueStorage();
+
+ static {
+ VoidPointable emptyPointable = new VoidPointable();
+ emptyPointable.set(new byte[0], 0, 0);
+ EMPTY_VALUE = emptyPointable;
+ }
+
+ public ParquetDeltaByteArrayWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ this.prefixLengthWriter = new ParquetDeltaBinaryPackingValuesWriterForInteger(multiPageOpRef);
+ this.suffixWriter = new ParquetDeltaLengthByteArrayValuesWriter(multiPageOpRef);
+ suffix = new VoidPointable();
+ suffix.set(EMPTY_VALUE);
+ }
+
+ @Override
+ public BytesInput getBytes() {
+ BytesInput prefixBytes = prefixLengthWriter.getBytes();
+ BytesInput prefixLength = BytesInput.fromUnsignedVarInt((int) prefixBytes.size());
+ BytesInput suffixBytes = suffixWriter.getBytes();
+ return BytesInput.concat(prefixLength, prefixBytes, suffixBytes);
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ prefixLengthWriter.reset();
+ suffixWriter.reset();
+ previous.reset();
+ suffix.set(EMPTY_VALUE);
+ }
+
+ @Override
+ public void close() {
+ prefixLengthWriter.close();
+ suffixWriter.close();
+ previous.reset();
+ suffix.set(EMPTY_VALUE);
+ }
+
+ @Override
+ public int getEstimatedSize() {
+ return prefixLengthWriter.getEstimatedSize() + suffixWriter.getEstimatedSize();
+ }
+
+ @Override
+ public int getAllocatedSize() {
+ return prefixLengthWriter.getAllocatedSize() + suffixWriter.getAllocatedSize();
+ }
+
+ @Override
+ public void writeBytes(IValueReference value, boolean skipLengthBytes) {
+ byte[] bytes = value.getByteArray();
+ int start = value.getStartOffset();
+ int length = value.getLength();
+ if (skipLengthBytes) {
+ int lengthBytes = UTF8StringUtil.getNumBytesToStoreLength(bytes, start);
+ start += lengthBytes;
+ length -= lengthBytes;
+ }
+ writeBytes(bytes, start, length);
+ }
+
+ private void writeBytes(byte[] bytes, int offset, int length) {
+ final byte[] prevBytes = previous.getByteArray();
+ final int prevOffset = previous.getStartOffset();
+ final int minLength = Math.min(length, previous.getLength());
+ // find the number of matching prefix bytes between this value and the previous one
+ int i;
+ for (i = 0; (i < minLength) && (bytes[i + offset] == prevBytes[i + prevOffset]); i++);
+ prefixLengthWriter.writeInteger(i);
+ suffix.set(bytes, offset + i, length - i);
+ suffixWriter.writeBytes(suffix, false);
+ // We store as bytes could be evicted from the buffer cache
+ previous.set(bytes, offset, length);
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java
new file mode 100644
index 0000000..afab48eb
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.LittleEndianDataOutputStream;
+import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Re-implementation of {@link DeltaLengthByteArrayValuesWriter}
+ */
+public class ParquetDeltaLengthByteArrayValuesWriter extends AbstractParquetValuesWriter {
+ private final ParquetDeltaBinaryPackingValuesWriterForInteger lengthWriter;
+ private final MultiTemporaryBufferBytesOutputStream outputStream;
+ private final LittleEndianDataOutputStream out;
+
+ public ParquetDeltaLengthByteArrayValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ outputStream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
+ out = new LittleEndianDataOutputStream(outputStream);
+ lengthWriter = new ParquetDeltaBinaryPackingValuesWriterForInteger(multiPageOpRef);
+ }
+
+ @Override
+ public void writeBytes(IValueReference value, boolean skipLengthBytes) {
+ try {
+ lengthWriter.writeInteger(value.getLength());
+ out.write(value.getByteArray(), value.getStartOffset(), value.getLength());
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write bytes", e);
+ }
+ }
+
+ @Override
+ public BytesInput getBytes() {
+ try {
+ out.flush();
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write page", e);
+ }
+ BytesInput lengthBytes = lengthWriter.getBytes();
+ BytesInput lengthSize = BytesInput.fromUnsignedVarInt((int) lengthBytes.size());
+ BytesInput arrayBytes = outputStream.asBytesInput();
+ return BytesInput.concat(lengthSize, lengthBytes, arrayBytes);
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ lengthWriter.reset();
+ outputStream.reset();
+ }
+
+ @Override
+ public void close() {
+ lengthWriter.close();
+ outputStream.finish();
+ }
+
+ @Override
+ public int getEstimatedSize() {
+ return lengthWriter.getEstimatedSize() + outputStream.size();
+ }
+
+ @Override
+ public int getAllocatedSize() {
+ return lengthWriter.getAllocatedSize() + outputStream.capacity();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
new file mode 100644
index 0000000..0298e59
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.column.bytes.stream.out.AbstractBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.LittleEndianDataOutputStream;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Re-implementation of {@link PlainValuesWriter}
+ */
+public class ParquetPlainValuesWriter extends AbstractParquetValuesWriter {
+ public static final Charset CHARSET = StandardCharsets.UTF_8;
+
+ private final AbstractBytesOutputStream outputStream;
+ private final LittleEndianDataOutputStream out;
+
+ public ParquetPlainValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ outputStream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
+ out = new LittleEndianDataOutputStream(outputStream);
+ }
+
+ @Override
+ public final void writeDouble(double v) {
+ try {
+ out.writeDouble(v);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write double", e);
+ }
+ }
+
+ @Override
+ public BytesInput getBytes() {
+ try {
+ out.flush();
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write page", e);
+ }
+ return outputStream.asBytesInput();
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ outputStream.reset();
+ }
+
+ @Override
+ public void close() {
+ outputStream.finish();
+ }
+
+ @Override
+ public int getEstimatedSize() {
+ return outputStream.size();
+ }
+
+ @Override
+ public int getAllocatedSize() {
+ return outputStream.capacity();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetRunLengthBitPackingHybridEncoder.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetRunLengthBitPackingHybridEncoder.java
new file mode 100644
index 0000000..671e0a1
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetRunLengthBitPackingHybridEncoder.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.out.GrowableBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.pointer.IReservedPointer;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+
+/**
+ * Re-implementation of {@link RunLengthBitPackingHybridEncoder}
+ */
+public class ParquetRunLengthBitPackingHybridEncoder {
+ private final BytePacker packer;
+
+ private final GrowableBytesOutputStream outputStream;
+
+ /**
+ * The bit width used for bit-packing and for writing
+ * the repeated-value
+ */
+ private final int bitWidth;
+
+ /**
+ * Values that are bit packed 8 at at a time are packed into this
+ * buffer, which is then written to baos
+ */
+ private final byte[] packBuffer;
+
+ /**
+ * Previous value written, used to detect repeated values
+ */
+ private int previousValue;
+
+ /**
+ * We buffer 8 values at a time, and either bit pack them
+ * or discard them after writing a rle-run
+ */
+ private final int[] bufferedValues;
+ private int numBufferedValues;
+
+ /**
+ * How many times a value has been repeated
+ */
+ private int repeatCount;
+
+ /**
+ * How many groups of 8 values have been written
+ * to the current bit-packed-run
+ */
+ private int bitPackedGroupCount;
+
+ /**
+ * A "pointer" to a single byte in baos,
+ * which we use as our bit-packed-header. It's really
+ * the logical index of the byte in baos.
+ * <p>
+ * We are only using one byte for this header,
+ * which limits us to writing 504 values per bit-packed-run.
+ * <p>
+ * MSB must be 0 for varint encoding, LSB must be 1 to signify
+ * that this is a bit-packed-header leaves 6 bits to write the
+ * number of 8-groups -> (2^6 - 1) * 8 = 504
+ */
+ private final IReservedPointer bitPackedRunHeaderPointer;
+
+ private boolean toBytesCalled;
+
+ public ParquetRunLengthBitPackingHybridEncoder(int bitWidth) {
+
+ Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+
+ this.bitWidth = bitWidth;
+ this.outputStream = new GrowableBytesOutputStream();
+ this.bitPackedRunHeaderPointer = outputStream.createPointer();
+ this.packBuffer = new byte[bitWidth];
+ this.bufferedValues = new int[8];
+ this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+ reset(false);
+ }
+
+ private void reset(boolean resetBaos) {
+ if (resetBaos) {
+ this.outputStream.reset();
+ }
+ this.previousValue = 0;
+ this.numBufferedValues = 0;
+ this.repeatCount = 0;
+ this.bitPackedGroupCount = 0;
+ this.bitPackedRunHeaderPointer.reset();
+ this.toBytesCalled = false;
+ }
+
+ public void writeInt(int value) throws IOException {
+ if (value == previousValue) {
+ // keep track of how many times we've seen this value
+ // consecutively
+ ++repeatCount;
+
+ if (repeatCount >= 8) {
+ // we've seen this at least 8 times, we're
+ // certainly going to write an rle-run,
+ // so just keep on counting repeats for now
+ return;
+ }
+ } else {
+ // This is a new value, check if it signals the end of
+ // an rle-run
+ if (repeatCount >= 8) {
+ // it does! write an rle-run
+ writeRleRun();
+ }
+
+ // this is a new value so we've only seen it once
+ repeatCount = 1;
+ // start tracking this value for repeats
+ previousValue = value;
+ }
+
+ // We have not seen enough repeats to justify an rle-run yet,
+ // so buffer this value in case we decide to write a bit-packed-run
+ bufferedValues[numBufferedValues] = value;
+ ++numBufferedValues;
+
+ if (numBufferedValues == 8) {
+ // we've encountered less than 8 repeated values, so
+ // either start a new bit-packed-run or append to the
+ // current bit-packed-run
+ writeOrAppendBitPackedRun();
+ }
+ }
+
+ private void writeOrAppendBitPackedRun() throws IOException {
+ if (bitPackedGroupCount >= 63) {
+ // we've packed as many values as we can for this run,
+ // end it and start a new one
+ endPreviousBitPackedRun();
+ }
+
+ if (!bitPackedRunHeaderPointer.isSet()) {
+ // this is a new bit-packed-run, allocate a byte for the header
+ // and keep a "pointer" to it so that it can be mutated later
+ outputStream.reserveByte(bitPackedRunHeaderPointer);
+ }
+
+ packer.pack8Values(bufferedValues, 0, packBuffer, 0);
+ outputStream.write(packBuffer);
+
+ // empty the buffer, they've all been written
+ numBufferedValues = 0;
+
+ // clear the repeat count, as some repeated values
+ // may have just been bit packed into this run
+ repeatCount = 0;
+
+ ++bitPackedGroupCount;
+ }
+
+ /**
+ * If we are currently writing a bit-packed-run, update the
+ * bit-packed-header and consider this run to be over
+ * <p>
+ * does nothing if we're not currently writing a bit-packed run
+ */
+ private void endPreviousBitPackedRun() {
+ if (!bitPackedRunHeaderPointer.isSet()) {
+ // we're not currently in a bit-packed-run
+ return;
+ }
+
+ // create bit-packed-header, which needs to fit in 1 byte
+ byte bitPackHeader = (byte) ((bitPackedGroupCount << 1) | 1);
+
+ // update this byte
+ bitPackedRunHeaderPointer.setByte(bitPackHeader);
+
+ // mark that this run is over
+ bitPackedRunHeaderPointer.reset();
+
+ // reset the number of groups
+ bitPackedGroupCount = 0;
+ }
+
+ private void writeRleRun() throws IOException {
+ // we may have been working on a bit-packed-run
+ // so close that run if it exists before writing this
+ // rle-run
+ endPreviousBitPackedRun();
+
+ // write the rle-header (lsb of 0 signifies a rle run)
+ BytesUtils.writeUnsignedVarInt(repeatCount << 1, outputStream);
+ // write the repeated-value
+ BytesUtils.writeIntLittleEndianPaddedOnBitWidth(outputStream, previousValue, bitWidth);
+
+ // reset the repeat count
+ repeatCount = 0;
+
+ // throw away all the buffered values, they were just repeats and they've been written
+ numBufferedValues = 0;
+ }
+
+ public BytesInput toBytes() throws IOException {
+ Preconditions.checkArgument(!toBytesCalled, "You cannot call toBytes() more than once without calling reset()");
+
+ // write anything that is buffered / queued up for an rle-run
+ if (repeatCount >= 8) {
+ writeRleRun();
+ } else if (numBufferedValues > 0) {
+ for (int i = numBufferedValues; i < 8; i++) {
+ bufferedValues[i] = 0;
+ }
+ writeOrAppendBitPackedRun();
+ endPreviousBitPackedRun();
+ } else {
+ endPreviousBitPackedRun();
+ }
+
+ toBytesCalled = true;
+ return outputStream.asBytesInput();
+ }
+
+ /**
+ * Reset this encoder for re-use
+ */
+ public void reset() {
+ reset(true);
+ }
+
+ public void close() {
+ reset(false);
+ outputStream.finish();
+ }
+
+ public int getEstimatedSize() {
+ return outputStream.size() + repeatCount * bitWidth;
+ }
+
+ public int getAllocatedSize() {
+ return outputStream.capacity();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
new file mode 100644
index 0000000..b50143b
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.in;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+
+public abstract class AbstractBytesInputStream extends InputStream {
+
+ public abstract void resetAt(int bytesToSkip, AbstractBytesInputStream stream) throws IOException;
+
+ protected abstract void addBuffer(ByteBuffer buffer);
+
+ public abstract void read(IPointable pointable, int length) throws EOFException;
+
+ @Override
+ public abstract int read() throws IOException;
+
+ @Override
+ public abstract int read(byte[] bytes, int offset, int length) throws IOException;
+
+ @Override
+ public abstract long skip(long n);
+
+ public abstract int read(ByteBuffer out);
+
+ public abstract AbstractBytesInputStream remainingStream() throws EOFException;
+
+ public abstract AbstractBytesInputStream sliceStream(int length) throws EOFException;
+
+ @Override
+ public abstract void mark(int readLimit);
+
+ @Override
+ public abstract void reset() throws IOException;
+
+ public abstract void reset(IColumnBufferProvider bufferProvider) throws HyracksDataException;
+
+ @Override
+ public abstract int available();
+
+ public final void skipFully(long n) throws IOException {
+ long skipped = skip(n);
+ if (skipped < n) {
+ throw new EOFException("Not enough bytes to skip: " + skipped + " < " + n);
+ }
+ }
+
+ @Override
+ public final boolean markSupported() {
+ return true;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
new file mode 100644
index 0000000..833765c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.in;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+
+public final class ByteBufferInputStream extends AbstractBytesInputStream {
+ private ByteBuffer buffer;
+ private int mark = -1;
+
+ @Override
+ public void reset(IColumnBufferProvider bufferProvider) {
+ addBuffer(bufferProvider.getBuffer());
+ }
+
+ @Override
+ protected void addBuffer(ByteBuffer buffer) {
+ this.buffer = buffer;
+ mark = -1;
+ }
+
+ @Override
+ public void resetAt(int bytesToSkip, AbstractBytesInputStream stream) throws IOException {
+ ByteBufferInputStream in = (ByteBufferInputStream) stream;
+ buffer = in.buffer.duplicate();
+ buffer.position(buffer.position() + bytesToSkip);
+ mark = -1;
+ }
+
+ @Override
+ public void read(IPointable pointable, int length) throws EOFException {
+ if (buffer.remaining() < length) {
+ throw new EOFException();
+ }
+
+ pointable.set(buffer.array(), buffer.position(), length);
+ buffer.position(buffer.position() + length);
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (!buffer.hasRemaining()) {
+ throw new EOFException();
+ }
+ return buffer.get() & 0xFF; // as unsigned
+ }
+
+ @Override
+ public int read(byte[] bytes, int offset, int length) throws IOException {
+ if (length == 0) {
+ return 0;
+ }
+
+ int remaining = buffer.remaining();
+ if (remaining <= 0) {
+ return -1;
+ }
+
+ int bytesToRead = Math.min(remaining, length);
+ buffer.get(bytes, offset, bytesToRead);
+
+ return bytesToRead;
+ }
+
+ @Override
+ public long skip(long n) {
+ if (n == 0) {
+ return 0;
+ }
+
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+
+ // buffer.remaining is an int, so this will always fit in an int
+ int bytesToSkip = (int) Math.min(buffer.remaining(), n);
+ buffer.position(buffer.position() + bytesToSkip);
+
+ return bytesToSkip;
+ }
+
+ @Override
+ public int read(ByteBuffer out) {
+ int bytesToCopy;
+ ByteBuffer copyBuffer;
+ if (buffer.remaining() <= out.remaining()) {
+ // copy the entire buffer
+ bytesToCopy = buffer.remaining();
+ copyBuffer = buffer;
+ } else {
+ // copy a slice of the current buffer
+ bytesToCopy = out.remaining();
+ copyBuffer = buffer.duplicate();
+ copyBuffer.position(buffer.position());
+ copyBuffer.limit(buffer.position() + bytesToCopy);
+ buffer.position(buffer.position() + bytesToCopy);
+ }
+
+ out.put(copyBuffer);
+ out.flip();
+
+ return bytesToCopy;
+ }
+
+ @Override
+ public AbstractBytesInputStream sliceStream(int length) throws EOFException {
+ if (buffer.remaining() < length) {
+ throw new EOFException();
+ }
+ ByteBuffer copy = buffer.duplicate();
+ copy.position(buffer.position());
+ copy.limit(buffer.position() + length);
+ ByteBufferInputStream in = new ByteBufferInputStream();
+ in.addBuffer(copy);
+ buffer.position(buffer.position() + length);
+ return in;
+ }
+
+ @Override
+ public AbstractBytesInputStream remainingStream() {
+ ByteBuffer remaining = buffer.duplicate();
+ remaining.position(buffer.position());
+ buffer.position(buffer.limit());
+ ByteBufferInputStream in = new ByteBufferInputStream();
+ in.addBuffer(remaining);
+ return in;
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ this.mark = buffer.position();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ if (mark >= 0) {
+ buffer.position(mark);
+ this.mark = -1;
+ } else {
+ throw new IOException("No mark defined");
+ }
+ }
+
+ @Override
+ public int available() {
+ return buffer.remaining();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiByteBufferInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiByteBufferInputStream.java
new file mode 100644
index 0000000..31f8179
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiByteBufferInputStream.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.in;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+
+public final class MultiByteBufferInputStream extends AbstractBytesInputStream {
+ private static final ByteBuffer EMPTY;
+
+ static {
+ EMPTY = ByteBuffer.allocate(0);
+ EMPTY.limit(0);
+ }
+
+ private final Queue<ByteBuffer> buffers;
+ private final ArrayBackedValueStorage tempPointableStorage;
+ private int length;
+
+ private ByteBuffer current;
+ private int position;
+
+ public MultiByteBufferInputStream() {
+ this.buffers = new ArrayDeque<>();
+ tempPointableStorage = new ArrayBackedValueStorage();
+ this.current = EMPTY;
+ this.position = 0;
+ this.length = 0;
+
+ }
+
+ private MultiByteBufferInputStream(MultiByteBufferInputStream original, int len) throws EOFException {
+ buffers = new ArrayDeque<>();
+ tempPointableStorage = new ArrayBackedValueStorage();
+ position = original.position;
+ length = original.length;
+ buffers.addAll(original.sliceBuffers(len));
+ nextBuffer();
+ }
+
+ @Override
+ public void reset(IColumnBufferProvider bufferProvider) throws HyracksDataException {
+ reset();
+ length = bufferProvider.getLength();
+ if (length > 0) {
+ bufferProvider.readAll(buffers);
+ nextBuffer();
+ }
+ }
+
+ @Override
+ protected void addBuffer(ByteBuffer buffer) {
+ buffers.add(buffer);
+ length += buffer.remaining();
+ }
+
+ @Override
+ public void resetAt(int bytesToSkip, AbstractBytesInputStream stream) throws IOException {
+ MultiByteBufferInputStream original = (MultiByteBufferInputStream) stream;
+ buffers.clear();
+ position = original.position;
+ length = original.length;
+ current = original.current.duplicate();
+ for (ByteBuffer buffer : original.buffers) {
+ buffers.add(buffer.duplicate());
+ }
+
+ if (skip(bytesToSkip) != bytesToSkip) {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public long skip(long n) {
+ if (n <= 0) {
+ return 0;
+ }
+
+ if (current == null) {
+ return -1;
+ }
+
+ long bytesSkipped = 0;
+ while (bytesSkipped < n) {
+ if (current.remaining() > 0) {
+ long bytesToSkip = Math.min(n - bytesSkipped, current.remaining());
+ current.position(current.position() + (int) bytesToSkip);
+ bytesSkipped += bytesToSkip;
+ this.position += bytesToSkip;
+ } else if (!nextBuffer()) {
+ // there are no more buffers
+ return bytesSkipped > 0 ? bytesSkipped : -1;
+ }
+ }
+
+ return bytesSkipped;
+ }
+
+ @Override
+ public int read(ByteBuffer out) {
+ int len = out.remaining();
+ if (len <= 0) {
+ return 0;
+ }
+
+ if (current == null) {
+ return -1;
+ }
+
+ int bytesCopied = 0;
+ while (bytesCopied < len) {
+ if (current.remaining() > 0) {
+ int bytesToCopy;
+ ByteBuffer copyBuffer;
+ if (current.remaining() <= out.remaining()) {
+ // copy all the current buffer
+ bytesToCopy = current.remaining();
+ copyBuffer = current;
+ } else {
+ // copy a slice of the current buffer
+ bytesToCopy = out.remaining();
+ copyBuffer = current.duplicate();
+ copyBuffer.limit(copyBuffer.position() + bytesToCopy);
+ current.position(copyBuffer.position() + bytesToCopy);
+ }
+
+ out.put(copyBuffer);
+ bytesCopied += bytesToCopy;
+ this.position += bytesToCopy;
+
+ } else if (!nextBuffer()) {
+ // there are no more buffers
+ return bytesCopied > 0 ? bytesCopied : -1;
+ }
+ }
+
+ return bytesCopied;
+ }
+
+ @Override
+ public AbstractBytesInputStream sliceStream(int length) throws EOFException {
+ return new MultiByteBufferInputStream(this, length);
+ }
+
+ @Override
+ public AbstractBytesInputStream remainingStream() throws EOFException {
+ return new MultiByteBufferInputStream(this, length - position);
+ }
+
+ @Override
+ public int read(byte[] bytes, int off, int len) {
+ if (len <= 0) {
+ if (len < 0) {
+ throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len);
+ }
+ return 0;
+ }
+
+ if (current == null) {
+ return -1;
+ }
+
+ int bytesRead = 0;
+ while (bytesRead < len) {
+ if (current.remaining() > 0) {
+ int bytesToRead = Math.min(len - bytesRead, current.remaining());
+ current.get(bytes, off + bytesRead, bytesToRead);
+ bytesRead += bytesToRead;
+ this.position += bytesToRead;
+ } else if (!nextBuffer()) {
+ // there are no more buffers
+ return bytesRead > 0 ? bytesRead : -1;
+ }
+ }
+
+ return bytesRead;
+ }
+
+ @Override
+ public int read(byte[] bytes) {
+ return read(bytes, 0, bytes.length);
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (current == null) {
+ throw new EOFException();
+ }
+
+ while (true) {
+ if (current.remaining() > 0) {
+ this.position += 1;
+ return current.get() & 0xFF; // as unsigned
+ } else if (!nextBuffer()) {
+ // there are no more buffers
+ throw new EOFException();
+ }
+ }
+ }
+
+ @Override
+ public void read(IPointable pointable, int length) throws EOFException {
+ if (current.remaining() >= length) {
+ pointable.set(current.array(), current.position(), length);
+ current.position(current.position() + length);
+ position += length;
+ } else {
+ tempPointableStorage.setSize(length);
+ //Read first half part from the current buffer
+ int bytesRead = read(tempPointableStorage.getByteArray(), 0, length);
+ if (bytesRead != length) {
+ throw new EOFException();
+ }
+ pointable.set(tempPointableStorage);
+ }
+ }
+
+ @Override
+ public int available() {
+ return length - position;
+ }
+
+ @Override
+ public void mark(int readLimit) {
+ throw new UnsupportedOperationException("reset() is not supported");
+ }
+
+ @Override
+ public void reset() {
+ buffers.clear();
+ this.current = EMPTY;
+ this.position = 0;
+ this.length = 0;
+ }
+
+ private List<ByteBuffer> sliceBuffers(long length) throws EOFException {
+ if (length <= 0) {
+ return Collections.emptyList();
+ }
+
+ if (current == null) {
+ throw new EOFException();
+ }
+
+ List<ByteBuffer> sliceBuffers = new ArrayList<>();
+ long bytesAccumulated = 0;
+ while (bytesAccumulated < length) {
+ if (current.remaining() > 0) {
+ // get a slice of the current buffer to return
+ // always fits in an int because remaining returns an int that is >= 0
+ int bufLen = (int) Math.min(length - bytesAccumulated, current.remaining());
+ ByteBuffer slice = current.duplicate();
+ slice.limit(slice.position() + bufLen);
+ sliceBuffers.add(slice);
+ bytesAccumulated += bufLen;
+
+ // update state; the bytes are considered read
+ current.position(current.position() + bufLen);
+ this.position += bufLen;
+ } else if (!nextBuffer()) {
+ // there are no more buffers
+ throw new EOFException();
+ }
+ }
+
+ return sliceBuffers;
+ }
+
+ private boolean nextBuffer() {
+ if (buffers.isEmpty()) {
+ return false;
+ }
+ current = buffers.poll();
+ return true;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractBytesOutputStream.java
new file mode 100644
index 0000000..698eac4
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractBytesOutputStream.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.column.bytes.stream.out.pointer.IReservedPointer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.values.ValuesWriter;
+
+/**
+ * Extends {@link OutputStream} to include methods needed by {@link ValuesWriter}
+ */
+public abstract class AbstractBytesOutputStream extends OutputStream {
+ private final ParquetBytesInput bytesInput;
+
+ protected AbstractBytesOutputStream() {
+ bytesInput = new ParquetBytesInput(this);
+ }
+
+ @Override
+ public abstract void write(int b) throws IOException;
+
+ @Override
+ public final void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public abstract void write(byte[] b, int off, int len) throws IOException;
+
+ public final void write(IValueReference value) throws IOException {
+ write(value.getByteArray(), value.getStartOffset(), value.getLength());
+ }
+
+ public final BytesInput asBytesInput() {
+ return bytesInput;
+ }
+
+ public abstract void finish();
+
+ /**
+ * Reset output stream
+ */
+ public abstract void reset() throws HyracksDataException;
+
+ /**
+ * Reserve a byte at the current position of the stream
+ *
+ * @param pointer pointer that references the current position
+ */
+ public abstract void reserveByte(IReservedPointer pointer) throws IOException;
+
+ /**
+ * Reserve an integer at the current position of the stream
+ *
+ * @param pointer pointer that references the current position
+ */
+ public abstract void reserveInteger(IReservedPointer pointer) throws IOException;
+
+ /**
+ * @return a reusable instance of {@link IReservedPointer}
+ */
+ public abstract IReservedPointer createPointer();
+
+ /**
+ * @return Size of written value
+ */
+ public abstract int size();
+
+ /**
+ * @return Allocated buffer size
+ */
+ public abstract int capacity();
+
+ /**
+ * Write the content to another output stream
+ *
+ * @param outputStream output stream to write to
+ */
+ public abstract void writeTo(OutputStream outputStream) throws IOException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
new file mode 100644
index 0000000..4b7c835
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.column.bytes.stream.out.pointer.ByteBufferReservedPointer;
+import org.apache.asterix.column.bytes.stream.out.pointer.IReservedPointer;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+abstract class AbstractMultiBufferBytesOutputStream extends AbstractBytesOutputStream {
+ protected final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
+ protected final List<ByteBuffer> buffers;
+ protected int currentBufferIndex;
+ protected int allocatedBytes;
+ protected int position;
+ protected ByteBuffer currentBuf;
+
+ AbstractMultiBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ this.multiPageOpRef = multiPageOpRef;
+ buffers = new ArrayList<>();
+ }
+
+ protected abstract ByteBuffer confiscateNewBuffer() throws HyracksDataException;
+
+ protected abstract void preReset() throws HyracksDataException;
+
+ @Override
+ public final void reset() throws HyracksDataException {
+ preReset();
+ position = 0;
+ currentBufferIndex = 0;
+ if (allocatedBytes == 0) {
+ allocateBuffer();
+ }
+ currentBufferIndex = 0;
+ currentBuf = buffers.get(0);
+ currentBuf.clear();
+ }
+
+ @Override
+ public final void write(int b) throws IOException {
+ ensureCapacity(1);
+ currentBuf.put((byte) b);
+ position++;
+ }
+
+ @Override
+ public final void write(byte[] b, int off, int len) throws IOException {
+ ensureCapacity(len);
+ int remaining = len;
+ int offset = off;
+ while (remaining > 0) {
+ setNextBufferIfNeeded();
+ int writeLength = Math.min(remaining, currentBuf.remaining());
+ currentBuf.put(b, offset, writeLength);
+ position += writeLength;
+ offset += writeLength;
+ remaining -= writeLength;
+ }
+ }
+
+ @Override
+ public void reserveByte(IReservedPointer pointer) throws IOException {
+ ensureCapacity(Byte.BYTES);
+ int offset = getCurrentBufferPosition();
+ currentBuf.put((byte) 0);
+ position += 1;
+ ((ByteBufferReservedPointer) pointer).setPointer(currentBuf, offset);
+ }
+
+ @Override
+ public final void reserveInteger(IReservedPointer pointer) throws HyracksDataException {
+ ensureCapacity(Integer.BYTES);
+ int offset = getCurrentBufferPosition();
+ currentBuf.putInt(0);
+ position += Integer.BYTES;
+ ((ByteBufferReservedPointer) pointer).setPointer(currentBuf, offset);
+ }
+
+ @Override
+ public final IReservedPointer createPointer() {
+ return new ByteBufferReservedPointer();
+ }
+
+ public final int getCurrentBufferPosition() {
+ return currentBuf.position();
+ }
+
+ @Override
+ public final int size() {
+ return position;
+ }
+
+ @Override
+ public final int capacity() {
+ return allocatedBytes;
+ }
+
+ @Override
+ public final void finish() {
+ currentBuf = null;
+ buffers.clear();
+ allocatedBytes = 0;
+ }
+
+ /* *************************************************
+ * Helper methods
+ * *************************************************
+ */
+
+ private void ensureCapacity(int length) throws HyracksDataException {
+ if (position + length > allocatedBytes) {
+ allocateMoreBuffers(length);
+ } else if (length > 0) {
+ setNextBufferIfNeeded();
+ }
+ }
+
+ private void allocateMoreBuffers(int length) throws HyracksDataException {
+ int neededSpace = length - currentBuf.remaining();
+ while (neededSpace > 0) {
+ neededSpace -= allocateBuffer();
+ }
+ setNextBufferIfNeeded();
+ }
+
+ private void setNextBufferIfNeeded() {
+ if (currentBuf.remaining() == 0) {
+ currentBuf = buffers.get(++currentBufferIndex);
+ currentBuf.clear();
+ }
+ }
+
+ private int allocateBuffer() throws HyracksDataException {
+ ByteBuffer buffer = confiscateNewBuffer();
+ buffers.add(buffer);
+ buffer.clear();
+ int size = buffer.capacity();
+ allocatedBytes += size;
+ return size;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ByteBufferOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ByteBufferOutputStream.java
new file mode 100644
index 0000000..8817ae6
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ByteBufferOutputStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public final class ByteBufferOutputStream extends OutputStream {
+ private ByteBuffer buffer;
+ private int startOffset;
+
+ public void reset(ByteBuffer buffer) {
+ this.buffer = buffer;
+ startOffset = buffer.position();
+ }
+
+ public int size() {
+ return buffer.position() - startOffset;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ buffer.put((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ buffer.put(b, off, len);
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/GrowableBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/GrowableBytesOutputStream.java
new file mode 100644
index 0000000..20daf7d
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/GrowableBytesOutputStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.column.bytes.stream.out.pointer.GrowableBytesPointer;
+import org.apache.asterix.column.bytes.stream.out.pointer.IReservedPointer;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public final class GrowableBytesOutputStream extends AbstractBytesOutputStream {
+ private final ArrayBackedValueStorage storage;
+
+ public GrowableBytesOutputStream() {
+ storage = new ArrayBackedValueStorage(128);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ storage.getDataOutput().write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ storage.getDataOutput().write(b, off, len);
+ }
+
+ @Override
+ public void finish() {
+ reset();
+ }
+
+ @Override
+ public void reset() {
+ storage.reset();
+ }
+
+ @Override
+ public void reserveByte(IReservedPointer pointer) throws IOException {
+ ((GrowableBytesPointer) pointer).setPointer(storage.getLength());
+ storage.getDataOutput().write(0);
+ }
+
+ @Override
+ public void reserveInteger(IReservedPointer pointer) throws IOException {
+ ((GrowableBytesPointer) pointer).setPointer(storage.getLength());
+ storage.getDataOutput().writeInt(0);
+ }
+
+ @Override
+ public IReservedPointer createPointer() {
+ return new GrowableBytesPointer(storage);
+ }
+
+ @Override
+ public int size() {
+ return storage.getLength();
+ }
+
+ @Override
+ public int capacity() {
+ return storage.getByteArray().length;
+ }
+
+ @Override
+ public void writeTo(OutputStream outputStream) throws IOException {
+ outputStream.write(storage.getByteArray(), storage.getStartOffset(), storage.getLength());
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentBufferBytesOutputStream.java
new file mode 100644
index 0000000..c910131
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentBufferBytesOutputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+public final class MultiPersistentBufferBytesOutputStream extends AbstractMultiBufferBytesOutputStream {
+ public MultiPersistentBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ super(multiPageOpRef);
+ }
+
+ @Override
+ protected ByteBuffer confiscateNewBuffer() throws HyracksDataException {
+ return multiPageOpRef.getValue().confiscatePersistent();
+ }
+
+ @Override
+ protected void preReset() throws HyracksDataException {
+ if (allocatedBytes > 0) {
+ //Persist all buffers before resetting the stream
+ multiPageOpRef.getValue().persist();
+ allocatedBytes = 0;
+ buffers.clear();
+ }
+ }
+
+ @Override
+ public void writeTo(OutputStream outputStream) {
+ throw new IllegalAccessError("Persistent stream cannot be written to other stream");
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
new file mode 100644
index 0000000..cf2808e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+public final class MultiTemporaryBufferBytesOutputStream extends AbstractMultiBufferBytesOutputStream {
+ public MultiTemporaryBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ super(multiPageOpRef);
+ }
+
+ @Override
+ protected void preReset() {
+ //NoOp
+ }
+
+ @Override
+ protected ByteBuffer confiscateNewBuffer() throws HyracksDataException {
+ return multiPageOpRef.getValue().confiscateTemporary();
+ }
+
+ @Override
+ public void writeTo(OutputStream outputStream) throws IOException {
+ int writtenSize = 0;
+ for (int i = 0; i < currentBufferIndex + 1; i++) {
+ ByteBuffer buffer = buffers.get(i);
+ outputStream.write(buffer.array(), 0, buffer.position());
+ writtenSize += buffer.position();
+ }
+ if (writtenSize != position) {
+ //Sanity check
+ throw new IllegalStateException("Size is different");
+ }
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ParquetBytesInput.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ParquetBytesInput.java
new file mode 100644
index 0000000..c5ad38e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ParquetBytesInput.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.column.bytes.encoder.ParquetDeltaBinaryPackingValuesWriterForLong;
+import org.apache.parquet.bytes.BytesInput;
+
+/**
+ * A wrapper for {@link BytesInput} which is used to concatenate multiple {@link AbstractBytesOutputStream}
+ *
+ * @see ParquetDeltaBinaryPackingValuesWriterForLong#getBytes() as an example
+ */
+class ParquetBytesInput extends BytesInput {
+ private final AbstractBytesOutputStream outputStream;
+
+ ParquetBytesInput(AbstractBytesOutputStream outputStream) {
+ this.outputStream = outputStream;
+ }
+
+ @Override
+ public final void writeAllTo(OutputStream outputStream) throws IOException {
+ this.outputStream.writeTo(outputStream);
+ }
+
+ @Override
+ public final long size() {
+ return outputStream.size();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/ByteBufferReservedPointer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/ByteBufferReservedPointer.java
new file mode 100644
index 0000000..8773a31
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/ByteBufferReservedPointer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out.pointer;
+
+import java.nio.ByteBuffer;
+
+public class ByteBufferReservedPointer implements IReservedPointer {
+ private ByteBuffer buffer;
+ private int offset;
+
+ public void setPointer(ByteBuffer buffer, int offset) {
+ this.buffer = buffer;
+ this.offset = offset;
+ }
+
+ @Override
+ public void setByte(byte value) {
+ buffer.put(offset, value);
+ }
+
+ @Override
+ public void setInteger(int value) {
+ buffer.putInt(offset, value);
+ }
+
+ @Override
+ public void reset() {
+ buffer = null;
+ }
+
+ @Override
+ public boolean isSet() {
+ return buffer != null;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/GrowableBytesPointer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/GrowableBytesPointer.java
new file mode 100644
index 0000000..0863c72
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/GrowableBytesPointer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out.pointer;
+
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class GrowableBytesPointer implements IReservedPointer {
+ private final ArrayBackedValueStorage storage;
+ private int offset;
+
+ public GrowableBytesPointer(ArrayBackedValueStorage storage) {
+ this.storage = storage;
+ }
+
+ public void setPointer(int offset) {
+ this.offset = offset;
+ }
+
+ @Override
+ public void setByte(byte value) {
+ storage.getByteArray()[offset] = value;
+ }
+
+ @Override
+ public void setInteger(int value) {
+ IntegerPointable.setInteger(storage.getByteArray(), offset, value);
+ }
+
+ @Override
+ public void reset() {
+ offset = -1;
+ }
+
+ @Override
+ public boolean isSet() {
+ return offset >= 0;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/IReservedPointer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/IReservedPointer.java
new file mode 100644
index 0000000..46c4d36
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/IReservedPointer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out.pointer;
+
+import org.apache.asterix.column.bytes.stream.out.AbstractBytesOutputStream;
+
+/**
+ * Pointer that reference a position in {@link AbstractBytesOutputStream}
+ */
+public interface IReservedPointer {
+ /**
+ * Set a byte value at the pointer's position
+ *
+ * @param value byte value to be set
+ */
+ void setByte(byte value);
+
+ /**
+ * Set an integer value at the pointer's position
+ *
+ * @param value integer value to be set
+ */
+ void setInteger(int value);
+
+ /**
+ * Reset the pointer
+ */
+ void reset();
+
+ /**
+ * @return whether the pointer is set or not
+ */
+ boolean isSet();
+}
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index bff631f..85ee76d 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -942,6 +942,7 @@
<module>asterix-license</module>
<module>asterix-geo</module>
<module>asterix-spidersilk</module>
+ <module>asterix-column</module>
</modules>
<dependencyManagement>
@@ -1384,6 +1385,11 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-btree-column</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-rtree</artifactId>
<version>${hyracks.version}</version>
</dependency>
@@ -1897,6 +1903,28 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-common</artifactId>
+ <version>${parquet.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-encoding</artifactId>
+ <version>${parquet.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.kitesdk</groupId>
<artifactId>kite-data-core</artifactId>
<version>1.1.0</version>
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java
index d5a4481..d4feff6 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java
@@ -65,8 +65,12 @@
}
public void append(IValueReference value) throws HyracksDataException {
+ append(value.getByteArray(), value.getStartOffset(), value.getLength());
+ }
+
+ public void append(byte[] bytes, int start, int length) throws HyracksDataException {
try {
- data.append(value);
+ data.append(bytes, start, length);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index 9e4a297..24682a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -87,5 +87,9 @@
<artifactId>hyracks-util</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
index 2ffa1bb..0c95500 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
@@ -75,4 +75,6 @@
* Calls {@link IBufferCache#unpin(ICachedPage)} for all columns' pages
*/
void unpinColumnsPages() throws HyracksDataException;
+
+ void close();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index d0b7e2b..fe980cc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -194,6 +194,7 @@
@Override
public void doClose() throws HyracksDataException {
+ frameTuple.close();
releasePages();
page0 = null;
pred = null;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index 5a3b111..d39f94e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -29,14 +29,19 @@
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public abstract class AbstractColumnTupleReference implements IColumnTupleIterator {
+ private static final Logger LOGGER = LogManager.getLogger();
private static final String UNSUPPORTED_OPERATION_MSG = "Operation is not supported for column tuples";
private final int componentIndex;
private final ColumnBTreeReadLeafFrame frame;
private final IColumnBufferProvider[] primaryKeyBufferProviders;
private final IColumnBufferProvider[] buffersProviders;
private final int numberOfPrimaryKeys;
+ private int totalNumberOfPages;
+ private int numOfSkippedPages;
protected int tupleIndex;
/**
@@ -68,6 +73,8 @@
buffersProviders[i] = new ColumnSingleBufferProvider(columnIndex);
}
}
+ totalNumberOfPages = 0;
+ numOfSkippedPages = 0;
}
@Override
@@ -96,7 +103,10 @@
provider.reset(frame);
startColumn(provider, tupleIndex, i, numberOfTuples);
}
+ } else {
+ numOfSkippedPages++;
}
+ totalNumberOfPages++;
}
protected abstract boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples);
@@ -137,6 +147,13 @@
}
}
+ @Override
+ public final void close() {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Skipped {} pages out of {} in total", numOfSkippedPages, totalNumberOfPages);
+ }
+ }
+
/* *************************************************************
* Unsupported Operations
* *************************************************************
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
index c0475b1..cde79cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
@@ -221,6 +221,10 @@
return VarLenIntEncoderDecoder.decode(b, s);
}
+ public static int getNumBytesToStoreLength(byte[] bytes, int start) {
+ return getNumBytesToStoreLength(getUTFLength(bytes, start));
+ }
+
public static int getNumBytesToStoreLength(int strlen) {
return VarLenIntEncoderDecoder.getBytesRequired(strlen);
}