[ASTERIXDB-3165][STO] Avoid encoding primary keys
- user model changes: no
- storage format changes: yes
- interface changes: no
Details:
This patch removes the encoding for PKs to
allow for more efficient point lookups --
especially for upserting with secondary indexes.
Storage changes:
PKs are no longer encoded.
Change-Id: Id361d0d41b54a7ea84ec9212506809f5e0befc84
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17486
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
index 9f1809d..5e2ef28 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
@@ -39,10 +39,8 @@
this.reader = reader;
}
- public final void reset(AbstractBytesInputStream in, int startIndex, int numberOfTuples)
- throws HyracksDataException {
+ public final void reset(AbstractBytesInputStream in, int numberOfTuples) throws HyracksDataException {
reader.reset(in, numberOfTuples);
- reader.skip(startIndex);
}
@Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java
deleted file mode 100644
index 196bec2..0000000
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.column.bytes.decoder;
-
-import java.io.IOException;
-
-import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
-import org.apache.parquet.bytes.LittleEndianDataInputStream;
-import org.apache.parquet.io.ParquetDecodingException;
-
-public class ParquetDoublePlainValuesReader extends AbstractParquetValuesReader {
- private LittleEndianDataInputStream in;
-
- @Override
- public void initFromPage(AbstractBytesInputStream stream) throws IOException {
- this.in = new LittleEndianDataInputStream(stream.remainingStream());
- }
-
- @Override
- public void skip() {
- try {
- in.skipBytes(8);
- } catch (IOException e) {
- throw new ParquetDecodingException("could not skip double", e);
- }
- }
-
- @Override
- public double readDouble() {
- try {
- return in.readDouble();
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read double", e);
- }
- }
-}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java
new file mode 100644
index 0000000..07713e1
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.bytes.stream.in.ValueInputStream;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Reads plain (non-encoded) values of a fixed byte length from a page through a {@link ValueInputStream}.
+ * <p>
+ * Two construction modes exist: one that only knows the value length (no storage is kept, so
+ * {@link #readBytes()} has nothing to read into) and one that is given an {@link IPointable} to read
+ * byte values into.
+ */
+public class ParquetPlainFixedLengthValuesReader extends AbstractParquetValuesReader {
+    private final ValueInputStream in;
+    private final int valueLength;
+    private final IPointable valueStorage;
+
+    /**
+     * Creates a reader without byte storage.
+     *
+     * @param valueLength number of bytes {@link #skip()} advances per value
+     */
+    public ParquetPlainFixedLengthValuesReader(int valueLength) {
+        in = new ValueInputStream();
+        this.valueLength = valueLength;
+        // No storage: readBytes() dereferences this field, so it must not be called on such an instance
+        valueStorage = null;
+    }
+
+    /**
+     * Creates a reader that reads byte values into the provided storage.
+     *
+     * @param valueStorage destination for {@link #readBytes()}; the value length is taken from the
+     *                     length of its backing byte array
+     */
+    public ParquetPlainFixedLengthValuesReader(IPointable valueStorage) {
+        in = new ValueInputStream();
+        // NOTE(review): uses the backing array length, not valueStorage.getLength() -- presumably the
+        // storage is a dedicated, exactly-sized array; confirm against callers
+        this.valueLength = valueStorage.getByteArray().length;
+        this.valueStorage = valueStorage;
+    }
+
+    /**
+     * Points this reader at the remaining bytes of the given page stream.
+     *
+     * @param stream the page stream
+     */
+    @Override
+    public void initFromPage(AbstractBytesInputStream stream) throws EOFException {
+        in.reset(stream.remainingStream());
+    }
+
+    /**
+     * Skips one value, i.e., {@code valueLength} bytes.
+     *
+     * @throws ParquetDecodingException if the underlying stream fails with an {@link IOException}
+     */
+    @Override
+    public void skip() {
+        try {
+            in.skipBytes(valueLength);
+        } catch (IOException e) {
+            // This reader is not double-specific; report what was actually being skipped
+            throw new ParquetDecodingException("could not skip " + valueLength + " bytes", e);
+        }
+    }
+
+    /**
+     * @return the next value read as a long
+     * @throws ParquetDecodingException if the underlying stream fails with an {@link IOException}
+     */
+    @Override
+    public long readLong() {
+        try {
+            return in.readLong();
+        } catch (IOException e) {
+            throw new ParquetDecodingException("could not read long", e);
+        }
+    }
+
+    /**
+     * @return the next value read as a double
+     * @throws ParquetDecodingException if the underlying stream fails with an {@link IOException}
+     */
+    @Override
+    public double readDouble() {
+        try {
+            return in.readDouble();
+        } catch (IOException e) {
+            throw new ParquetDecodingException("could not read double", e);
+        }
+    }
+
+    /**
+     * Reads {@code valueLength} bytes into the storage given at construction time.
+     *
+     * @return the storage passed to the constructor, now holding the value that was read
+     * @throws ParquetDecodingException if the underlying stream fails with an {@link IOException}
+     */
+    @Override
+    public IValueReference readBytes() {
+        try {
+            return in.readBytes(valueStorage, valueLength);
+        } catch (IOException e) {
+            throw new ParquetDecodingException("could not read bytes", e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
index 1b46116..4b09e4d 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
@@ -91,6 +91,8 @@
int start = value.getStartOffset();
int length = value.getLength();
if (skipLengthBytes) {
+ // Length bytes are skipped so that prefix encoding works properly. For example, "123" and "1234"
+ // share the common prefix "123", but their length bytes differ.
int lengthBytes = UTF8StringUtil.getNumBytesToStoreLength(bytes, start);
start += lengthBytes;
length -= lengthBytes;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
similarity index 65%
rename from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
rename to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
index 0298e59..2aba7d2 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
@@ -19,31 +19,46 @@
package org.apache.asterix.column.bytes.encoder;
import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
import org.apache.asterix.column.bytes.stream.out.AbstractBytesOutputStream;
import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.ValueOutputStream;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.LittleEndianDataOutputStream;
import org.apache.parquet.column.values.plain.PlainValuesWriter;
import org.apache.parquet.io.ParquetEncodingException;
/**
* Re-implementation of {@link PlainValuesWriter}
*/
-public class ParquetPlainValuesWriter extends AbstractParquetValuesWriter {
- public static final Charset CHARSET = StandardCharsets.UTF_8;
-
+public class ParquetPlainFixedLengthValuesWriter extends AbstractParquetValuesWriter {
private final AbstractBytesOutputStream outputStream;
- private final LittleEndianDataOutputStream out;
+ private final ValueOutputStream out;
- public ParquetPlainValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+ public ParquetPlainFixedLengthValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
outputStream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
- out = new LittleEndianDataOutputStream(outputStream);
+ out = new ValueOutputStream(outputStream);
+ }
+
+ /**
+ * Writes an int value to the output stream of this writer.
+ *
+ * @param v the value to write
+ * @throws ParquetEncodingException if the stream fails with an {@link IOException}
+ */
+ @Override
+ public void writeInteger(int v) {
+ try {
+ out.writeInt(v);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write int", e);
+ }
+ }
+
+ /**
+ * Writes a long value to the output stream of this writer.
+ *
+ * @param v the value to write
+ * @throws ParquetEncodingException if the stream fails with an {@link IOException}
+ */
+ @Override
+ public void writeLong(long v) {
+ try {
+ out.writeLong(v);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write long", e);
+ }
}
@Override
@@ -55,6 +70,21 @@
}
}
+ /**
+ * Writes the bytes referenced by the given value (from its start offset, for its length) as-is;
+ * this method adds no length prefix or other framing.
+ * Should only be used for UUID
+ *
+ * @param v the value to encode
+ * @param skipLengthBytes ignored
+ * @throws ParquetEncodingException if the stream fails with an {@link IOException}
+ */
+ @Override
+ public void writeBytes(IValueReference v, boolean skipLengthBytes) {
+ try {
+ out.write(v.getByteArray(), v.getStartOffset(), v.getLength());
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write bytes", e);
+ }
+ }
+
@Override
public BytesInput getBytes() {
try {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
new file mode 100644
index 0000000..63697bc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.out.AbstractBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.GrowableBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.ValueOutputStream;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Plain writer for variable-length values. Two streams are maintained: one holding, for each value,
+ * the size of the value stream at the moment the value was appended (its start offset), and one
+ * holding the values themselves. {@link #getBytes()} emits the offsets followed by the values.
+ */
+public class ParquetPlainVariableLengthValuesWriter extends AbstractParquetValuesWriter {
+    private final GrowableBytesOutputStream offsets;
+    private final AbstractBytesOutputStream values;
+    private final ValueOutputStream offsetWriter;
+
+    public ParquetPlainVariableLengthValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        values = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
+        offsets = new GrowableBytesOutputStream();
+        offsetWriter = new ValueOutputStream(offsets);
+    }
+
+    /**
+     * Records the current size of the value stream as the value's offset, then appends the value.
+     *
+     * @param v               the value to append
+     * @param skipLengthBytes not consulted by this writer
+     */
+    @Override
+    public void writeBytes(IValueReference v, boolean skipLengthBytes) {
+        try {
+            int valueOffset = values.size();
+            offsetWriter.writeInt(valueOffset);
+            values.write(v);
+        } catch (IOException ioe) {
+            throw new ParquetEncodingException("could not write bytes", ioe);
+        }
+    }
+
+    /**
+     * Flushes both streams and returns their content as a single input: offsets first, then values.
+     */
+    @Override
+    public BytesInput getBytes() {
+        try {
+            offsets.flush();
+            values.flush();
+        } catch (IOException ioe) {
+            throw new ParquetEncodingException("could not write page", ioe);
+        }
+        BytesInput offsetBytes = offsets.asBytesInput();
+        BytesInput valueBytes = values.asBytesInput();
+        return BytesInput.concat(offsetBytes, valueBytes);
+    }
+
+    /** Resets the offsets stream and then the values stream. */
+    @Override
+    public void reset() throws HyracksDataException {
+        offsets.reset();
+        values.reset();
+    }
+
+    /** Finishes the offsets stream and then the values stream. */
+    @Override
+    public void close() {
+        offsets.finish();
+        values.finish();
+    }
+
+    /**
+     * @return the combined current size of the offsets and values streams
+     */
+    @Override
+    public int getEstimatedSize() {
+        int offsetsSize = offsets.size();
+        int valuesSize = values.size();
+        return offsetsSize + valuesSize;
+    }
+
+    /**
+     * @return the capacity of the offsets stream plus the size of the values stream
+     */
+    @Override
+    public int getAllocatedSize() {
+        // NOTE(review): the value side contributes size(), not a capacity -- confirm this is intended
+        int offsetsCapacity = offsets.capacity();
+        int valuesSize = values.size();
+        return offsetsCapacity + valuesSize;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
index b50143b..034df66 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
@@ -61,6 +61,10 @@
@Override
public abstract int available();
+ /**
+ * Default implementation: getting a buffer is not supported, so this always throws.
+ * Subclasses that can hand out a {@link ByteBuffer} override this method.
+ *
+ * @return never returns normally in this default implementation
+ * @throws UnsupportedOperationException always
+ */
+ public ByteBuffer getBuffer() {
+ throw new UnsupportedOperationException("Getting buffer is not supported");
+ }
+
public final void skipFully(long n) throws IOException {
long skipped = skip(n);
if (skipped < n) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
index 833765c..9c3dc08 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
@@ -166,4 +166,9 @@
public int available() {
return buffer.remaining();
}
+
+ /**
+ * @return this stream's {@code buffer} field itself (no copy or duplicate is made)
+ */
+ @Override
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java
new file mode 100644
index 0000000..ee975f1
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.in;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.parquet.bytes.LittleEndianDataInputStream;
+
+/**
+ * Re-implementation of {@link LittleEndianDataInputStream}
+ */
+/**
+ * Re-implementation of {@link LittleEndianDataInputStream}
+ * <p>
+ * Wraps an input stream (set through {@link #reset(AbstractBytesInputStream)}) and decodes primitive
+ * values using the Hyracks pointable helpers; the byte order is whatever those helpers define.
+ * The instance must be {@link #reset(AbstractBytesInputStream) reset} before any read is attempted.
+ */
+public final class ValueInputStream extends InputStream {
+    // Scratch space large enough for the widest primitive read here (long/double)
+    private final byte[] readBuffer;
+    private InputStream in;
+
+    public ValueInputStream() {
+        readBuffer = new byte[8];
+    }
+
+    /**
+     * Sets the stream subsequent reads are served from.
+     *
+     * @param in the stream to read from
+     */
+    public void reset(AbstractBytesInputStream in) {
+        this.in = in;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return in.read();
+    }
+
+    /**
+     * Forwards bulk reads to the wrapped stream. Without this override, {@link InputStream}'s default
+     * implementation would serve the request one byte at a time through {@link #read()}.
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        return in.read(b, off, len);
+    }
+
+    /**
+     * @return the next {@link Integer#BYTES} bytes decoded by {@link IntegerPointable#getInteger}
+     * @throws EOFException if the stream ends before the value is complete
+     */
+    public int readInt() throws IOException {
+        readFully(readBuffer, Integer.BYTES);
+        return IntegerPointable.getInteger(readBuffer, 0);
+    }
+
+    /**
+     * @return the next {@link Long#BYTES} bytes decoded by {@link LongPointable#getLong}
+     * @throws EOFException if the stream ends before the value is complete
+     */
+    public long readLong() throws IOException {
+        readFully(readBuffer, Long.BYTES);
+        return LongPointable.getLong(readBuffer, 0);
+    }
+
+    /**
+     * @return the next {@link Double#BYTES} bytes decoded by {@link DoublePointable#getDouble}
+     * @throws EOFException if the stream ends before the value is complete
+     */
+    public double readDouble() throws IOException {
+        readFully(readBuffer, Double.BYTES);
+        return DoublePointable.getDouble(readBuffer, 0);
+    }
+
+    /**
+     * Reads {@code length} bytes into the backing array of {@code valueStorage}, starting at index 0
+     * of that array.
+     *
+     * @param valueStorage destination of the bytes
+     * @param length       number of bytes to read
+     * @return {@code valueStorage}
+     * @throws EOFException if the stream ends before {@code length} bytes were read
+     */
+    public IValueReference readBytes(IPointable valueStorage, int length) throws IOException {
+        // NOTE(review): writes at array index 0 and ignores valueStorage.getStartOffset() --
+        // presumably the storage always starts at offset 0; confirm against callers
+        readFully(valueStorage.getByteArray(), length);
+        return valueStorage;
+    }
+
+    /**
+     * Skips up to {@code n} bytes. The loop stops as soon as the wrapped stream reports that nothing
+     * was skipped, so fewer than {@code n} bytes may be skipped without any signal to the caller.
+     *
+     * @param n number of bytes to skip
+     */
+    public void skipBytes(int n) throws IOException {
+        int total = 0;
+        int cur;
+
+        while ((total < n) && ((cur = (int) in.skip(n - total)) > 0)) {
+            total += cur;
+        }
+    }
+
+    /**
+     * Fills {@code bytes[0..len)} from the wrapped stream, looping over partial reads.
+     *
+     * @throws IndexOutOfBoundsException if {@code len} is negative
+     * @throws EOFException              if the stream ends before {@code len} bytes were read
+     */
+    private void readFully(byte[] bytes, int len) throws IOException {
+        if (len < 0) {
+            throw new IndexOutOfBoundsException();
+        }
+        int n = 0;
+        while (n < len) {
+            int count = in.read(bytes, n, len - n);
+            if (count < 0) {
+                throw new EOFException();
+            }
+            n += count;
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java
new file mode 100644
index 0000000..a106a00
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+
+/**
+ * Output stream wrapper that encodes primitive values with the Hyracks pointable helpers (the byte
+ * order is whatever those helpers define) and forwards the resulting bytes to the wrapped stream.
+ * <p>
+ * NOTE(review): {@code flush()} and {@code close()} are not overridden, so they are not forwarded
+ * to the wrapped stream -- confirm that callers flush/close the wrapped stream themselves.
+ */
+public final class ValueOutputStream extends OutputStream {
+    private final OutputStream out;
+    // Scratch space large enough for the widest primitive written here (long/double)
+    private final byte[] writeBuffer;
+
+    /**
+     * @param out the stream all bytes are forwarded to
+     */
+    public ValueOutputStream(OutputStream out) {
+        this.out = out;
+        writeBuffer = new byte[8];
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    /**
+     * Forwards bulk writes to the wrapped stream in a single call. Without this override,
+     * {@link OutputStream}'s default implementation would emit the range one byte at a time through
+     * {@link #write(int)}.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+    }
+
+    /**
+     * Encodes {@code value} with {@link IntegerPointable#setInteger} and writes {@link Integer#BYTES} bytes.
+     */
+    public void writeInt(int value) throws IOException {
+        IntegerPointable.setInteger(writeBuffer, 0, value);
+        out.write(writeBuffer, 0, Integer.BYTES);
+    }
+
+    /**
+     * Encodes {@code value} with {@link LongPointable#setLong} and writes {@link Long#BYTES} bytes.
+     */
+    public void writeLong(long value) throws IOException {
+        LongPointable.setLong(writeBuffer, 0, value);
+        out.write(writeBuffer, 0, Long.BYTES);
+    }
+
+    /**
+     * Encodes {@code value} with {@link DoublePointable#setDouble} and writes {@link Double#BYTES} bytes.
+     */
+    public void writeDouble(double value) throws IOException {
+        DoublePointable.setDouble(writeBuffer, 0, value);
+        out.write(writeBuffer, 0, Double.BYTES);
+    }
+
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java
index 2e60ee7..71d3ac6 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java
@@ -51,8 +51,8 @@
tupleIndex = 0;
}
- public void resetColumn(AbstractBytesInputStream stream, int startIndex, int ordinal) throws HyracksDataException {
- assemblers.get(ordinal).reset(stream, startIndex, numberOfTuples);
+ public void resetColumn(AbstractBytesInputStream stream, int ordinal) throws HyracksDataException {
+ assemblers.get(ordinal).reset(stream, numberOfTuples);
}
public int getColumnIndex(int ordinal) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
index f2407d3..b9babf1 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
@@ -31,8 +31,8 @@
import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
import org.apache.asterix.column.metadata.schema.visitor.SchemaClipperVisitor;
-import org.apache.asterix.column.values.IColumnValuesReader;
import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluatorFactory;
@@ -57,14 +57,14 @@
public class QueryColumnMetadata extends AbstractColumnImmutableReadMetadata {
private static final Logger LOGGER = LogManager.getLogger();
private final FieldNamesDictionary fieldNamesDictionary;
- private final IColumnValuesReader[] primaryKeyReaders;
+ private final PrimitiveColumnValuesReader[] primaryKeyReaders;
private final IColumnFilterEvaluator filterEvaluator;
private final List<IColumnFilterValueAccessor> filterValueAccessors;
protected final ColumnAssembler assembler;
protected QueryColumnMetadata(ARecordType datasetType, ARecordType metaType,
- IColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
+ PrimitiveColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, IColumnValuesReaderFactory readerFactory,
IValueGetterFactory valueGetterFactory, IColumnFilterEvaluator filterEvaluator,
List<IColumnFilterValueAccessor> filterValueAccessors) throws HyracksDataException {
@@ -84,7 +84,7 @@
return fieldNamesDictionary;
}
- public final IColumnValuesReader[] getPrimaryKeyReaders() {
+ public final PrimitiveColumnValuesReader[] getPrimaryKeyReaders() {
return primaryKeyReaders;
}
@@ -169,7 +169,8 @@
IColumnFilterEvaluator filterEvaluator = filterEvaluatorFactory.create(filterAccessorProvider);
List<IColumnFilterValueAccessor> filterValueAccessors = filterAccessorProvider.getFilterAccessors();
- IColumnValuesReader[] primaryKeyReaders = createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
+ PrimitiveColumnValuesReader[] primaryKeyReaders =
+ createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
if (LOGGER.isInfoEnabled() && filterEvaluator != TrueColumnFilterEvaluator.INSTANCE) {
String filterString = filterEvaluator == FalseColumnFilterEvaluator.INSTANCE ? "SKIP_ALL"
@@ -192,14 +193,14 @@
return clippedRoot;
}
- protected static IColumnValuesReader[] createPrimaryKeyReaders(DataInput input,
+ protected static PrimitiveColumnValuesReader[] createPrimaryKeyReaders(DataInput input,
IColumnValuesReaderFactory readerFactory, int numberOfPrimaryKeys) throws IOException {
//skip number of columns
input.readInt();
- IColumnValuesReader[] primaryKeyReaders = new IColumnValuesReader[numberOfPrimaryKeys];
+ PrimitiveColumnValuesReader[] primaryKeyReaders = new PrimitiveColumnValuesReader[numberOfPrimaryKeys];
for (int i = 0; i < numberOfPrimaryKeys; i++) {
- primaryKeyReaders[i] = readerFactory.createValueReader(input);
+ primaryKeyReaders[i] = (PrimitiveColumnValuesReader) readerFactory.createValueReader(input);
}
return primaryKeyReaders;
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
index 6f0c974..2de1948 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
@@ -30,8 +30,8 @@
import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
import org.apache.asterix.column.metadata.schema.visitor.SchemaClipperVisitor;
-import org.apache.asterix.column.values.IColumnValuesReader;
import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluatorFactory;
@@ -51,7 +51,7 @@
private final ColumnAssembler metaAssembler;
private QueryColumnWithMetaMetadata(ARecordType datasetType, ARecordType metaType,
- IColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
+ PrimitiveColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
IColumnValuesReaderFactory readerFactory, IValueGetterFactory valueGetterFactory,
IColumnFilterEvaluator filterEvaluator, List<IColumnFilterValueAccessor> filterValueAccessors)
@@ -141,7 +141,8 @@
IColumnFilterEvaluator filterEvaluator = filterEvaluatorFactory.create(filterAccessorProvider);
List<IColumnFilterValueAccessor> filterValueAccessors = filterAccessorProvider.getFilterAccessors();
- IColumnValuesReader[] primaryKeyReaders = createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
+ PrimitiveColumnValuesReader[] primaryKeyReaders =
+ createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
return new QueryColumnWithMetaMetadata(datasetType, metaType, primaryKeyReaders, serializedMetadata,
fieldNamesDictionary, clippedRoot, metaClippedRoot, readerFactory, valueGetterFactory, filterEvaluator,
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
index df6b554..01e9742 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
@@ -18,25 +18,33 @@
*/
package org.apache.asterix.column.tuple;
+import static org.apache.hyracks.storage.am.common.frames.AbstractSlotManager.ERROR_INDICATOR;
+import static org.apache.hyracks.storage.am.common.frames.AbstractSlotManager.GREATEST_KEY_INDICATOR;
+
import org.apache.asterix.column.assembler.value.IValueGetter;
import org.apache.asterix.column.assembler.value.ValueGetterFactory;
import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
import org.apache.asterix.column.bytes.stream.in.ByteBufferInputStream;
import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.AbstractColumnTupleReference;
+import org.apache.hyracks.storage.common.MultiComparator;
public abstract class AbstractAsterixColumnTupleReference extends AbstractColumnTupleReference {
private final IValueGetter[] primaryKeysValueGetters;
protected final ByteBufferInputStream[] primaryKeyStreams;
- protected final IColumnValuesReader[] primaryKeyReaders;
+ protected final PrimitiveColumnValuesReader[] primaryKeyReaders;
protected final VoidPointable[] primaryKeys;
protected final AbstractBytesInputStream[] columnStreams;
@@ -67,16 +75,24 @@
}
}
- protected abstract IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info);
+ protected abstract PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info);
@Override
- protected final void startPrimaryKey(IColumnBufferProvider provider, int startIndex, int ordinal,
- int numberOfTuples) throws HyracksDataException {
+ protected void setPrimaryKeysAt(int index, int skipCount) throws HyracksDataException {
+ for (int i = 0; i < primaryKeyReaders.length; i++) {
+ PrimitiveColumnValuesReader reader = primaryKeyReaders[i];
+ reader.reset(index, skipCount);
+ primaryKeys[i].set(primaryKeysValueGetters[i].getValue(reader));
+ }
+ }
+
+ @Override
+ protected final void startPrimaryKey(IColumnBufferProvider provider, int ordinal, int numberOfTuples)
+ throws HyracksDataException {
ByteBufferInputStream primaryKeyStream = primaryKeyStreams[ordinal];
primaryKeyStream.reset(provider);
IColumnValuesReader reader = primaryKeyReaders[ordinal];
reader.reset(primaryKeyStream, numberOfTuples);
- reader.skip(startIndex);
}
@Override
@@ -137,4 +153,79 @@
}
return compare;
}
+
+ @Override
+ public int findTupleIndex(ITupleReference searchKey, MultiComparator comparator, FindTupleMode mode,
+ FindTupleNoExactMatchPolicy matchPolicy) throws HyracksDataException {
+ int tupleCount = getTupleCount();
+ if (tupleCount <= 0) {
+ return GREATEST_KEY_INDICATOR;
+ }
+
+ int mid;
+ int begin = tupleIndex;
+ int end = tupleCount - 1;
+
+ while (begin <= end) {
+ mid = (begin + end) / 2;
+
+ setKeyAt(mid);
+ int cmp = comparator.compare(searchKey, this);
+ if (cmp < 0) {
+ end = mid - 1;
+ } else if (cmp > 0) {
+ begin = mid + 1;
+ } else {
+ if (mode == FindTupleMode.EXCLUSIVE) {
+ if (matchPolicy == FindTupleNoExactMatchPolicy.HIGHER_KEY) {
+ begin = mid + 1;
+ } else {
+ end = mid - 1;
+ }
+ } else {
+ if (mode == FindTupleMode.EXCLUSIVE_ERROR_IF_EXISTS) {
+ return ERROR_INDICATOR;
+ } else {
+ return mid;
+ }
+ }
+ }
+ }
+
+ if (mode == FindTupleMode.EXACT) {
+ return ERROR_INDICATOR;
+ }
+
+ if (matchPolicy == FindTupleNoExactMatchPolicy.HIGHER_KEY) {
+ if (begin > tupleCount - 1) {
+ return GREATEST_KEY_INDICATOR;
+ }
+
+ setKeyAt(begin);
+ if (comparator.compare(searchKey, this) < 0) {
+ return begin;
+ } else {
+ return GREATEST_KEY_INDICATOR;
+ }
+ } else {
+ if (end < 0) {
+ return GREATEST_KEY_INDICATOR;
+ }
+
+ setKeyAt(end);
+ if (comparator.compare(searchKey, this) > 0) {
+ return end;
+ } else {
+ return GREATEST_KEY_INDICATOR;
+ }
+ }
+ }
+
+ protected void setKeyAt(int index) {
+ for (int i = 0; i < primaryKeyReaders.length; i++) {
+ PrimitiveColumnValuesReader reader = primaryKeyReaders[i];
+ reader.getValue(index);
+ primaryKeys[i].set(primaryKeysValueGetters[i].getValue(reader));
+ }
+ }
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
index c10d415..56b0a57 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
@@ -24,6 +24,7 @@
import org.apache.asterix.column.operation.lsm.merge.IEndOfPageCallBack;
import org.apache.asterix.column.operation.lsm.merge.MergeColumnReadMetadata;
import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
@@ -43,11 +44,14 @@
}
@Override
- protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+ protected PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
MergeColumnReadMetadata columnMetadata = (MergeColumnReadMetadata) info;
int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
- IColumnValuesReader[] primaryKeyReaders = new IColumnValuesReader[numberOfPrimaryKeys];
- System.arraycopy(columnMetadata.getColumnReaders(), 0, primaryKeyReaders, 0, numberOfPrimaryKeys);
+ PrimitiveColumnValuesReader[] primaryKeyReaders = new PrimitiveColumnValuesReader[numberOfPrimaryKeys];
+ IColumnValuesReader[] readers = columnMetadata.getColumnReaders();
+ for (int i = 0; i < numberOfPrimaryKeys; i++) {
+ primaryKeyReaders[i] = (PrimitiveColumnValuesReader) readers[i];
+ }
return primaryKeyReaders;
}
@@ -55,14 +59,15 @@
protected boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples) {
//Skip filters
pageZero.position(pageZero.position() + numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE);
- skipCount = 0;
+        // skip count always starts from zero as no "search" is conducted during a merge
+ this.skipCount = 0;
return true;
}
@Override
- protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+ protected void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
throws HyracksDataException {
- int numberOfPrimaryKeys = primaryKeys.length;
+ int numberOfPrimaryKeys = primaryKeyStreams.length;
if (ordinal < numberOfPrimaryKeys) {
//Skip primary key
return;
@@ -71,7 +76,6 @@
columnStream.reset(buffersProvider);
IColumnValuesReader reader = columnReaders[ordinal];
reader.reset(columnStream, numberOfTuples);
- reader.skip(startIndex);
}
@Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
index e286235..b70bddc 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
@@ -24,7 +24,7 @@
import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
import org.apache.asterix.column.operation.query.ColumnAssembler;
import org.apache.asterix.column.operation.query.QueryColumnMetadata;
-import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
import org.apache.asterix.column.values.reader.filter.IColumnFilterValueAccessor;
@@ -50,7 +50,7 @@
}
@Override
- protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+ protected PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
return ((QueryColumnMetadata) info).getPrimaryKeyReaders();
}
@@ -69,11 +69,11 @@
}
@Override
- protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+ protected void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
throws HyracksDataException {
AbstractBytesInputStream columnStream = columnStreams[ordinal];
columnStream.reset(buffersProvider);
- assembler.resetColumn(columnStream, startIndex, ordinal);
+ assembler.resetColumn(columnStream, ordinal);
}
@Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
index a5cedc1..5b7794e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
@@ -25,7 +25,7 @@
import org.apache.asterix.column.operation.query.ColumnAssembler;
import org.apache.asterix.column.operation.query.QueryColumnMetadata;
import org.apache.asterix.column.operation.query.QueryColumnWithMetaMetadata;
-import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
import org.apache.asterix.column.values.reader.filter.IColumnFilterValueAccessor;
@@ -53,7 +53,7 @@
}
@Override
- protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+ protected PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
return ((QueryColumnMetadata) info).getPrimaryKeyReaders();
}
@@ -73,15 +73,15 @@
}
@Override
- protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+ protected void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
throws HyracksDataException {
AbstractBytesInputStream columnStream = columnStreams[ordinal];
columnStream.reset(buffersProvider);
int metaColumnCount = metaAssembler.getNumberOfColumns();
if (ordinal >= metaColumnCount) {
- assembler.resetColumn(columnStream, startIndex, ordinal - metaColumnCount);
+ assembler.resetColumn(columnStream, ordinal - metaColumnCount);
} else {
- metaAssembler.resetColumn(columnStream, startIndex, ordinal);
+ metaAssembler.resetColumn(columnStream, ordinal);
}
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java
new file mode 100644
index 0000000..4ee2780
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Accessor for key columns
+ */
+public interface IColumnKeyValueReader {
+ /**
+ * Reset the reader at the given index
+ *
+ * @param startIndex start index
+ */
+ void reset(int startIndex, int skipCount) throws HyracksDataException;
+
+ /**
+ * Returns the value of the key at the given index
+ *
+ * @param index tuple index
+ * @return the key value
+ */
+ IValueReference getValue(int index);
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
index 0f4cc0c..fe23b23 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
@@ -104,7 +104,7 @@
IValueReference getBytes();
/* ***********************
- * Write function
+ * Write functions
* ***********************
*/
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
index c0cf18a..41dfea2 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
@@ -59,6 +59,8 @@
if (allMissing) {
return;
}
+
+ valueIndex++;
try {
int actualLevel = definitionLevels.readInt();
//Check whether the level is for a null value
@@ -90,7 +92,7 @@
valuesStream.resetAt(defLevelsSize, in);
int valueLength = BytesUtils.readZigZagVarInt(valuesStream);
if (valueLength > 0) {
- valueReader.resetValue(valuesStream);
+ valueReader.init(valuesStream, tupleCount);
}
} catch (IOException e) {
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
index b233482..bf80580 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
@@ -30,17 +30,21 @@
import org.apache.asterix.column.values.reader.value.NoOpValueReader;
import org.apache.asterix.column.values.reader.value.StringValueReader;
import org.apache.asterix.column.values.reader.value.UUIDValueReader;
+import org.apache.asterix.column.values.reader.value.key.DoubleKeyValueReader;
+import org.apache.asterix.column.values.reader.value.key.LongKeyValueReader;
+import org.apache.asterix.column.values.reader.value.key.StringKeyValueReader;
+import org.apache.asterix.column.values.reader.value.key.UUIDKeyValueReader;
import org.apache.asterix.om.types.ATypeTag;
public class ColumnValueReaderFactory implements IColumnValuesReaderFactory {
@Override
public IColumnValuesReader createValueReader(ATypeTag typeTag, int columnIndex, int maxLevel, boolean primaryKey) {
- return new PrimitiveColumnValuesReader(createReader(typeTag), columnIndex, maxLevel, primaryKey);
+ return new PrimitiveColumnValuesReader(createReader(typeTag, primaryKey), columnIndex, maxLevel, primaryKey);
}
@Override
public IColumnValuesReader createValueReader(ATypeTag typeTag, int columnIndex, int maxLevel, int[] delimiters) {
- return new RepeatedPrimitiveColumnValuesReader(createReader(typeTag), columnIndex, maxLevel, delimiters);
+ return new RepeatedPrimitiveColumnValuesReader(createReader(typeTag, false), columnIndex, maxLevel, delimiters);
}
@Override
@@ -60,7 +64,7 @@
return createValueReader(typeTag, columnIndex, maxLevel, primaryKey);
}
- private AbstractValueReader createReader(ATypeTag typeTag) {
+ private AbstractValueReader createReader(ATypeTag typeTag, boolean primaryKey) {
switch (typeTag) {
case MISSING:
case NULL:
@@ -68,13 +72,13 @@
case BOOLEAN:
return new BooleanValueReader();
case BIGINT:
- return new LongValueReader();
+ return primaryKey ? new LongKeyValueReader() : new LongValueReader();
case DOUBLE:
- return new DoubleValueReader();
+ return primaryKey ? new DoubleKeyValueReader() : new DoubleValueReader();
case STRING:
- return new StringValueReader();
+ return primaryKey ? new StringKeyValueReader() : new StringValueReader();
case UUID:
- return new UUIDValueReader();
+ return primaryKey ? new UUIDKeyValueReader() : new UUIDValueReader();
default:
throw new UnsupportedOperationException(typeTag + " is not supported");
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
index e8c7bc5..7ae2954 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
@@ -20,14 +20,16 @@
import java.io.IOException;
+import org.apache.asterix.column.values.IColumnKeyValueReader;
import org.apache.asterix.column.values.IColumnValuesWriter;
import org.apache.asterix.column.values.reader.value.AbstractValueReader;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
/**
* Reader for a non-repeated primitive value
*/
-public final class PrimitiveColumnValuesReader extends AbstractColumnValuesReader {
+public final class PrimitiveColumnValuesReader extends AbstractColumnValuesReader implements IColumnKeyValueReader {
/**
* A primary key value is always present. Anti-matter can be determined by checking whether the definition level
* indicates that the tuple's values are missing (i.e., by calling {@link #isMissing()}).
@@ -49,7 +51,6 @@
if (valueIndex == valueCount) {
return false;
}
- valueIndex++;
try {
nextLevel();
@@ -92,4 +93,18 @@
}
}
}
+
+ @Override
+ public IValueReference getValue(int index) {
+ return ((IColumnKeyValueReader) valueReader).getValue(index);
+ }
+
+ @Override
+ public void reset(int startIndex, int skipCount) throws HyracksDataException {
+ ((IColumnKeyValueReader) valueReader).reset(startIndex, skipCount);
+ nextLevel();
+ for (int i = 1; i < skipCount; i++) {
+ nextLevel();
+ }
+ }
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
index 673aa98..1cd424b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
@@ -66,7 +66,6 @@
if (level == maxLevel) {
valueReader.nextValue();
}
- valueIndex++;
return true;
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
index 3d4c744..4db082f 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
@@ -27,7 +27,7 @@
public abstract class AbstractValueReader implements Comparable<AbstractValueReader> {
- public abstract void resetValue(AbstractBytesInputStream in) throws IOException;
+ public abstract void init(AbstractBytesInputStream in, int tupleCount) throws IOException;
public abstract void nextValue() throws HyracksDataException;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java
index 3417773..6b5e0d4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java
@@ -34,7 +34,7 @@
}
@Override
- public void resetValue(AbstractBytesInputStream in) {
+ public void init(AbstractBytesInputStream in, int tupleCount) {
booleanReader.reset(in);
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
index 24155f2..faa60d1 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
@@ -20,20 +20,20 @@
import java.io.IOException;
-import org.apache.asterix.column.bytes.decoder.ParquetDoublePlainValuesReader;
+import org.apache.asterix.column.bytes.decoder.ParquetPlainFixedLengthValuesReader;
import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
import org.apache.asterix.om.types.ATypeTag;
public final class DoubleValueReader extends AbstractValueReader {
- private final ParquetDoublePlainValuesReader doubleReader;
+ private final ParquetPlainFixedLengthValuesReader doubleReader;
private double nextValue;
public DoubleValueReader() {
- doubleReader = new ParquetDoublePlainValuesReader();
+ doubleReader = new ParquetPlainFixedLengthValuesReader(Double.BYTES);
}
@Override
- public void resetValue(AbstractBytesInputStream in) throws IOException {
+ public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
doubleReader.initFromPage(in);
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
index 09413d9..c22687e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
@@ -33,7 +33,7 @@
}
@Override
- public void resetValue(AbstractBytesInputStream in) throws IOException {
+ public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
longReader.initFromPage(in);
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java
index fd56ff2..1982c54 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java
@@ -31,7 +31,7 @@
}
@Override
- public void resetValue(AbstractBytesInputStream in) throws IOException {
+ public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
throw new UnsupportedOperationException(getClass().getName());
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java
index 8fd8874..19da3dd 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java
@@ -35,7 +35,7 @@
}
@Override
- public void resetValue(AbstractBytesInputStream in) throws IOException {
+ public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
stringReader.initFromPage(in);
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
index 4f240e9..7517960 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
@@ -20,22 +20,24 @@
import java.io.IOException;
-import org.apache.asterix.column.bytes.decoder.ParquetDeltaByteArrayReader;
+import org.apache.asterix.column.bytes.decoder.ParquetPlainFixedLengthValuesReader;
import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
import org.apache.asterix.dataflow.data.nontagged.comparators.AUUIDPartialBinaryComparatorFactory;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
public final class UUIDValueReader extends AbstractValueReader {
- private final ParquetDeltaByteArrayReader uuidReader;
+ private final ParquetPlainFixedLengthValuesReader uuidReader;
private IValueReference nextValue;
public UUIDValueReader() {
- uuidReader = new ParquetDeltaByteArrayReader(false);
+ ArrayBackedValueStorage storage = new ArrayBackedValueStorage(16);
+ uuidReader = new ParquetPlainFixedLengthValuesReader(storage);
}
@Override
- public void resetValue(AbstractBytesInputStream in) throws IOException {
+ public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
uuidReader.initFromPage(in);
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java
new file mode 100644
index 0000000..fc458f3
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.values.IColumnKeyValueReader;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+abstract class AbstractFixedLengthColumnKeyValueReader extends AbstractValueReader implements IColumnKeyValueReader {
+ protected final IPointable value;
+ private ByteBuffer buffer;
+ private int startOffset;
+
+ AbstractFixedLengthColumnKeyValueReader() {
+ value = new VoidPointable();
+ }
+
+ @Override
+ public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
+ buffer = in.getBuffer();
+ startOffset = buffer.position();
+ value.set(null, 0, 0);
+ }
+
+ @Override
+ public void reset(int startIndex, int skipCount) {
+ getValue(startIndex);
+ }
+
+ @Override
+ public IValueReference getValue(int index) {
+ int valueLength = getValueLength();
+ int offset = startOffset + index * valueLength;
+ value.set(buffer.array(), offset, valueLength);
+ return value;
+ }
+
+ @Override
+ public void nextValue() {
+ if (value.getByteArray() == null) {
+ getValue(0);
+ return;
+ }
+ int valueLength = getValueLength();
+ int offset = value.getStartOffset() + valueLength;
+ value.set(buffer.array(), offset, valueLength);
+ }
+
+ protected abstract int getValueLength();
+
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/DoubleKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/DoubleKeyValueReader.java
new file mode 100644
index 0000000..cfb8af4
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/DoubleKeyValueReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+
+public final class DoubleKeyValueReader extends AbstractFixedLengthColumnKeyValueReader {
+ @Override
+ public ATypeTag getTypeTag() {
+ return ATypeTag.DOUBLE;
+ }
+
+ @Override
+ protected int getValueLength() {
+ return Double.BYTES;
+ }
+
+ @Override
+ public double getDouble() {
+ return DoublePointable.getDouble(value.getByteArray(), value.getStartOffset());
+ }
+
+ @Override
+ public int compareTo(AbstractValueReader o) {
+ return Double.compare(getDouble(), o.getDouble());
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java
new file mode 100644
index 0000000..a981dca
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+
+public final class LongKeyValueReader extends AbstractFixedLengthColumnKeyValueReader {
+ @Override
+ public ATypeTag getTypeTag() {
+ return ATypeTag.BIGINT;
+ }
+
+ @Override
+ protected int getValueLength() {
+ return Long.BYTES;
+ }
+
+ @Override
+ public long getLong() {
+ return LongPointable.getLong(value.getByteArray(), value.getStartOffset());
+ }
+
+ @Override
+ public int compareTo(AbstractValueReader o) {
+ return Long.compare(getLong(), o.getLong());
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java
new file mode 100644
index 0000000..d6d7c4c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.values.IColumnKeyValueReader;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+public final class StringKeyValueReader extends AbstractValueReader implements IColumnKeyValueReader {
+ private final IPointable value;
+ private ByteBuffer buffer;
+ private int startOffset;
+ private int tupleCount;
+
+ public StringKeyValueReader() {
+ value = new VoidPointable();
+ }
+
+ @Override
+ public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
+ buffer = in.getBuffer();
+ startOffset = buffer.position();
+ this.tupleCount = tupleCount;
+ value.set(null, 0, 0);
+ }
+
+ @Override
+ public void reset(int startIndex, int skipCount) {
+ getValue(startIndex);
+ }
+
+ @Override
+ public IValueReference getValue(int index) {
+ byte[] bytes = buffer.array();
+ int indexOffset = startOffset + index * Integer.BYTES;
+ int valueOffset = startOffset + tupleCount * Integer.BYTES + IntegerPointable.getInteger(bytes, indexOffset);
+ int valueLength = UTF8StringUtil.getUTFLength(bytes, valueOffset);
+ valueLength += UTF8StringUtil.getNumBytesToStoreLength(valueLength);
+ value.set(bytes, valueOffset, valueLength);
+ return value;
+ }
+
+ @Override
+ public IValueReference getBytes() {
+ return value;
+ }
+
+ @Override
+ public void nextValue() {
+ if (value.getByteArray() == null) {
+ getValue(0);
+ return;
+ }
+ int offset = value.getStartOffset() + value.getLength();
+ int length = UTF8StringUtil.getUTFLength(buffer.array(), offset);
+ length += UTF8StringUtil.getNumBytesToStoreLength(length);
+ value.set(buffer.array(), offset, length);
+ }
+
+ @Override
+ public ATypeTag getTypeTag() {
+ return ATypeTag.STRING;
+ }
+
+ @Override
+ public int compareTo(AbstractValueReader o) {
+ return UTF8StringPointable.compare(getBytes(), o.getBytes());
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/UUIDKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/UUIDKeyValueReader.java
new file mode 100644
index 0000000..141e9f8
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/UUIDKeyValueReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AUUIDPartialBinaryComparatorFactory;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public final class UUIDKeyValueReader extends AbstractFixedLengthColumnKeyValueReader {
+ @Override
+ public ATypeTag getTypeTag() {
+ return ATypeTag.UUID;
+ }
+
+ @Override
+ protected int getValueLength() {
+ return 16;
+ }
+
+ @Override
+ public IValueReference getBytes() {
+ return value;
+ }
+
+ @Override
+ public int compareTo(AbstractValueReader o) {
+ return AUUIDPartialBinaryComparatorFactory.compare(getBytes(), o.getBytes());
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
index ca5cbb1..5963d23 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
@@ -20,7 +20,7 @@
import java.io.IOException;
-import org.apache.asterix.column.bytes.encoder.ParquetPlainValuesWriter;
+import org.apache.asterix.column.bytes.encoder.ParquetPlainFixedLengthValuesWriter;
import org.apache.asterix.column.values.IColumnValuesReader;
import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
import org.apache.asterix.column.values.writer.filters.DoubleColumnFilterWriter;
@@ -37,12 +37,12 @@
import org.apache.parquet.bytes.BytesInput;
public final class DoubleColumnValuesWriter extends AbstractColumnValuesWriter {
- private final ParquetPlainValuesWriter doubleWriter;
+ private final ParquetPlainFixedLengthValuesWriter doubleWriter;
public DoubleColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
boolean collection, boolean filtered) {
super(columnIndex, level, collection, filtered);
- doubleWriter = new ParquetPlainValuesWriter(multiPageOpRef);
+ doubleWriter = new ParquetPlainFixedLengthValuesWriter(multiPageOpRef);
}
@Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
index e71ec73..e6ada55 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
@@ -20,7 +20,9 @@
import java.io.IOException;
+import org.apache.asterix.column.bytes.encoder.AbstractParquetValuesWriter;
import org.apache.asterix.column.bytes.encoder.ParquetDeltaBinaryPackingValuesWriterForLong;
+import org.apache.asterix.column.bytes.encoder.ParquetPlainFixedLengthValuesWriter;
import org.apache.asterix.column.values.IColumnValuesReader;
import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
import org.apache.asterix.column.values.writer.filters.LongColumnFilterWriter;
@@ -35,12 +37,13 @@
import org.apache.parquet.bytes.BytesInput;
final class LongColumnValuesWriter extends AbstractColumnValuesWriter {
- private final ParquetDeltaBinaryPackingValuesWriterForLong longWriter;
+ private final AbstractParquetValuesWriter longWriter;
public LongColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
boolean collection, boolean filtered) {
super(columnIndex, level, collection, filtered);
- longWriter = new ParquetDeltaBinaryPackingValuesWriterForLong(multiPageOpRef);
+ longWriter = !filtered ? new ParquetPlainFixedLengthValuesWriter(multiPageOpRef)
+ : new ParquetDeltaBinaryPackingValuesWriterForLong(multiPageOpRef);
}
@Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
index e1a3ffd..b0d5a93 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
@@ -20,7 +20,9 @@
import java.io.IOException;
+import org.apache.asterix.column.bytes.encoder.AbstractParquetValuesWriter;
import org.apache.asterix.column.bytes.encoder.ParquetDeltaByteArrayWriter;
+import org.apache.asterix.column.bytes.encoder.ParquetPlainVariableLengthValuesWriter;
import org.apache.asterix.column.values.IColumnValuesReader;
import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
import org.apache.asterix.column.values.writer.filters.StringColumnFilterWriter;
@@ -32,18 +34,19 @@
import org.apache.parquet.bytes.BytesInput;
public class StringColumnValuesWriter extends AbstractColumnValuesWriter {
- private final ParquetDeltaByteArrayWriter stringWriter;
+ private final AbstractParquetValuesWriter stringWriter;
private final boolean skipLengthBytes;
public StringColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
boolean collection, boolean filtered) {
- this(multiPageOpRef, columnIndex, level, collection, filtered, true);
+ this(columnIndex, level, collection, filtered, true, filtered ? new ParquetDeltaByteArrayWriter(multiPageOpRef)
+ : new ParquetPlainVariableLengthValuesWriter(multiPageOpRef));
}
- protected StringColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
- boolean collection, boolean filtered, boolean skipLengthBytes) {
+ protected StringColumnValuesWriter(int columnIndex, int level, boolean collection, boolean filtered,
+ boolean skipLengthBytes, AbstractParquetValuesWriter stringWriter) {
super(columnIndex, level, collection, filtered);
- stringWriter = new ParquetDeltaByteArrayWriter(multiPageOpRef);
+ this.stringWriter = stringWriter;
this.skipLengthBytes = skipLengthBytes;
}
@@ -94,4 +97,5 @@
protected ATypeTag getTypeTag() {
return ATypeTag.STRING;
}
+
}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
index 1e98754..9d4ff9a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.column.values.writer;
+import org.apache.asterix.column.bytes.encoder.ParquetPlainFixedLengthValuesWriter;
import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
import org.apache.asterix.column.values.writer.filters.UUIDColumnFilterWriter;
import org.apache.asterix.om.types.ATypeTag;
@@ -28,7 +29,8 @@
public UUIDColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
boolean collection, boolean filtered) {
- super(multiPageOpRef, columnIndex, level, collection, filtered, false);
+ // UUID is always written without encoding
+ super(columnIndex, level, collection, filtered, false, new ParquetPlainFixedLengthValuesWriter(multiPageOpRef));
}
@Override
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 03c14ae..36ebab9 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -271,7 +271,7 @@
LOGGER.info("READ PageZero {}", pageNumber++);
assembler.reset(prepareRead(pageZero, providers, streams));
for (int i = 0; i < streams.length; i++) {
- assembler.resetColumn(streams[i], 0, i);
+ assembler.resetColumn(streams[i], i);
}
writeForPageZero(ps, assembler);
}
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
index 8f53ebc..ff088d6 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
@@ -125,7 +125,7 @@
Pair<PrintStream, ATypeTag> pair = new Pair<>(ps, ATypeTag.OBJECT);
assembler.reset(numberOfTuples);
for (int i = 0; i < columnMetadata.getNumberOfColumns(); i++) {
- assembler.resetColumn(streams[i], 0, i);
+ assembler.resetColumn(streams[i], i);
}
while (assembler.hasNext()) {
IValueReference record = assembler.nextValue();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
index 0c95500..4034906 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
@@ -19,9 +19,13 @@
package org.apache.hyracks.storage.am.lsm.btree.column.api;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
+import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -31,11 +35,38 @@
*/
public interface IColumnTupleIterator extends ILSMTreeTupleReference, Comparable<IColumnTupleIterator> {
/**
+ * Notifies the iterator that a new page has been set so it can prepare to read it
+ */
+ void newPage() throws HyracksDataException;
+
+ /**
* Reset the iterator starting at the provided index
*
* @param startIndex start from the tuple at this index
+ * @param endIndex stop at this index (exclusive)
*/
- void reset(int startIndex) throws HyracksDataException;
+ void reset(int startIndex, int endIndex) throws HyracksDataException;
+
+ /**
+ * Moves the iterator forward to a new position.
+ * <p>
+ * NOTE: the new start index has to be greater than the current tuple index
+ *
+ * @param startIndex the new index to start from
+ */
+ void setAt(int startIndex) throws HyracksDataException;
+
+ /**
+ * Finds the tuple index given the search key
+ *
+ * @param searchKey search key
+ * @param cmp comparator
+ * @param ftm find tuple mode
+ * @param ftp find tuple policy
+ * @return index of the tuple; callers treat a negative value as "no matching tuple"
+ */
+ int findTupleIndex(ITupleReference searchKey, MultiComparator cmp, FindTupleMode ftm,
+ FindTupleNoExactMatchPolicy ftp) throws HyracksDataException;
/**
* Mark {@link IColumnTupleIterator} as consumed
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
index 683e099..db878c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
@@ -20,7 +20,10 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.btree.api.IDiskBTreeStatefulPointSearchCursor;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
@@ -56,7 +59,29 @@
@Override
public void setCursorToNextKey(ISearchPredicate searchPred) throws HyracksDataException {
- initCursorPosition(searchPred);
+ int index = getLowKeyIndex();
+ if (index == frame.getTupleCount()) {
+ frameTuple.consume();
+ yieldFirstCall = false;
+ return;
+ }
+ frameTuple.setAt(index);
+ yieldFirstCall = true;
+ }
+
+ @Override
+ protected void setSearchPredicate(ISearchPredicate searchPred) {
+ pred = (RangePredicate) searchPred;
+ lowKey = pred.getLowKey();
+ lowKeyFtm = FindTupleMode.EXACT;
+ lowKeyFtp = FindTupleNoExactMatchPolicy.NONE;
+ reusablePredicate.setLowKeyComparator(originalKeyCmp);
+ }
+
+ @Override
+ protected int getLowKeyIndex() throws HyracksDataException {
+ int index = frameTuple.findTupleIndex(pred.getLowKey(), pred.getLowKeyComparator(), lowKeyFtm, lowKeyFtp);
+ return index < 0 ? frame.getTupleCount() : index;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index 3323494..e618aaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -24,6 +24,8 @@
import org.apache.hyracks.storage.am.btree.impls.BTreeCursorInitialState;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
import org.apache.hyracks.storage.common.EnforcedIndexCursor;
@@ -53,6 +55,11 @@
protected RangePredicate pred;
protected ITupleReference lowKey;
protected ITupleReference highKey;
+
+ protected FindTupleMode lowKeyFtm;
+ protected FindTupleMode highKeyFtm;
+ protected FindTupleNoExactMatchPolicy lowKeyFtp;
+ protected FindTupleNoExactMatchPolicy highKeyFtp;
protected boolean yieldFirstCall;
protected final IIndexCursorStats stats;
@@ -67,7 +74,7 @@
}
@Override
- public void doDestroy() throws HyracksDataException {
+ public void doDestroy() {
// No Op all resources are released in the close call
}
@@ -84,7 +91,8 @@
bufferCache.unpin(page0);
page0 = nextLeaf;
frame.setPage(page0);
- frameTuple.reset(0);
+ frameTuple.newPage();
+ setCursorPosition();
nextLeafPage = frame.getNextLeaf();
} while (frame.getTupleCount() == 0 && nextLeafPage > 0);
}
@@ -120,20 +128,28 @@
pageId = ((BTreeCursorInitialState) initialState).getPageId();
frame.setPage(page0);
frame.setMultiComparator(originalKeyCmp);
- frameTuple.reset(0);
+ frameTuple.newPage();
initCursorPosition(searchPred);
}
protected void initCursorPosition(ISearchPredicate searchPred) throws HyracksDataException {
- pred = (RangePredicate) searchPred;
- lowKey = pred.getLowKey();
- highKey = pred.getHighKey();
-
+ setSearchPredicate(searchPred);
reusablePredicate.setLowKeyComparator(originalKeyCmp);
reusablePredicate.setHighKeyComparator(pred.getHighKeyComparator());
reusablePredicate.setHighKey(pred.getHighKey(), pred.isHighKeyInclusive());
yieldFirstCall = false;
- advanceTupleToLowKey();
+ setCursorPosition();
+ }
+
+ private void setCursorPosition() throws HyracksDataException {
+ int start = getLowKeyIndex();
+ int end = getHighKeyIndex();
+ if (end < start) {
+ frameTuple.consume();
+ return;
+ }
+ frameTuple.reset(start, end);
+ yieldFirstCall = shouldYieldFirstCall();
}
protected boolean isNextIncluded() throws HyracksDataException {
@@ -151,41 +167,6 @@
return highKey == null || isLessOrEqual(frameTuple, highKey, pred.isHighKeyInclusive());
}
- protected void advanceTupleToLowKey() throws HyracksDataException {
- if (highKey != null && isLessOrEqual(highKey, frame.getLeftmostTuple(), !pred.isHighKeyInclusive())
- || lowKey != null && isLessOrEqual(frame.getRightmostTuple(), lowKey, !pred.isLowKeyInclusive())) {
- /*
- * If
- * - The Lowest key from the frame is greater than the requested highKey
- * OR
- * - The highest key from the frame is less than the requested lowKey
- * Then:
- * No tuple will satisfy the search key. Consume the frameTuple to stop the search.
- */
- frameTuple.consume();
- return;
- } else if (lowKey == null) {
- // no lowKey was specified, start from tupleIndex = 0
- return;
- }
-
- //The requested key is somewhere within the frame tuples
- boolean stop = false;
- int counter = 0;
- while (!stop && !frameTuple.isConsumed()) {
- frameTuple.next();
- stop = isLessOrEqual(lowKey, frameTuple, pred.isLowKeyInclusive());
- counter++;
- }
-
- // Only proceed if needed
- yieldFirstCall = shouldYieldFirstCall();
- // Advance all columns to the proper position if needed
- if (yieldFirstCall && counter - 1 > 0) {
- frameTuple.skip(counter - 1);
- }
- }
-
protected boolean shouldYieldFirstCall() throws HyracksDataException {
// Proceed if the highKey is null or the current tuple's key is less than (or equal) the highKey
return highKey == null || isLessOrEqual(frameTuple, highKey, pred.isHighKeyInclusive());
@@ -205,6 +186,65 @@
return cmp < 0 || inclusive && cmp == 0;
}
+ protected int getLowKeyIndex() throws HyracksDataException {
+ if (lowKey == null) {
+ return 0;
+ } else if (isLessOrEqual(frame.getRightmostTuple(), lowKey, !pred.isLowKeyInclusive())) {
+ // The frame's highest key is below lowKey (or equals it when lowKey is exclusive)
+ return frame.getTupleCount();
+ }
+
+ int index = frameTuple.findTupleIndex(lowKey, pred.getLowKeyComparator(), lowKeyFtm, lowKeyFtp);
+ if (pred.isLowKeyInclusive()) {
+ index++;
+ } else {
+ if (index < 0) {
+ index = frame.getTupleCount();
+ }
+ }
+
+ return index;
+ }
+
+ protected int getHighKeyIndex() throws HyracksDataException {
+ if (highKey == null) {
+ return frame.getTupleCount() - 1;
+ } else if (isLessOrEqual(highKey, frame.getLeftmostTuple(), !pred.isHighKeyInclusive())) {
+ return -1;
+ }
+
+ int index = frameTuple.findTupleIndex(highKey, pred.getHighKeyComparator(), highKeyFtm, highKeyFtp);
+ if (pred.isHighKeyInclusive()) {
+ if (index < 0) {
+ index = frame.getTupleCount() - 1;
+ } else {
+ index--;
+ }
+ }
+
+ return index;
+ }
+
+ protected void setSearchPredicate(ISearchPredicate searchPred) {
+ pred = (RangePredicate) searchPred;
+ lowKey = pred.getLowKey();
+ highKey = pred.getHighKey();
+
+ lowKeyFtm = FindTupleMode.EXCLUSIVE;
+ if (pred.isLowKeyInclusive()) {
+ lowKeyFtp = FindTupleNoExactMatchPolicy.LOWER_KEY;
+ } else {
+ lowKeyFtp = FindTupleNoExactMatchPolicy.HIGHER_KEY;
+ }
+
+ highKeyFtm = FindTupleMode.EXCLUSIVE;
+ if (pred.isHighKeyInclusive()) {
+ highKeyFtp = FindTupleNoExactMatchPolicy.HIGHER_KEY;
+ } else {
+ highKeyFtp = FindTupleNoExactMatchPolicy.LOWER_KEY;
+ }
+ }
+
@Override
public void doClose() throws HyracksDataException {
frameTuple.close();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index d0e1e1d..6f95dbf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -42,6 +42,7 @@
private final int numberOfPrimaryKeys;
private int totalNumberOfMegaLeafNodes;
private int numOfSkippedMegaLeafNodes;
+ private int endIndex;
protected int tupleIndex;
/**
@@ -78,47 +79,72 @@
}
@Override
- public final void reset(int startIndex) throws HyracksDataException {
- tupleIndex = startIndex;
+ public final void newPage() throws HyracksDataException {
+ tupleIndex = 0;
ByteBuffer pageZero = frame.getBuffer();
pageZero.clear();
pageZero.position(HEADER_SIZE);
int numberOfTuples = frame.getTupleCount();
- //Start new page and check whether we should skip reading non-key columns or not
- boolean readColumnPages = startNewPage(pageZero, frame.getNumberOfColumns(), numberOfTuples);
//Start primary keys
for (int i = 0; i < numberOfPrimaryKeys; i++) {
IColumnBufferProvider provider = primaryKeyBufferProviders[i];
provider.reset(frame);
- startPrimaryKey(provider, tupleIndex, i, numberOfTuples);
+ startPrimaryKey(provider, i, numberOfTuples);
}
+ }
+ @Override
+ public final void reset(int startIndex, int endIndex) throws HyracksDataException {
+ tupleIndex = startIndex;
+ this.endIndex = endIndex;
+ ByteBuffer pageZero = frame.getBuffer();
+ int numberOfTuples = frame.getTupleCount();
+ //Start new page and check whether we should skip reading non-key columns or not
+ boolean readColumnPages = startNewPage(pageZero, frame.getNumberOfColumns(), numberOfTuples);
+ setPrimaryKeysAt(startIndex, startIndex);
if (readColumnPages) {
for (int i = 0; i < buffersProviders.length; i++) {
IColumnBufferProvider provider = buffersProviders[i];
//Release previous pinned pages if any
provider.releaseAll();
provider.reset(frame);
- startColumn(provider, tupleIndex, i, numberOfTuples);
+ startColumn(provider, i, numberOfTuples);
}
+ // Skip until before startIndex (i.e. stop at startIndex - 1)
+ skip(startIndex);
} else {
numOfSkippedMegaLeafNodes++;
}
totalNumberOfMegaLeafNodes++;
}
+ @Override
+ public final void setAt(int startIndex) throws HyracksDataException {
+ int skipCount = startIndex - tupleIndex;
+ tupleIndex = startIndex;
+ setPrimaryKeysAt(startIndex, skipCount);
+ // -1 because next would be called for all columns
+ skip(skipCount - 1);
+ }
+
+ protected abstract void setPrimaryKeysAt(int index, int skipCount) throws HyracksDataException;
+
protected abstract boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples);
- protected abstract void startPrimaryKey(IColumnBufferProvider bufferProvider, int startIndex, int ordinal,
- int numberOfTuples) throws HyracksDataException;
+ protected abstract void startPrimaryKey(IColumnBufferProvider bufferProvider, int ordinal, int numberOfTuples)
+ throws HyracksDataException;
- protected abstract void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal,
- int numberOfTuples) throws HyracksDataException;
+ protected abstract void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
+ throws HyracksDataException;
protected abstract void onNext() throws HyracksDataException;
+ protected final int getTupleCount() {
+ return frame.getTupleCount();
+ }
+
@Override
public final void next() throws HyracksDataException {
onNext();
@@ -132,7 +158,7 @@
@Override
public final boolean isConsumed() {
- return tupleIndex >= frame.getTupleCount();
+ return tupleIndex >= endIndex;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 8c8a550..de4c4a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -103,12 +103,6 @@
cursor.next();
matchingTupleCount++;
ITupleReference tuple = cursor.getTuple();
- if (tupleFilter != null) {
- referenceFilterTuple.reset(tuple);
- if (!tupleFilter.accept(referenceFilterTuple)) {
- continue;
- }
- }
tb.reset();
if (retainInput && retainMissing) {
@@ -124,7 +118,13 @@
tb.addFieldEndOffset();
}
}
- writeTupleToOutput(tuple);
+ ITupleReference projectedTuple = writeTupleToOutput(tuple);
+ if (tupleFilter != null) {
+ referenceFilterTuple.reset(projectedTuple);
+ if (!tupleFilter.accept(referenceFilterTuple)) {
+ continue;
+ }
+ }
FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
if (outputLimit >= 0 && ++outputCount >= outputLimit) {
finished = true;