[ASTERIXDB-3137][STO] Introduce LSM write operations for columnar format
- user mode changes: no
- storage format changes: no
- interface changes: yes
Details:
This patch adds the support for writing columnar
values to LSM indexes. By write we mean LSM flush,
merge, and load operations
Interface changes:
ITupleProjector#project() now returns ITupleReference
Change-Id: Ibe494d6de4478954df8e2f3ba0941391934954c2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17424
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java
new file mode 100644
index 0000000..1a4c3ef
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+abstract class AbstractNestedValueAssembler extends AbstractValueAssembler {
+ protected final ArrayBackedValueStorage storage;
+
+ AbstractNestedValueAssembler(int level, AssemblerInfo info) {
+ super(level, info);
+ storage = new ArrayBackedValueStorage();
+ }
+
+ /**
+ * @return whether the nested assembler was started or not
+ */
+ final boolean isStarted() {
+ return started;
+ }
+
+ /**
+ * Add a nested value
+ *
+ * @param value contains the value and its information
+ */
+ abstract void addValue(AbstractValueAssembler value) throws HyracksDataException;
+
+ /**
+ * Add a nested {@link ATypeTag#NULL}
+ *
+ * @param value contains the value's information
+ */
+ abstract void addNull(AbstractValueAssembler value) throws HyracksDataException;
+
+ /**
+ * Add a nested {@link ATypeTag#MISSING}
+ */
+ void addMissing() throws HyracksDataException {
+ //By default, we ignore missing
+ }
+
+ @Override
+ final void addNullToAncestor(int nullLevel) throws HyracksDataException {
+ AbstractNestedValueAssembler parent = getParent();
+ if (nullLevel + 1 == level) {
+ parent.start();
+ parent.addNull(this);
+ return;
+ }
+ parent.addNullToAncestor(nullLevel);
+ }
+
+ @Override
+ final void addMissingToAncestor(int missingLevel) throws HyracksDataException {
+ AbstractNestedValueAssembler parent = getParent();
+ if (missingLevel + 1 == level) {
+ parent.start();
+ parent.addMissing();
+ return;
+ }
+ parent.addMissingToAncestor(missingLevel);
+ }
+
+ /**
+ * Recursively start the path of this assembler by staring all un-started parents
+ */
+ public final void start() {
+ if (started) {
+ return;
+ }
+ started = true;
+ reset();
+ AbstractNestedValueAssembler parent = getParent();
+ if (parent != null && !parent.isStarted()) {
+ parent.start();
+ }
+ }
+
+ /**
+ * End the assembler and add this nested value to its parent
+ */
+ public final void end() throws HyracksDataException {
+ if (started) {
+ addValueToParent();
+ started = false;
+ }
+
+ if (isDelegate()) {
+ getParent().end();
+ }
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
new file mode 100644
index 0000000..9f1809d
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.column.assembler.value.IValueGetter;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public abstract class AbstractPrimitiveValueAssembler extends AbstractValueAssembler {
+ /**
+ * An indicator to go to the next value
+ */
+ public static final int NEXT_ASSEMBLER = -1;
+ protected final IValueGetter primitiveValueGetter;
+ protected final IColumnValuesReader reader;
+
+ AbstractPrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader,
+ IValueGetter primitiveValueGetter) {
+ super(level, info);
+ this.primitiveValueGetter = primitiveValueGetter;
+ this.reader = reader;
+ }
+
+ public final void reset(AbstractBytesInputStream in, int startIndex, int numberOfTuples)
+ throws HyracksDataException {
+ reader.reset(in, numberOfTuples);
+ reader.skip(startIndex);
+ }
+
+ @Override
+ public final IValueReference getValue() throws HyracksDataException {
+ return primitiveValueGetter.getValue(reader);
+ }
+
+ @Override
+ void addNullToAncestor(int nullLevel) throws HyracksDataException {
+ AbstractNestedValueAssembler parent = getParent();
+ if (nullLevel + 1 == level) {
+ parent.start();
+ parent.addNull(this);
+ return;
+ }
+ parent.addNullToAncestor(nullLevel);
+ }
+
+ @Override
+ void addMissingToAncestor(int missingLevel) throws HyracksDataException {
+ AbstractNestedValueAssembler parent = getParent();
+ if (missingLevel + 1 == level) {
+ parent.start();
+ parent.addMissing();
+ return;
+ }
+ parent.addMissingToAncestor(missingLevel);
+ }
+
+ @Override
+ final void addValueToParent() throws HyracksDataException {
+ AbstractNestedValueAssembler parent = getParent();
+ parent.start();
+ getParent().addValue(this);
+ }
+
+ public final int getColumnIndex() {
+ return reader.getColumnIndex();
+ }
+
+ public final void skip(int count) throws HyracksDataException {
+ reader.skip(count);
+ }
+
+ /**
+ * Move to the next primitive value assembler
+ *
+ * @return the index of the next value
+ */
+ public abstract int next() throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractValueAssembler.java
new file mode 100644
index 0000000..0071917
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractValueAssembler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+public abstract class AbstractValueAssembler {
+ protected static final VoidPointable NULL;
+ protected static final VoidPointable MISSING;
+ private final AbstractNestedValueAssembler parent;
+ private final IValueReference fieldName;
+ private final int fieldIndex;
+ private final boolean delegate;
+ protected final int level;
+ protected boolean started;
+
+ static {
+ NULL = new VoidPointable();
+ NULL.set(new byte[] { ATypeTag.NULL.serialize() }, 0, 1);
+
+ MISSING = new VoidPointable();
+ MISSING.set(new byte[] { ATypeTag.MISSING.serialize() }, 0, 1);
+ }
+
+ protected AbstractValueAssembler(int level, AssemblerInfo info) {
+ this.parent = info.getParent();
+ this.fieldName = info.getFieldName();
+ this.fieldIndex = info.getFieldIndex();
+ this.delegate = info.isDelegate();
+ this.level = level;
+ }
+
+ /**
+ * Add {@link ATypeTag#NULL} value to the ancestor at {@code nullLevel}
+ *
+ * @param nullLevel at what level the null occurred
+ */
+ abstract void addNullToAncestor(int nullLevel) throws HyracksDataException;
+
+ /**
+ * Add {@link ATypeTag#MISSING} value to the ancestor at {@code missingLevel}
+ *
+ * @param missingLevel at what level the missing occurred
+ */
+ abstract void addMissingToAncestor(int missingLevel) throws HyracksDataException;
+
+ /**
+ * Add the value of this assembler to its parent
+ */
+ abstract void addValueToParent() throws HyracksDataException;
+
+ /**
+ * @return the assembled value
+ */
+ public abstract IValueReference getValue() throws HyracksDataException;
+
+ /**
+ * Reset assembler
+ */
+ void reset() {
+ //NoOp
+ }
+
+ /**
+ * @return whether this assembler is the delegate (or representative) of its siblings
+ */
+ final boolean isDelegate() {
+ return delegate;
+ }
+
+ /**
+ * @return parent of the assembler
+ */
+ final AbstractNestedValueAssembler getParent() {
+ return parent;
+ }
+
+ /**
+ * Return the field name of the value of this assembler
+ */
+ final IValueReference getFieldName() {
+ return fieldName;
+ }
+
+ /**
+ * Return the field index of the value of this assembler (for closed types)
+ */
+ final int getFieldIndex() {
+ return fieldIndex;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayValueAssembler.java
new file mode 100644
index 0000000..2352e7f
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayValueAssembler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.builders.ListBuilderFactory;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class ArrayValueAssembler extends AbstractNestedValueAssembler {
+ private final IAsterixListBuilder listBuilder;
+ private final AbstractCollectionType collectionType;
+ private final int firstValueIndex;
+
+ ArrayValueAssembler(int level, AssemblerInfo info, int firstValueIndex) {
+ super(level, info);
+ this.firstValueIndex = firstValueIndex;
+ collectionType = (AbstractCollectionType) info.getDeclaredType();
+ listBuilder = new ListBuilderFactory().create(collectionType.getTypeTag());
+ }
+
+ final int getFirstValueIndex() {
+ return firstValueIndex;
+ }
+
+ @Override
+ void reset() {
+ listBuilder.reset(collectionType);
+ storage.reset();
+ }
+
+ @Override
+ void addValue(AbstractValueAssembler value) throws HyracksDataException {
+ listBuilder.addItem(value.getValue());
+ }
+
+ @Override
+ void addNull(AbstractValueAssembler value) throws HyracksDataException {
+ listBuilder.addItem(NULL);
+ }
+
+ @Override
+ void addMissing() throws HyracksDataException {
+ listBuilder.addItem(MISSING);
+ }
+
+ @Override
+ void addValueToParent() throws HyracksDataException {
+ storage.reset();
+ listBuilder.write(storage.getDataOutput(), true);
+ getParent().addValue(this);
+ }
+
+ @Override
+ public IValueReference getValue() {
+ return storage;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayWithUnionValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayWithUnionValueAssembler.java
new file mode 100644
index 0000000..dcd240b
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayWithUnionValueAssembler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ArrayWithUnionValueAssembler extends ArrayValueAssembler {
+ private final int numberOfUnionChildren;
+ private int numberOfAddedValues;
+ private boolean nonMissingValueAdded;
+
+ ArrayWithUnionValueAssembler(int level, AssemblerInfo info, int firstValueIndex, AbstractSchemaNode itemNode) {
+ super(level, info, firstValueIndex);
+ this.numberOfUnionChildren = ((UnionSchemaNode) itemNode).getChildren().size();
+ }
+
+ @Override
+ void reset() {
+ numberOfAddedValues = 0;
+ nonMissingValueAdded = false;
+ super.reset();
+ }
+
+ @Override
+ void addValue(AbstractValueAssembler value) throws HyracksDataException {
+ nonMissingValueAdded = true;
+ numberOfAddedValues++;
+ super.addValue(value);
+ }
+
+ @Override
+ void addNull(AbstractValueAssembler value) throws HyracksDataException {
+ nonMissingValueAdded = true;
+ numberOfAddedValues++;
+ super.addNull(value);
+ }
+
+ @Override
+ void addMissing() throws HyracksDataException {
+ numberOfAddedValues++;
+ if (nonMissingValueAdded && numberOfAddedValues >= numberOfUnionChildren) {
+ nonMissingValueAdded = false;
+ numberOfAddedValues = numberOfAddedValues % numberOfUnionChildren;
+ } else if (numberOfAddedValues == numberOfUnionChildren) {
+ super.addMissing();
+ numberOfAddedValues = 0;
+ }
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java
new file mode 100644
index 0000000..712e65c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+public class AssemblerInfo {
+ private final AbstractNestedValueAssembler parent;
+ private final IAType declaredType;
+ private final boolean delegate;
+ private final IValueReference fieldName;
+ private final int fieldIndex;
+
+ public AssemblerInfo() {
+ this(BuiltinType.ANY, null, false);
+ }
+
+ public AssemblerInfo(IAType declaredType, EmptyAssembler parent) {
+ this(declaredType, parent, false);
+ }
+
+ public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate) {
+ this(declaredType, parent, delegate, null, -1);
+ }
+
+ public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate,
+ IValueReference fieldName) {
+ this(declaredType, parent, delegate, fieldName, -1);
+ }
+
+ public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate, int fieldIndex) {
+ this(declaredType, parent, delegate, null, fieldIndex);
+ }
+
+ public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate,
+ IValueReference fieldName, int fieldIndex) {
+ this(declaredType, parent, delegate, fieldName, fieldIndex, false);
+ }
+
+ public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate,
+ IValueReference fieldName, int fieldIndex, boolean fieldNameTagged) {
+ this.parent = parent;
+ this.declaredType = declaredType;
+ this.delegate = delegate;
+ this.fieldName = fieldNameTagged ? fieldName : createTaggedFieldName(fieldName);
+ this.fieldIndex = fieldIndex;
+ }
+
+ private IValueReference createTaggedFieldName(IValueReference fieldName) {
+ if (fieldName == null) {
+ return null;
+ }
+ byte[] storage = new byte[1 + fieldName.getLength()];
+ storage[0] = ATypeTag.STRING.serialize();
+ System.arraycopy(fieldName.getByteArray(), fieldName.getStartOffset(), storage, 1, fieldName.getLength());
+ VoidPointable taggedFieldName = new VoidPointable();
+ taggedFieldName.set(storage, 0, storage.length);
+ return taggedFieldName;
+ }
+
+ public AbstractNestedValueAssembler getParent() {
+ return parent;
+ }
+
+ public IAType getDeclaredType() {
+ return declaredType;
+ }
+
+ public boolean isDelegate() {
+ return delegate;
+ }
+
+ public IValueReference getFieldName() {
+ return fieldName;
+ }
+
+ public int getFieldIndex() {
+ return fieldIndex;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EmptyAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EmptyAssembler.java
new file mode 100644
index 0000000..406a401
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EmptyAssembler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class EmptyAssembler extends AbstractNestedValueAssembler {
+
+ EmptyAssembler() {
+ super(-1, new AssemblerInfo());
+ }
+
+ @Override
+ void addValue(AbstractValueAssembler value) throws HyracksDataException {
+ //noOp
+ }
+
+ @Override
+ void addValueToParent() throws HyracksDataException {
+ //noOp
+ }
+
+ @Override
+ void addNull(AbstractValueAssembler value) throws HyracksDataException {
+ //noOp
+ }
+
+ @Override
+ public IValueReference getValue() throws HyracksDataException {
+ return null;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ObjectValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ObjectValueAssembler.java
new file mode 100644
index 0000000..536ce02
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ObjectValueAssembler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class ObjectValueAssembler extends AbstractNestedValueAssembler {
+ private final RecordBuilder recordBuilder;
+ private final ARecordType recordType;
+
+ ObjectValueAssembler(int level, AssemblerInfo info) {
+ super(level, info);
+ recordBuilder = new RecordBuilder();
+ recordType = (ARecordType) info.getDeclaredType();
+ }
+
+ @Override
+ void reset() {
+ recordBuilder.reset(recordType);
+ storage.reset();
+ }
+
+ @Override
+ void addValue(AbstractValueAssembler value) throws HyracksDataException {
+ int valueIndex = value.getFieldIndex();
+ if (valueIndex >= 0) {
+ recordBuilder.addField(valueIndex, value.getValue());
+ } else {
+ recordBuilder.addField(value.getFieldName(), value.getValue());
+ }
+ }
+
+ @Override
+ void addNull(AbstractValueAssembler value) throws HyracksDataException {
+ int valueIndex = value.getFieldIndex();
+ if (valueIndex >= 0) {
+ recordBuilder.addField(valueIndex, NULL);
+ } else {
+ recordBuilder.addField(value.getFieldName(), NULL);
+ }
+ }
+
+ @Override
+ void addValueToParent() throws HyracksDataException {
+ storage.reset();
+ recordBuilder.write(storage.getDataOutput(), true);
+ getParent().addValue(this);
+ }
+
+ @Override
+ public IValueReference getValue() {
+ return storage;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java
new file mode 100644
index 0000000..9592a12
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.column.assembler.value.IValueGetter;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class PrimitiveValueAssembler extends AbstractPrimitiveValueAssembler {
+
+ PrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader, IValueGetter primitiveValue) {
+ super(level, info, reader, primitiveValue);
+ }
+
+ @Override
+ public int next() throws HyracksDataException {
+ if (!reader.next()) {
+ throw new IllegalAccessError("no more values");
+ } else if (reader.isNull() && (isDelegate() || reader.getLevel() + 1 == level)) {
+ addNullToAncestor(reader.getLevel());
+ } else if (reader.isValue()) {
+ addValueToParent();
+ }
+
+ if (isDelegate()) {
+ getParent().end();
+ }
+ //Go to next value
+ return -1;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java
new file mode 100644
index 0000000..8fa228f
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.column.assembler.value.IValueGetter;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+class RepeatedPrimitiveValueAssembler extends AbstractPrimitiveValueAssembler {
+ private final List<ArrayValueAssembler> arrays;
+
+ RepeatedPrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader,
+ IValueGetter primitiveValue) {
+ super(level, info, reader, primitiveValue);
+ this.arrays = new ArrayList<>();
+ }
+
+ public void addArray(ArrayValueAssembler assembler) {
+ arrays.add(assembler);
+ }
+
+ @Override
+ public int next() throws HyracksDataException {
+ if (!reader.next()) {
+ throw new IllegalStateException("No more values");
+ } else if (reader.isNull() && (!arrays.isEmpty() || reader.getLevel() + 1 == level)) {
+ /*
+ * There are two cases here for where the null belongs to:
+ * 1- If the null is an array item, then add it
+ * 2- If the null is an ancestor, then we only add null if this column is the array delegate
+ * (i.e., !arrays.isEmpty())
+ */
+ addNullToAncestor(reader.getLevel());
+ } else if (reader.isMissing() && reader.getLevel() + 1 == level) {
+ /*
+ * Add a missing item
+ */
+ addMissingToAncestor(reader.getLevel());
+ } else if (reader.isValue()) {
+ addValueToParent();
+ }
+
+ if (isDelegate()) {
+ getParent().end();
+ }
+
+ //Initially, go to the next primitive assembler
+ int nextIndex = NEXT_ASSEMBLER;
+ if (!arrays.isEmpty()) {
+ /*
+ * This assembler is a delegate of a repeated group
+ * The delimiter index tells us that this assembler is responsible for a finished group
+ */
+ int delimiterIndex = reader.getDelimiterIndex();
+ if (delimiterIndex < arrays.size() && reader.isDelimiter()) {
+ //Also finish the next group
+ delimiterIndex++;
+ }
+
+ int numberOfFinishedGroups = Math.min(delimiterIndex, arrays.size());
+ for (int i = 0; i < numberOfFinishedGroups; i++) {
+ //I'm the delegate for this group of repeated values and the group(s) is finished
+ ArrayValueAssembler assembler = arrays.get(i);
+ assembler.end();
+ }
+
+ //Is the repeated group (determined by the delimiter index) still unfinished?
+ if (delimiterIndex < arrays.size()) {
+ //Yes, go to the first value of the unfinished repeated group
+ nextIndex = arrays.get(delimiterIndex).getFirstValueIndex();
+ }
+ }
+
+ //Go to next value
+ return nextIndex;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/AbstractFixedLengthValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/AbstractFixedLengthValueGetter.java
new file mode 100644
index 0000000..aeef686
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/AbstractFixedLengthValueGetter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+public abstract class AbstractFixedLengthValueGetter implements IValueGetter {
+ protected final VoidPointable value;
+
+ AbstractFixedLengthValueGetter(ATypeTag typeTag, int nonTaggedLength) {
+ //+1 for the type tag
+ byte[] storage = new byte[1 + nonTaggedLength];
+ storage[0] = typeTag.serialize();
+ value = new VoidPointable();
+ value.set(storage, 0, storage.length);
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/BooleanValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/BooleanValueGetter.java
new file mode 100644
index 0000000..4a776ab
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/BooleanValueGetter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.BooleanPointable;
+
+class BooleanValueGetter extends AbstractFixedLengthValueGetter {
+ BooleanValueGetter() {
+ super(ATypeTag.BOOLEAN, 1);
+ }
+
+ @Override
+ public IValueReference getValue(IColumnValuesReader reader) {
+ BooleanPointable.setBoolean(value.getByteArray(), value.getStartOffset() + 1, reader.getBoolean());
+ return value;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/DoubleValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/DoubleValueGetter.java
new file mode 100644
index 0000000..2e88896
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/DoubleValueGetter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+
+class DoubleValueGetter extends AbstractFixedLengthValueGetter {
+ DoubleValueGetter() {
+ super(ATypeTag.DOUBLE, Double.BYTES);
+ }
+
+ @Override
+ public IValueReference getValue(IColumnValuesReader reader) {
+ DoublePointable.setDouble(value.getByteArray(), value.getStartOffset() + 1, reader.getDouble());
+ return value;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetter.java
new file mode 100644
index 0000000..9e58ab8
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+@FunctionalInterface
+public interface IValueGetter {
+ IValueReference getValue(IColumnValuesReader reader);
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetterFactory.java
new file mode 100644
index 0000000..0b58cfc4
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetterFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.om.types.ATypeTag;
+
+@FunctionalInterface
+public interface IValueGetterFactory {
+ IValueGetter createValueGetter(ATypeTag typeTag);
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
new file mode 100644
index 0000000..e76e3c9
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+
+class LongValueGetter extends AbstractFixedLengthValueGetter {
+ LongValueGetter() {
+ super(ATypeTag.BIGINT, Long.BYTES);
+ }
+
+ @Override
+ public IValueReference getValue(IColumnValuesReader reader) {
+ LongPointable.setLong(value.getByteArray(), value.getStartOffset() + 1, reader.getLong());
+ return value;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/MissingValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/MissingValueGetter.java
new file mode 100644
index 0000000..1ae84ee
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/MissingValueGetter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+public class MissingValueGetter implements IValueGetter {
+ public static final IValueGetter INSTANCE = new MissingValueGetter();
+ private static final VoidPointable MISSING;
+
+ static {
+ MISSING = new VoidPointable();
+ MISSING.set(new byte[] { ATypeTag.MISSING.serialize() }, 0, 1);
+ }
+
+ private MissingValueGetter() {
+ }
+
+ @Override
+ public IValueReference getValue(IColumnValuesReader reader) {
+ return MISSING;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/NullValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/NullValueGetter.java
new file mode 100644
index 0000000..e050252
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/NullValueGetter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+public class NullValueGetter implements IValueGetter {
+ public static final IValueGetter INSTANCE = new NullValueGetter();
+ private static final VoidPointable NULL;
+
+ static {
+ NULL = new VoidPointable();
+ NULL.set(new byte[] { ATypeTag.NULL.serialize() }, 0, 1);
+ }
+
+ private NullValueGetter() {
+ }
+
+ @Override
+ public IValueReference getValue(IColumnValuesReader reader) {
+ return NULL;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/StringValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/StringValueGetter.java
new file mode 100644
index 0000000..1dd1aa7
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/StringValueGetter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+class StringValueGetter implements IValueGetter {
+ private final ArrayBackedValueStorage value;
+
+ public StringValueGetter() {
+ value = new ArrayBackedValueStorage();
+ }
+
+ @Override
+ public IValueReference getValue(IColumnValuesReader reader) {
+ IValueReference string = reader.getBytes();
+ value.setSize(1 + string.getLength());
+ byte[] bytes = value.getByteArray();
+ bytes[0] = ATypeTag.STRING.serialize();
+ System.arraycopy(string.getByteArray(), string.getStartOffset(), bytes, 1, string.getLength());
+ return value;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/UUIDValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/UUIDValueGetter.java
new file mode 100644
index 0000000..135ed85
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/UUIDValueGetter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+class UUIDValueGetter extends AbstractFixedLengthValueGetter {
+ UUIDValueGetter() {
+ super(ATypeTag.UUID, 16);
+ }
+
+ @Override
+ public IValueReference getValue(IColumnValuesReader reader) {
+ IValueReference uuid = reader.getBytes();
+ System.arraycopy(uuid.getByteArray(), uuid.getStartOffset(), value.getByteArray(), value.getStartOffset() + 1,
+ uuid.getLength());
+ return value;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java
new file mode 100644
index 0000000..5f7fd7e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.om.types.ATypeTag;
+
+public class ValueGetterFactory implements IValueGetterFactory {
+ public static final IValueGetterFactory INSTANCE = new ValueGetterFactory();
+
+ private ValueGetterFactory() {
+ }
+
+ @Override
+ public IValueGetter createValueGetter(ATypeTag typeTag) {
+ switch (typeTag) {
+ case NULL:
+ return NullValueGetter.INSTANCE;
+ case MISSING:
+ return MissingValueGetter.INSTANCE;
+ case BOOLEAN:
+ return new BooleanValueGetter();
+ case BIGINT:
+ return new LongValueGetter();
+ case DOUBLE:
+ return new DoubleValueGetter();
+ case STRING:
+ return new StringValueGetter();
+ case UUID:
+ return new UUIDValueGetter();
+ default:
+ throw new UnsupportedOperationException(typeTag + " is not supported");
+ }
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
new file mode 100644
index 0000000..4cbe09b
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.values.IColumnBatchWriter;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class BatchFinalizerVisitor implements ISchemaNodeVisitor<Void, AbstractSchemaNestedNode> {
+ private final FlushColumnMetadata columnSchemaMetadata;
+ private final IColumnValuesWriter[] primaryKeyWriters;
+ private final PriorityQueue<IColumnValuesWriter> orderedColumns;
+ private int level;
+
+ public BatchFinalizerVisitor(FlushColumnMetadata columnSchemaMetadata) {
+ this.columnSchemaMetadata = columnSchemaMetadata;
+ orderedColumns = new PriorityQueue<>(Comparator.comparingInt(x -> -x.getEstimatedSize()));
+ int numberOfPrimaryKeys = columnSchemaMetadata.getNumberOfPrimaryKeys();
+ primaryKeyWriters = new IColumnValuesWriter[numberOfPrimaryKeys];
+ for (int i = 0; i < numberOfPrimaryKeys; i++) {
+ primaryKeyWriters[i] = columnSchemaMetadata.getWriter(i);
+ }
+ level = -1;
+ }
+
+ public int finalizeBatch(IColumnBatchWriter batchWriter, FlushColumnMetadata columnMetadata)
+ throws HyracksDataException {
+ orderedColumns.clear();
+
+ columnMetadata.getRoot().accept(this, null);
+ if (columnMetadata.getMetaRoot() != null) {
+ columnMetadata.getMetaRoot().accept(this, null);
+ }
+
+ int allocatedSpace = batchWriter.writePrimaryKeyColumns(primaryKeyWriters);
+ allocatedSpace += batchWriter.writeColumns(orderedColumns);
+ return allocatedSpace;
+ }
+
+ @Override
+ public Void visit(ObjectSchemaNode objectNode, AbstractSchemaNestedNode arg) throws HyracksDataException {
+ level++;
+ columnSchemaMetadata.flushDefinitionLevels(level, arg, objectNode);
+ List<AbstractSchemaNode> children = objectNode.getChildren();
+ for (int i = 0; i < children.size(); i++) {
+ children.get(i).accept(this, objectNode);
+ }
+ objectNode.setCounter(0);
+ columnSchemaMetadata.clearDefinitionLevels(objectNode);
+ level--;
+ return null;
+ }
+
+ @Override
+ public Void visit(AbstractCollectionSchemaNode collectionNode, AbstractSchemaNestedNode arg)
+ throws HyracksDataException {
+ level++;
+ columnSchemaMetadata.flushDefinitionLevels(level, arg, collectionNode);
+ collectionNode.getItemNode().accept(this, collectionNode);
+ collectionNode.setCounter(0);
+ columnSchemaMetadata.clearDefinitionLevels(collectionNode);
+ level--;
+ return null;
+ }
+
+ @Override
+ public Void visit(UnionSchemaNode unionNode, AbstractSchemaNestedNode arg) throws HyracksDataException {
+ columnSchemaMetadata.flushDefinitionLevels(level, arg, unionNode);
+ for (AbstractSchemaNode node : unionNode.getChildren().values()) {
+ node.accept(this, unionNode);
+ }
+ unionNode.setCounter(0);
+ columnSchemaMetadata.clearDefinitionLevels(unionNode);
+ return null;
+ }
+
+ @Override
+ public Void visit(PrimitiveSchemaNode primitiveNode, AbstractSchemaNestedNode arg) throws HyracksDataException {
+ columnSchemaMetadata.flushDefinitionLevels(level, arg, primitiveNode);
+ if (!primitiveNode.isPrimaryKey()) {
+ orderedColumns.add(columnSchemaMetadata.getWriter(primitiveNode.getColumnIndex()));
+ }
+
+ //Prepare for the next batch
+ primitiveNode.setCounter(0);
+ return null;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
new file mode 100644
index 0000000..48cd442
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
+import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+
+public class ColumnTransformer implements ILazyVisitablePointableVisitor<AbstractSchemaNode, AbstractSchemaNode> {
+ private final FlushColumnMetadata columnMetadata;
+ private final VoidPointable nonTaggedValue;
+ private final ObjectSchemaNode root;
+ private AbstractSchemaNestedNode currentParent;
+ private int primaryKeysLength;
+
+ public ColumnTransformer(FlushColumnMetadata columnMetadata, ObjectSchemaNode root) {
+ this.columnMetadata = columnMetadata;
+ this.root = root;
+ nonTaggedValue = new VoidPointable();
+ }
+
+ /**
+ * Transform a tuple in row format into columns
+ *
+ * @param pointable record pointable
+ * @return the estimated size (possibly overestimated) of the primary key(s) columns
+ */
+ public int transform(RecordLazyVisitablePointable pointable) throws HyracksDataException {
+ primaryKeysLength = 0;
+ pointable.accept(this, root);
+ return primaryKeysLength;
+ }
+
+ public int writeAntiMatter(LSMBTreeTupleReference tuple) throws HyracksDataException {
+ int pkSize = 0;
+ for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
+ byte[] bytes = tuple.getFieldData(i);
+ int start = tuple.getFieldStart(i);
+ ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[bytes[start]];
+ nonTaggedValue.set(bytes, start + 1, tuple.getFieldLength(i) - 1);
+ IColumnValuesWriter writer = columnMetadata.getWriter(i);
+ writer.writeAntiMatter(tag, nonTaggedValue);
+ pkSize += writer.getEstimatedSize();
+ }
+ return pkSize;
+ }
+
+ @Override
+ public AbstractSchemaNode visit(RecordLazyVisitablePointable pointable, AbstractSchemaNode arg)
+ throws HyracksDataException {
+ columnMetadata.enterNode(currentParent, arg);
+ AbstractSchemaNestedNode previousParent = currentParent;
+
+ ObjectSchemaNode objectNode = (ObjectSchemaNode) arg;
+ currentParent = objectNode;
+ for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+ pointable.nextChild();
+ IValueReference fieldName = pointable.getFieldName();
+ ATypeTag childTypeTag = pointable.getChildTypeTag();
+ if (childTypeTag != ATypeTag.MISSING) {
+ //Only write actual field values (including NULL) but ignore MISSING fields
+ AbstractSchemaNode childNode = objectNode.getOrCreateChild(fieldName, childTypeTag, columnMetadata);
+ acceptActualNode(pointable.getChildVisitablePointable(), childNode);
+ }
+ }
+
+ columnMetadata.exitNode(arg);
+ currentParent = previousParent;
+ return null;
+ }
+
+ @Override
+ public AbstractSchemaNode visit(AbstractListLazyVisitablePointable pointable, AbstractSchemaNode arg)
+ throws HyracksDataException {
+ columnMetadata.enterNode(currentParent, arg);
+ AbstractSchemaNestedNode previousParent = currentParent;
+
+ AbstractCollectionSchemaNode collectionNode = (AbstractCollectionSchemaNode) arg;
+ RunLengthIntArray defLevels = columnMetadata.getDefinitionLevels(collectionNode);
+ //the level at which an item is missing
+ int missingLevel = columnMetadata.getLevel();
+ currentParent = collectionNode;
+
+ int numberOfChildren = pointable.getNumberOfChildren();
+ for (int i = 0; i < numberOfChildren; i++) {
+ pointable.nextChild();
+ ATypeTag childTypeTag = pointable.getChildTypeTag();
+ AbstractSchemaNode childNode = collectionNode.getOrCreateItem(childTypeTag, columnMetadata);
+ acceptActualNode(pointable.getChildVisitablePointable(), childNode);
+ /*
+ * The array item may change (e.g., BIGINT --> UNION). Thus, new items would be considered as missing
+ */
+ defLevels.add(missingLevel);
+ }
+
+ columnMetadata.exitCollectionNode(collectionNode, numberOfChildren);
+ currentParent = previousParent;
+ return null;
+ }
+
+ @Override
+ public AbstractSchemaNode visit(FlatLazyVisitablePointable pointable, AbstractSchemaNode arg)
+ throws HyracksDataException {
+ columnMetadata.enterNode(currentParent, arg);
+ ATypeTag valueTypeTag = pointable.getTypeTag();
+ PrimitiveSchemaNode node = (PrimitiveSchemaNode) arg;
+ IColumnValuesWriter writer = columnMetadata.getWriter(node.getColumnIndex());
+ if (valueTypeTag == ATypeTag.MISSING) {
+ writer.writeLevel(columnMetadata.getLevel());
+ } else if (valueTypeTag == ATypeTag.NULL) {
+ writer.writeNull(columnMetadata.getLevel());
+ } else if (pointable.isTagged()) {
+ //Remove type tag
+ nonTaggedValue.set(pointable.getByteArray(), pointable.getStartOffset() + 1, pointable.getLength() - 1);
+ writer.writeValue(pointable.getTypeTag(), nonTaggedValue);
+ } else {
+ writer.writeValue(pointable.getTypeTag(), pointable);
+ }
+ if (node.isPrimaryKey()) {
+ primaryKeysLength += writer.getEstimatedSize();
+ }
+ columnMetadata.exitNode(arg);
+ return null;
+ }
+
+ private void acceptActualNode(AbstractLazyVisitablePointable pointable, AbstractSchemaNode node)
+ throws HyracksDataException {
+ if (node.getTypeTag() == ATypeTag.UNION) {
+ columnMetadata.enterNode(currentParent, node);
+ AbstractSchemaNestedNode previousParent = currentParent;
+
+ UnionSchemaNode unionNode = (UnionSchemaNode) node;
+ currentParent = unionNode;
+
+ ATypeTag childTypeTag = pointable.getTypeTag();
+ AbstractSchemaNode actualNode;
+ if (childTypeTag == ATypeTag.NULL || childTypeTag == ATypeTag.MISSING) {
+ actualNode = unionNode.getOriginalType();
+ } else {
+ actualNode = unionNode.getOrCreateChild(pointable.getTypeTag(), columnMetadata);
+ }
+ pointable.accept(this, actualNode);
+
+ currentParent = previousParent;
+ columnMetadata.exitNode(node);
+ } else if (pointable.getTypeTag() == ATypeTag.NULL && node.isNested()) {
+ columnMetadata.addNestedNull(currentParent, (AbstractSchemaNestedNode) node);
+ } else {
+ pointable.accept(this, node);
+ }
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
new file mode 100644
index 0000000..9b1b0a2
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReaderWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+public class FlushColumnTupleReaderWriterFactory extends AbstractColumnTupleReaderWriterFactory {
+ private static final long serialVersionUID = -9197679192729634493L;
+
+ public FlushColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, float tolerance) {
+ super(pageSize, maxNumberOfTuples, tolerance);
+ }
+
+ @Override
+ public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+ FlushColumnMetadata flushColumnMetadata = (FlushColumnMetadata) columnMetadata;
+ if (flushColumnMetadata.getMetaType() == null) {
+ //no meta
+ return new FlushColumnTupleWriter(flushColumnMetadata, pageSize, maxNumberOfTuples, tolerance);
+ }
+ return new FlushColumnTupleWithMetaWriter(flushColumnMetadata, pageSize, maxNumberOfTuples, tolerance);
+ }
+
+ @Override
+ public AbstractColumnTupleReader createColumnReader(IColumnProjectionInfo columnProjectionInfo) {
+ return ((AbstractColumnImmutableReadMetadata) columnProjectionInfo).createTupleReader();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
new file mode 100644
index 0000000..9c527da
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+
+public class FlushColumnTupleWithMetaWriter extends FlushColumnTupleWriter {
+ private final ColumnTransformer metaColumnTransformer;
+ private final RecordLazyVisitablePointable metaPointable;
+
+ public FlushColumnTupleWithMetaWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
+ float tolerance) {
+ super(columnMetadata, pageSize, maxNumberOfTuples, tolerance);
+ metaColumnTransformer = new ColumnTransformer(columnMetadata, columnMetadata.getMetaRoot());
+ metaPointable = new TypedRecordLazyVisitablePointable(columnMetadata.getMetaType());
+ }
+
+ @Override
+ protected void writeMeta(LSMBTreeTupleReference btreeTuple) throws HyracksDataException {
+ if (btreeTuple.isAntimatter()) {
+ return;
+ }
+
+ int metaFieldId = columnMetadata.getMetaRecordFieldIndex();
+ metaPointable.set(btreeTuple.getFieldData(metaFieldId), btreeTuple.getFieldStart(metaFieldId),
+ btreeTuple.getFieldLength(metaFieldId));
+ //In case the primary key is not in the meta part, we take the maximum
+ primaryKeysEstimatedSize = Math.max(metaColumnTransformer.transform(metaPointable), primaryKeysEstimatedSize);
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
new file mode 100644
index 0000000..1af043f
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.values.writer.ColumnBatchWriter;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+
+public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
+ protected final FlushColumnMetadata columnMetadata;
+ protected final BatchFinalizerVisitor finalizer;
+ protected final ColumnBatchWriter writer;
+
+ private final ColumnTransformer transformer;
+ private final RecordLazyVisitablePointable pointable;
+ private final int maxNumberOfTuples;
+
+ protected int primaryKeysEstimatedSize;
+
+ public FlushColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
+ float tolerance) {
+ this.columnMetadata = columnMetadata;
+ transformer = new ColumnTransformer(columnMetadata, columnMetadata.getRoot());
+ finalizer = new BatchFinalizerVisitor(columnMetadata);
+ writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
+ this.maxNumberOfTuples = maxNumberOfTuples;
+ pointable = new TypedRecordLazyVisitablePointable(columnMetadata.getDatasetType());
+ }
+
+ @Override
+ public final void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+ columnMetadata.init(multiPageOp);
+ }
+
+ @Override
+ public final int getNumberOfColumns() {
+ return columnMetadata.getNumberOfColumns();
+ }
+
+ @Override
+ public final int bytesRequired(ITupleReference tuple) {
+ int primaryKeysSize = 0;
+ for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
+ primaryKeysSize += tuple.getFieldLength(i);
+ }
+
+ //Mostly it is an overestimated size
+ return primaryKeysSize;
+ }
+
+ @Override
+ public final int getOccupiedSpace() {
+ int numberOfColumns = getNumberOfColumns();
+ int filterSize = numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE;
+ return primaryKeysEstimatedSize + filterSize;
+ }
+
+ @Override
+ public final int getMaxNumberOfTuples() {
+ return maxNumberOfTuples;
+ }
+
+ @Override
+ public final void close() {
+ columnMetadata.close();
+ }
+
+ @Override
+ public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+ //This from an in-memory component, hence the cast
+ LSMBTreeTupleReference btreeTuple = (LSMBTreeTupleReference) tuple;
+ if (btreeTuple.isAntimatter()) {
+ //Write only the primary keys of an anti-matter tuple
+ primaryKeysEstimatedSize = transformer.writeAntiMatter(btreeTuple);
+ return;
+ }
+ writeRecord(tuple);
+ writeMeta(btreeTuple);
+ }
+
+ @Override
+ public final int flush(ByteBuffer pageZero) throws HyracksDataException {
+ writer.setPageZeroBuffer(pageZero, getNumberOfColumns(), columnMetadata.getNumberOfPrimaryKeys());
+ return finalizer.finalizeBatch(writer, columnMetadata);
+ }
+
+ protected void writeRecord(ITupleReference tuple) throws HyracksDataException {
+ int recordFieldId = columnMetadata.getRecordFieldIndex();
+ pointable.set(tuple.getFieldData(recordFieldId), tuple.getFieldStart(recordFieldId),
+ tuple.getFieldLength(recordFieldId));
+ primaryKeysEstimatedSize = transformer.transform(pointable);
+ }
+
+ protected void writeMeta(LSMBTreeTupleReference btreeTuple) throws HyracksDataException {
+ //NoOp
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
new file mode 100644
index 0000000..0c1990f
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.load;
+
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleReaderWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+
+public class LoadColumnTupleReaderWriterFactory extends FlushColumnTupleReaderWriterFactory {
+ private static final long serialVersionUID = -7583574057314353873L;
+
+ public LoadColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, float tolerance) {
+ super(pageSize, maxNumberOfTuples, tolerance);
+ }
+
+ @Override
+ public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+ return new LoadColumnTupleWriter((FlushColumnMetadata) columnMetadata, pageSize, maxNumberOfTuples, tolerance);
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
new file mode 100644
index 0000000..e4604da
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.load;
+
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class LoadColumnTupleWriter extends FlushColumnTupleWriter {
+ public LoadColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
+ float tolerance) {
+ super(columnMetadata, pageSize, maxNumberOfTuples, tolerance);
+ }
+
+ @Override
+ public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+ writeRecord(tuple);
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/IEndOfPageCallBack.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/IEndOfPageCallBack.java
new file mode 100644
index 0000000..93df021
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/IEndOfPageCallBack.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import org.apache.asterix.column.tuple.MergeColumnTupleReference;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeRangeSearchCursor;
+
+/**
+ * An interface to signal {@link MergeColumnTupleWriter} that a component's page has reached the end.
+ */
+@FunctionalInterface
+public interface IEndOfPageCallBack {
+ /**
+ * Call {@link MergeColumnTupleWriter} to finish the current "vertical" merging batch.
+ * The caller of this method is {@link MergeColumnTupleReference#lastTupleReached()}
+ *
+ * @see ColumnBTreeRangeSearchCursor#doHasNext()
+ */
+ void callEnd(MergeColumnTupleReference columnTuple) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
new file mode 100644
index 0000000..11f3059
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+
+/**
+ * Merge column read metadata belongs to read an {@link ILSMDiskComponent}
+ * This only for reading an existing on-disk component for a merge operation. The schema here is immutable and cannot
+ * be changed.
+ */
+public final class MergeColumnReadMetadata extends AbstractColumnImmutableReadMetadata {
+ private final IColumnValuesReader[] columnReaders;
+
+ private MergeColumnReadMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+ IColumnValuesReader[] columnReaders, IValueReference serializedMetadata) {
+ super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, columnReaders.length);
+ this.columnReaders = columnReaders;
+ }
+
+ /**
+ * create ColumnMergeReadMetadata from columnMetadata
+ *
+ * @param serializedMetadata columnMetadata
+ * @return {@link MergeColumnReadMetadata}
+ * @see FlushColumnMetadata#serializeColumnsMetadata() for more information about serialization order
+ */
+ public static MergeColumnReadMetadata create(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+ IColumnValuesReaderFactory readerFactory, IValueReference serializedMetadata) throws IOException {
+ byte[] bytes = serializedMetadata.getByteArray();
+ int offset = serializedMetadata.getStartOffset();
+ int length = serializedMetadata.getLength();
+
+ int pathInfoStart = offset + IntegerPointable.getInteger(bytes, offset + PATH_INFO_POINTER);
+ DataInput input = new DataInputStream(new ByteArrayInputStream(bytes, pathInfoStart, length));
+ int numberOfColumns = input.readInt();
+ IColumnValuesReader[] columnReaders = new IColumnValuesReader[numberOfColumns];
+ for (int i = 0; i < numberOfColumns; i++) {
+ IColumnValuesReader columnReader = readerFactory.createValueReader(input);
+ //The order at which the path info was written is not ordered by the column index
+ columnReaders[columnReader.getColumnIndex()] = columnReader;
+ }
+
+ return new MergeColumnReadMetadata(datasetType, metaType, numberOfPrimaryKeys, columnReaders,
+ serializedMetadata);
+ }
+
+ public IColumnValuesReader[] getColumnReaders() {
+ return columnReaders;
+ }
+
+ @Override
+ public int getColumnIndex(int ordinal) {
+ return ordinal;
+ }
+
+ @Override
+ public int getNumberOfProjectedColumns() {
+ return columnReaders.length;
+ }
+
+ @Override
+ public AbstractColumnTupleReader createTupleReader() {
+ return new MergeColumnTupleReader(this);
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleProjector.java
new file mode 100644
index 0000000..f03506e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleProjector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+
+public class MergeColumnTupleProjector implements IColumnTupleProjector {
+ private final ARecordType datasetType;
+ private final ARecordType metaType;
+ private final int numberOfPrimaryKeys;
+ private final IColumnValuesReaderFactory readerFactory;
+
+ public MergeColumnTupleProjector(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+ IColumnValuesReaderFactory readerFactory) {
+ this.datasetType = datasetType;
+ this.metaType = metaType;
+ this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+ this.readerFactory = readerFactory;
+ }
+
+ @Override
+ public IColumnProjectionInfo createProjectionInfo(IValueReference columnMetadata) throws HyracksDataException {
+ try {
+ return MergeColumnReadMetadata.create(datasetType, metaType, numberOfPrimaryKeys, readerFactory,
+ columnMetadata);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public ITupleReference project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
+ throw new IllegalAccessError(getClass().getName());
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java
new file mode 100644
index 0000000..4114f10
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.asterix.column.tuple.MergeColumnTupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+public class MergeColumnTupleReader extends AbstractColumnTupleReader {
+ private final MergeColumnReadMetadata columnMetadata;
+
+ public MergeColumnTupleReader(AbstractColumnImmutableReadMetadata columnMetadata) {
+ this.columnMetadata = (MergeColumnReadMetadata) columnMetadata;
+ }
+
+ @Override
+ public IColumnTupleIterator createTupleIterator(ColumnBTreeReadLeafFrame frame, int componentIndex,
+ IColumnReadMultiPageOp multiPageOp) {
+ return new MergeColumnTupleReference(componentIndex, frame, columnMetadata, multiPageOp);
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
new file mode 100644
index 0000000..1ac94fe
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReaderWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+public class MergeColumnTupleReaderWriterFactory extends AbstractColumnTupleReaderWriterFactory {
+ private static final long serialVersionUID = -2131401304338796428L;
+
+ public MergeColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, float tolerance) {
+ super(pageSize, maxNumberOfTuples, tolerance);
+ }
+
+ @Override
+ public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+ MergeColumnWriteMetadata mergeWriteMetadata = (MergeColumnWriteMetadata) columnMetadata;
+ return new MergeColumnTupleWriter(mergeWriteMetadata, pageSize, maxNumberOfTuples, tolerance);
+ }
+
+ @Override
+ public AbstractColumnTupleReader createColumnReader(IColumnProjectionInfo columnProjectionInfo) {
+ return ((AbstractColumnImmutableReadMetadata) columnProjectionInfo).createTupleReader();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
new file mode 100644
index 0000000..fbda6d0
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.asterix.column.tuple.MergeColumnTupleReference;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.writer.ColumnBatchWriter;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
+ private final MergeColumnWriteMetadata columnMetadata;
+ private final MergeColumnTupleReference[] componentsTuples;
+ private final RunLengthIntArray writtenComponents;
+
+ private final IColumnValuesWriter[] primaryKeyWriters;
+ private final PriorityQueue<IColumnValuesWriter> orderedColumns;
+ private final ColumnBatchWriter writer;
+ private final int maxNumberOfTuples;
+ private int primaryKeysEstimatedSize;
+
+ public MergeColumnTupleWriter(MergeColumnWriteMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
+ float tolerance) {
+ this.columnMetadata = columnMetadata;
+ List<IColumnTupleIterator> componentsTuplesList = columnMetadata.getComponentsTuples();
+ this.componentsTuples = new MergeColumnTupleReference[componentsTuplesList.size()];
+ for (int i = 0; i < componentsTuplesList.size(); i++) {
+ MergeColumnTupleReference mergeTuple = (MergeColumnTupleReference) componentsTuplesList.get(i);
+ this.componentsTuples[i] = mergeTuple;
+ mergeTuple.registerEndOfPageCallBack(this::writeAllColumns);
+ }
+ this.writtenComponents = new RunLengthIntArray();
+ this.maxNumberOfTuples = maxNumberOfTuples;
+ writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
+ writtenComponents.reset();
+
+ primaryKeyWriters = new IColumnValuesWriter[columnMetadata.getNumberOfPrimaryKeys()];
+ for (int i = 0; i < primaryKeyWriters.length; i++) {
+ primaryKeyWriters[i] = columnMetadata.getWriter(i);
+ }
+ orderedColumns = new PriorityQueue<>(Comparator.comparingInt(x -> -x.getEstimatedSize()));
+ }
+
+ @Override
+ public int bytesRequired(ITupleReference tuple) {
+ int primaryKeysSize = 0;
+ for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
+ primaryKeysSize += tuple.getFieldLength(i);
+ }
+
+ return primaryKeysSize;
+ }
+
+ @Override
+ public void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+ columnMetadata.init(multiPageOp);
+ }
+
+ @Override
+ public int getNumberOfColumns() {
+ return columnMetadata.getNumberOfColumns();
+ }
+
+ @Override
+ public int getMaxNumberOfTuples() {
+ return maxNumberOfTuples;
+ }
+
+ @Override
+ public int getOccupiedSpace() {
+ int numberOfColumns = getNumberOfColumns();
+ int filterSize = numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE;
+ return primaryKeysEstimatedSize + filterSize;
+ }
+
+ @Override
+ public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+ MergeColumnTupleReference columnTuple = (MergeColumnTupleReference) tuple;
+ int componentIndex = columnTuple.getComponentIndex();
+ int skipCount = columnTuple.getAndResetSkipCount();
+ if (skipCount > 0) {
+ writtenComponents.add(-componentIndex, skipCount);
+ }
+ if (columnTuple.isAntimatter()) {
+ writtenComponents.add(-componentIndex);
+ } else {
+ writtenComponents.add(componentIndex);
+ }
+ writePrimaryKeys(columnTuple);
+ }
+
+ private void writePrimaryKeys(MergeColumnTupleReference columnTuple) throws HyracksDataException {
+ int primaryKeySize = 0;
+ for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
+ IColumnValuesReader columnReader = columnTuple.getReader(i);
+ IColumnValuesWriter columnWriter = primaryKeyWriters[i];
+ columnReader.write(columnWriter, false);
+ primaryKeySize += columnWriter.getEstimatedSize();
+ }
+ primaryKeysEstimatedSize = primaryKeySize;
+ }
+
+ private void writeNonKeyColumns() throws HyracksDataException {
+ for (int i = 0; i < writtenComponents.getNumberOfBlocks(); i++) {
+ int componentIndex = writtenComponents.getBlockValue(i);
+ if (componentIndex < 0) {
+ //Skip writing values of deleted tuples
+ componentIndex = -componentIndex;
+ skipReaders(componentIndex, writtenComponents.getBlockSize(i));
+ continue;
+ }
+ MergeColumnTupleReference componentTuple = componentsTuples[componentIndex];
+ int count = writtenComponents.getBlockSize(i);
+ for (int j = columnMetadata.getNumberOfPrimaryKeys(); j < columnMetadata.getNumberOfColumns(); j++) {
+ IColumnValuesReader columnReader = componentTuple.getReader(j);
+ IColumnValuesWriter columnWriter = columnMetadata.getWriter(j);
+ columnReader.write(columnWriter, count);
+ }
+ }
+ }
+
+ private void skipReaders(int componentIndex, int count) throws HyracksDataException {
+ MergeColumnTupleReference componentTuple = componentsTuples[componentIndex];
+ for (int j = columnMetadata.getNumberOfPrimaryKeys(); j < columnMetadata.getNumberOfColumns(); j++) {
+ IColumnValuesReader columnReader = componentTuple.getReader(j);
+ columnReader.skip(count);
+ }
+ }
+
+ @Override
+ public int flush(ByteBuffer pageZero) throws HyracksDataException {
+ int numberOfColumns = columnMetadata.getNumberOfColumns();
+ int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
+ if (writtenComponents.getSize() > 0) {
+ writeNonKeyColumns();
+ writtenComponents.reset();
+ }
+ for (int i = numberOfPrimaryKeys; i < numberOfColumns; i++) {
+ orderedColumns.add(columnMetadata.getWriter(i));
+ }
+ writer.setPageZeroBuffer(pageZero, numberOfColumns, numberOfPrimaryKeys);
+ int allocatedSpace = writer.writePrimaryKeyColumns(primaryKeyWriters);
+ allocatedSpace += writer.writeColumns(orderedColumns);
+ return allocatedSpace;
+ }
+
+ @Override
+ public void close() {
+ columnMetadata.close();
+ }
+
+ private void writeAllColumns(MergeColumnTupleReference columnTuple) throws HyracksDataException {
+ /*
+ * The last tuple from one of the components was reached. Since we are going to the next leaf, we will not be
+ * able to access the readers of this component's leaf after this tuple. So, we are going to write
+ * the values of all columns as recorded in writtenComponents
+ */
+ int skipCount = columnTuple.getAndResetSkipCount();
+ if (skipCount > 0) {
+ writtenComponents.add(-columnTuple.getComponentIndex(), skipCount);
+ }
+ writeNonKeyColumns();
+ writtenComponents.reset();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
new file mode 100644
index 0000000..b0d1a01
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
+import org.apache.asterix.column.values.writer.ColumnValuesWriterFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+
+/**
+ * Merge column write metadata belongs to write a new merge {@link ILSMDiskComponent}
+ * This is for writing a new on-disk component by merging two or more on disk components. The final schema for this
+ * component will the most recent schema, which belongs to the newest merged component. The schema here is immutable
+ * and cannot be changed.
+ */
+public final class MergeColumnWriteMetadata extends AbstractColumnImmutableMetadata {
+ private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
+ private final List<IColumnValuesWriter> columnWriters;
+ private final List<IColumnTupleIterator> componentsTuples;
+
+ /**
+ * For LSM Merge
+ */
+ private MergeColumnWriteMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> columnWriters,
+ IValueReference serializedMetadata, List<IColumnTupleIterator> componentsTuples) {
+ super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, columnWriters.size());
+ this.multiPageOpRef = multiPageOpRef;
+ this.columnWriters = columnWriters;
+ this.componentsTuples = componentsTuples;
+ }
+
+ /**
+ * Set {@link IColumnWriteMultiPageOp} for {@link IColumnValuesWriter}
+ *
+ * @param multiPageOp multi-buffer allocator
+ */
+ public void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+ multiPageOpRef.setValue(multiPageOp);
+
+ //Reset writer for the first write
+ for (int i = 0; i < columnWriters.size(); i++) {
+ columnWriters.get(i).reset();
+ }
+ }
+
+ public Mutable<IColumnWriteMultiPageOp> getMultiPageOpRef() {
+ return multiPageOpRef;
+ }
+
+ public IColumnValuesWriter getWriter(int columnIndex) {
+ return columnWriters.get(columnIndex);
+ }
+
+ public void close() {
+ multiPageOpRef.setValue(null);
+ for (int i = 0; i < columnWriters.size(); i++) {
+ columnWriters.get(i).close();
+ }
+ }
+
+ public static MergeColumnWriteMetadata create(ARecordType datasetType, ARecordType metaType,
+ int numberOfPrimaryKeys, Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
+ IValueReference serializedMetadata, List<IColumnTupleIterator> componentsTuples) throws IOException {
+ byte[] bytes = serializedMetadata.getByteArray();
+ int offset = serializedMetadata.getStartOffset();
+ int length = serializedMetadata.getLength();
+
+ int writersOffset = offset + IntegerPointable.getInteger(bytes, offset + WRITERS_POINTER);
+ DataInput input = new DataInputStream(new ByteArrayInputStream(bytes, writersOffset, length));
+
+ IColumnValuesWriterFactory writerFactory = new ColumnValuesWriterFactory(multiPageOpRef);
+ List<IColumnValuesWriter> writers = new ArrayList<>();
+ FlushColumnMetadata.deserializeWriters(input, writers, writerFactory);
+
+ return new MergeColumnWriteMetadata(datasetType, metaType, numberOfPrimaryKeys, multiPageOpRef, writers,
+ serializedMetadata, componentsTuples);
+ }
+
+ public List<IColumnTupleIterator> getComponentsTuples() {
+ return componentsTuples;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
new file mode 100644
index 0000000..df6b554
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.tuple;
+
+import org.apache.asterix.column.assembler.value.IValueGetter;
+import org.apache.asterix.column.assembler.value.ValueGetterFactory;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.bytes.stream.in.ByteBufferInputStream;
+import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.AbstractColumnTupleReference;
+
+public abstract class AbstractAsterixColumnTupleReference extends AbstractColumnTupleReference {
+ private final IValueGetter[] primaryKeysValueGetters;
+ protected final ByteBufferInputStream[] primaryKeyStreams;
+ protected final IColumnValuesReader[] primaryKeyReaders;
+ protected final VoidPointable[] primaryKeys;
+ protected final AbstractBytesInputStream[] columnStreams;
+
+ protected AbstractAsterixColumnTupleReference(int componentIndex, ColumnBTreeReadLeafFrame frame,
+ IColumnProjectionInfo info, IColumnReadMultiPageOp multiPageOp) {
+ super(componentIndex, frame, info, multiPageOp);
+ primaryKeyReaders = getPrimaryKeyReaders(info);
+ int numberOfPrimaryKeys = primaryKeyReaders.length;
+
+ this.primaryKeyStreams = new ByteBufferInputStream[numberOfPrimaryKeys];
+ primaryKeysValueGetters = new IValueGetter[numberOfPrimaryKeys];
+ primaryKeys = new VoidPointable[numberOfPrimaryKeys];
+
+ for (int i = 0; i < numberOfPrimaryKeys; i++) {
+ primaryKeyStreams[i] = new ByteBufferInputStream();
+ primaryKeysValueGetters[i] =
+ ValueGetterFactory.INSTANCE.createValueGetter(primaryKeyReaders[i].getTypeTag());
+ primaryKeys[i] = new VoidPointable();
+ }
+
+ this.columnStreams = new AbstractBytesInputStream[info.getNumberOfProjectedColumns()];
+ for (int i = 0; i < columnStreams.length; i++) {
+ if (info.getColumnIndex(i) >= numberOfPrimaryKeys) {
+ columnStreams[i] = new MultiByteBufferInputStream();
+ } else {
+ columnStreams[i] = new ByteBufferInputStream();
+ }
+ }
+ }
+
+ protected abstract IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info);
+
+ @Override
+ protected final void startPrimaryKey(IColumnBufferProvider provider, int startIndex, int ordinal,
+ int numberOfTuples) throws HyracksDataException {
+ ByteBufferInputStream primaryKeyStream = primaryKeyStreams[ordinal];
+ primaryKeyStream.reset(provider);
+ IColumnValuesReader reader = primaryKeyReaders[ordinal];
+ reader.reset(primaryKeyStream, numberOfTuples);
+ reader.skip(startIndex);
+ }
+
+ @Override
+ protected final void onNext() throws HyracksDataException {
+ for (int i = 0; i < primaryKeys.length; i++) {
+ IColumnValuesReader reader = primaryKeyReaders[i];
+ reader.next();
+ primaryKeys[i].set(primaryKeysValueGetters[i].getValue(reader));
+ }
+ }
+
+ @Override
+ public void lastTupleReached() throws HyracksDataException {
+ //Default: noOp
+ }
+
+ @Override
+ public final int getFieldCount() {
+ return primaryKeys.length;
+ }
+
+ @Override
+ public final byte[] getFieldData(int fIdx) {
+ return primaryKeys[fIdx].getByteArray();
+ }
+
+ @Override
+ public final int getFieldStart(int fIdx) {
+ return primaryKeys[fIdx].getStartOffset();
+ }
+
+ @Override
+ public final int getFieldLength(int fIdx) {
+ return primaryKeys[fIdx].getLength();
+ }
+
+ @Override
+ public final int getTupleSize() {
+ return -1;
+ }
+
+ @Override
+ public final boolean isAntimatter() {
+ /*
+ * The primary key cannot be missing, but the actual tuple is missing. There is no need to check other
+ * primary key readers (for composite primary keys). One primary key reader is sufficient to determine if a
+ * tuple is an anti-matter tuple.
+ */
+ return primaryKeyReaders[0].isMissing();
+ }
+
+ @Override
+ public final int compareTo(IColumnTupleIterator o) {
+ AbstractAsterixColumnTupleReference other = (AbstractAsterixColumnTupleReference) o;
+ int compare = 0;
+ for (int i = 0; i < primaryKeys.length && compare == 0; i++) {
+ compare = primaryKeyReaders[i].compareTo(other.primaryKeyReaders[i]);
+ }
+ return compare;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
new file mode 100644
index 0000000..c10d415
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.tuple;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
+import org.apache.asterix.column.operation.lsm.merge.IEndOfPageCallBack;
+import org.apache.asterix.column.operation.lsm.merge.MergeColumnReadMetadata;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+public final class MergeColumnTupleReference extends AbstractAsterixColumnTupleReference {
+ private final IColumnValuesReader[] columnReaders;
+ private int skipCount;
+ private IEndOfPageCallBack endOfPageCallBack;
+
+ public MergeColumnTupleReference(int componentIndex, ColumnBTreeReadLeafFrame frame,
+ MergeColumnReadMetadata columnMetadata, IColumnReadMultiPageOp multiPageOp) {
+ super(componentIndex, frame, columnMetadata, multiPageOp);
+ this.columnReaders = columnMetadata.getColumnReaders();
+ }
+
+ @Override
+ protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+ MergeColumnReadMetadata columnMetadata = (MergeColumnReadMetadata) info;
+ int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
+ IColumnValuesReader[] primaryKeyReaders = new IColumnValuesReader[numberOfPrimaryKeys];
+ System.arraycopy(columnMetadata.getColumnReaders(), 0, primaryKeyReaders, 0, numberOfPrimaryKeys);
+ return primaryKeyReaders;
+ }
+
+ @Override
+ protected boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples) {
+ //Skip filters
+ pageZero.position(pageZero.position() + numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE);
+ skipCount = 0;
+ return true;
+ }
+
+ @Override
+ protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+ throws HyracksDataException {
+ int numberOfPrimaryKeys = primaryKeys.length;
+ if (ordinal < numberOfPrimaryKeys) {
+ //Skip primary key
+ return;
+ }
+ MultiByteBufferInputStream columnStream = (MultiByteBufferInputStream) columnStreams[ordinal];
+ columnStream.reset(buffersProvider);
+ IColumnValuesReader reader = columnReaders[ordinal];
+ reader.reset(columnStream, numberOfTuples);
+ reader.skip(startIndex);
+ }
+
+ @Override
+ public void skip(int count) throws HyracksDataException {
+ skipCount += count;
+ }
+
+ @Override
+ public void lastTupleReached() throws HyracksDataException {
+ endOfPageCallBack.callEnd(this);
+ }
+
+ public int getAndResetSkipCount() {
+ int currentSkipCount = skipCount;
+ skipCount = 0;
+ return currentSkipCount;
+ }
+
+ public IColumnValuesReader getReader(int columnIndex) {
+ return columnReaders[columnIndex];
+ }
+
+ public void registerEndOfPageCallBack(IEndOfPageCallBack endOfPageCallBack) {
+ this.endOfPageCallBack = endOfPageCallBack;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
index 00cb0c5..c63912b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
@@ -32,10 +32,11 @@
}
@Override
- public void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
+ public ITupleReference project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
for (int i = 0; i < tuple.getFieldCount(); i++) {
dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
tb.addFieldEndOffset();
}
+ return tuple;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
index 8ca1a82..ba23e30 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
@@ -25,5 +25,5 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
public interface ITupleProjector {
- void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+ ITupleReference project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
}