Merge branch 'gerrit/goldfish' into 'master'
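
Passes a caller-managed visited set to OperatorManipulationUtil.substituteVarRec
in the affected optimizer rules, and adds Delta Lake test coverage: a generator
for an all-type table (integer, string, decimal, double, timestamp, date) plus
runtime tests for the type-conversion options and field-access pushdown.
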
Change-Id: Ia6f1478cc62fb07c2eba3c6c9828b568c43a3cc0
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
index d2ac8e5..378c758 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
@@ -398,14 +398,16 @@
if (!pushableNestedSubplan) {
return false;
}
-
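+ // Reusable visited-operator set for substituteVarRec; cleared after each substitution so every call starts fresh.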
+ Set<ILogicalOperator> visited = new HashSet<>();
for (int i = 0; i < nspOp.getNestedPlans().size(); i++) {
Mutable<ILogicalOperator> nspAggRef = nspOp.getNestedPlans().get(i).getRoots().get(0);
AggregateOperator nspAgg = (AggregateOperator) nspAggRef.getValue();
Mutable<ILogicalOperator> nspAggChildRef = nspAgg.getInputs().get(0);
LogicalVariable listifyVar = findListifiedVariable(nspAgg, varFromNestedAgg);
if (listifyVar != null) {
- OperatorManipulationUtil.substituteVarRec(aggInSubplanOp, unnestVar, listifyVar, true, context);
+ OperatorManipulationUtil.substituteVarRec(aggInSubplanOp, unnestVar, listifyVar, true, context,
+ visited);
+ visited.clear();
nspAgg.getVariables().addAll(aggInSubplanOp.getVariables());
nspAgg.getExpressions().addAll(aggInSubplanOp.getExpressions());
for (LogicalVariable v : aggInSubplanOp.getVariables()) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java
index 6d92b51..28166a9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java
@@ -120,12 +120,14 @@
AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) opRefJoin.getValue();
gby.getDecorList().clear();
gby.getDecorList().addAll(decorToPush);
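+ // Visited-operator set for substituteVarRec, reset between substitutions.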
+ Set<ILogicalOperator> visited = new HashSet<>();
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorNotToPush) {
LogicalVariable v1 = p.first;
if (v1 != null) {
VariableReferenceExpression varRef = (VariableReferenceExpression) p.second.getValue();
LogicalVariable v2 = varRef.getVariableReference();
- OperatorManipulationUtil.substituteVarRec(join, v2, v1, true, context);
+ OperatorManipulationUtil.substituteVarRec(join, v2, v1, true, context, visited);
+ visited.clear();
}
}
Mutable<ILogicalOperator> branchRef = join.getInputs().get(branch);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index db1bf50..07789b7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -37,6 +37,7 @@
import java.util.Collection;
import org.apache.asterix.test.external_dataset.avro.AvroFileConverterUtil;
+import org.apache.asterix.test.external_dataset.deltalake.DeltaAllTypeGenerator;
import org.apache.asterix.test.external_dataset.deltalake.DeltaTableGenerator;
import org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil;
import org.apache.asterix.testframework.context.TestCaseContext;
@@ -122,6 +123,7 @@
// cleaning directory
BinaryFileConverterUtil.cleanBinaryDirectory(basePath, DELTA_GEN_BASEDIR);
DeltaTableGenerator.prepareDeltaTableContainer(new Configuration());
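+ // Also build the all-type Delta table consumed by the deltalake-all-type and pushdown tests.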
+ DeltaAllTypeGenerator.createTableInsertData(new Configuration());
}
/**
@@ -434,6 +436,8 @@
loadDeltaDirectory(generatedDataBasePath, "/modified_delta_table/_delta_log", JSON_FILTER, "delta-data/");
loadDeltaDirectory(generatedDataBasePath, "/multiple_file_delta_table", PARQUET_FILTER, "delta-data/");
loadDeltaDirectory(generatedDataBasePath, "/multiple_file_delta_table/_delta_log", JSON_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath, "/delta_all_type/_delta_log", JSON_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath, "/delta_all_type", PARQUET_FILTER, "delta-data/");
}
private static void loadDeltaDirectory(String dataBasePath, String rootPath, FilenameFilter filter,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaAllTypeGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaAllTypeGenerator.java
new file mode 100644
index 0000000..5c302a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaAllTypeGenerator.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.external_dataset.deltalake;
+
+import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.DataWriteContext;
+import io.delta.kernel.Operation;
+import io.delta.kernel.Table;
+import io.delta.kernel.Transaction;
+import io.delta.kernel.TransactionBuilder;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import io.delta.kernel.utils.CloseableIterable;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.DataFileStatus;
+
+public class DeltaAllTypeGenerator {
+ public static final DecimalType DECIMAL_TYPE = new DecimalType(10, 5);
+ protected static final StructType exampleTableSchema = new StructType().add("integer_type", IntegerType.INTEGER)
+ .add("string_type", StringType.STRING).add("decimal_type", DECIMAL_TYPE).add("double_type", DoubleType.DOUBLE)
+ .add("timestamp_type", TimestampType.TIMESTAMP).add("date_type", DateType.DATE);
+ public static final String DELTA_ALL_TYPE_TABLE_PATH =
+ "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_all_type";
+
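+ /**
+ * Creates the all-type Delta table and inserts five rows: builds a CREATE_TABLE
+ * transaction, stages one in-memory columnar batch, writes it via the engine's
+ * Parquet handler, and commits the resulting append actions.
+ */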
+ public static void createTableInsertData(Configuration conf) throws IOException {
+ Engine engine = DefaultEngine.create(conf);
+ Table table = Table.forPath(engine, DELTA_ALL_TYPE_TABLE_PATH);
+ TransactionBuilder txnBuilder = table.createTransactionBuilder(engine, "Examples", Operation.CREATE_TABLE);
+ txnBuilder = txnBuilder.withSchema(engine, exampleTableSchema);
+ Transaction txn = txnBuilder.build(engine);
+ Row txnState = txn.getTransactionState(engine);
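+ // Stage one in-memory batch of five rows matching exampleTableSchema.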
+ ColumnVector[] vectors = new ColumnVector[exampleTableSchema.length()];
+ vectors[0] = intVector(Arrays.asList(123, 124, 125, 126, 127));
+ vectors[1] = stringVector(
+ Arrays.asList("FirstPerson", "SecondPerson", "ThirdPerson", "FourthPerson", "FifthPerson"));
+ vectors[2] = decimalVector(Arrays.asList(new BigDecimal("1.25432"), new BigDecimal("2666.223"),
+ new BigDecimal("1245.2421"), new BigDecimal("23731.2"), new BigDecimal("80911.22245")));
+ vectors[3] = doubleVector(Arrays.asList(100.34d, 200.055d, 300.02d, 400.21014d, 500.219d));
+ vectors[4] = timestampVector(
+ Arrays.asList(1732010400000L, 1732010400330L, 1732010400450L, 1732010403000L, 1732010401200L));
+ vectors[5] = dateVector(Arrays.asList(127, 23, 11, 456, 23));
+ ColumnarBatch batch = new DefaultColumnarBatch(5, exampleTableSchema, vectors);
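+ // Optional.empty() selection vector keeps every row of the batch.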
+ FilteredColumnarBatch f1 = new FilteredColumnarBatch(batch, Optional.empty());
+ CloseableIterator<FilteredColumnarBatch> data = toCloseableIterator(Arrays.asList(f1).iterator());
+ CloseableIterator<FilteredColumnarBatch> physicalData =
+ Transaction.transformLogicalData(engine, txnState, data, Collections.emptyMap());
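+ // Write the physical rows as Parquet; the empty partition-value map marks the table as unpartitioned.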
+ DataWriteContext writeContext = Transaction.getWriteContext(engine, txnState, Collections.emptyMap());
+ CloseableIterator<DataFileStatus> dataFiles = engine.getParquetHandler().writeParquetFiles(
+ writeContext.getTargetDirectory(), physicalData, writeContext.getStatisticsColumns());
+ CloseableIterator<Row> dataActions =
+ Transaction.generateAppendActions(engine, txnState, dataFiles, writeContext);
+ CloseableIterable<Row> dataActionsIterable = CloseableIterable.inMemoryIterable(dataActions);
+ txn.commit(engine, dataActionsIterable);
+ }
+
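+ // The factories below wrap plain Java lists in minimal in-memory ColumnVector implementations.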
+ static ColumnVector stringVector(List<String> data) {
+ return new ColumnVector() {
+ @Override
+ public DataType getDataType() {
+ return StringType.STRING;
+ }
+
+ @Override
+ public int getSize() {
+ return data.size();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean isNullAt(int rowId) {
+ return data.get(rowId) == null;
+ }
+
+ @Override
+ public String getString(int rowId) {
+ return data.get(rowId);
+ }
+ };
+ }
+
+ static ColumnVector intVector(List<Integer> data) {
+ return new ColumnVector() {
+ @Override
+ public DataType getDataType() {
+ return IntegerType.INTEGER;
+ }
+
+ @Override
+ public int getSize() {
+ return data.size();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean isNullAt(int rowId) {
+ return data.get(rowId) == null;
+ }
+
+ @Override
+ public int getInt(int rowId) {
+ return data.get(rowId);
+ }
+ };
+ }
+
+ static ColumnVector doubleVector(List<Double> data) {
+ return new ColumnVector() {
+ @Override
+ public DataType getDataType() {
+ return DoubleType.DOUBLE;
+ }
+
+ @Override
+ public int getSize() {
+ return data.size();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean isNullAt(int rowId) {
+ return data.get(rowId) == null;
+ }
+
+ @Override
+ public double getDouble(int rowId) {
+ return data.get(rowId);
+ }
+ };
+ }
+
+ static ColumnVector decimalVector(List<BigDecimal> data) {
+ return new ColumnVector() {
+ @Override
+ public DataType getDataType() {
+ return DECIMAL_TYPE; // Shared DecimalType constant (precision 10, scale 5)
+ }
+
+ @Override
+ public int getSize() {
+ return data.size();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean isNullAt(int rowId) {
+ return data.get(rowId) == null;
+ }
+
+ @Override
+ public BigDecimal getDecimal(int rowId) {
+ // Return the BigDecimal value directly as Delta Kernel works natively with BigDecimal for decimals
+ return data.get(rowId);
+ }
+ };
+ }
+
+ // Timestamp values are interpreted as microseconds since the epoch (Delta TimestampType).
+ static ColumnVector timestampVector(List<Long> data) {
+ return new ColumnVector() {
+ @Override
+ public DataType getDataType() {
+ return TimestampType.TIMESTAMP;
+ }
+
+ @Override
+ public int getSize() {
+ return data.size();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean isNullAt(int rowId) {
+ return data.get(rowId) == null;
+ }
+
+ @Override
+ public long getLong(int rowId) {
+ return data.get(rowId);
+ }
+ };
+ }
+
+ // Date values are interpreted as days since the epoch (Delta DateType).
+ static ColumnVector dateVector(List<Integer> data) {
+ return new ColumnVector() {
+ @Override
+ public DataType getDataType() {
+ return DateType.DATE;
+ }
+
+ @Override
+ public int getSize() {
+ return data.size();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean isNullAt(int rowId) {
+ return data.get(rowId) == null;
+ }
+
+ @Override
+ public int getInt(int rowId) {
+ return data.get(rowId);
+ }
+ };
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.00.ddl.sqlpp
new file mode 100644
index 0000000..cdae363
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.00.ddl.sqlpp
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE DeltalakeTableType as {
+};
+
+CREATE EXTERNAL COLLECTION DeltalakeDataset1(DeltalakeTableType) USING %adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/delta_all_type"),
+ ("decimal-to-double" = "true"),
+ ("timestamp-to-long" = "false"),
+ ("date-to-int" = "false"),
+ ("timezone" = "PST"),
+ ("table-format" = "delta")
+ );
+
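+-- Same table, but timestamps and dates are surfaced as raw long/int values.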
+CREATE EXTERNAL COLLECTION DeltalakeDataset2(DeltalakeTableType) USING %adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/delta_all_type"),
+ ("decimal-to-double" = "true"),
+ ("timestamp-to-long" = "true"),
+ ("date-to-int" = "true"),
+ ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.01.query.sqlpp
new file mode 100644
index 0000000..90f80ab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT element ds FROM DeltalakeDataset1 as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.02.query.sqlpp
new file mode 100644
index 0000000..b75203b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.02.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT element ds FROM DeltalakeDataset2 as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.00.ddl.sqlpp
new file mode 100644
index 0000000..4987403
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.00.ddl.sqlpp
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE DeltalakeTableType as {
+};
+
+CREATE EXTERNAL COLLECTION DeltalakeDataset1(DeltalakeTableType) USING %adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/delta_all_type"),
+ ("decimal-to-double" = "true"),
+ ("timestamp-to-long" = "false"),
+ ("date-to-int" = "false"),
+ ("timezone" = "PST"),
+ ("table-format" = "delta")
+ );
+
+CREATE EXTERNAL COLLECTION DeltalakeDataset2(DeltalakeTableType) USING %adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/delta_all_type"),
+ ("decimal-to-double" = "true"),
+ ("timestamp-to-long" = "false"),
+ ("date-to-int" = "false"),
+ ("timezone" = "PST"),
+ ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.query.sqlpp
new file mode 100644
index 0000000..4180599
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+USE test;
+
+SET `compiler.external.field.pushdown` "false";
+
+
+SELECT VALUE d
+FROM DeltalakeDataset1 d
+ORDER BY d.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.query.sqlpp
new file mode 100644
index 0000000..d300959
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "false";
+
+EXPLAIN
+SELECT VALUE d
+FROM DeltalakeDataset1 d
+ORDER BY d.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.query.sqlpp
new file mode 100644
index 0000000..073171c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+
+SELECT VALUE d
+FROM DeltalakeDataset1 d
+ORDER BY d.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.query.sqlpp
new file mode 100644
index 0000000..d300959
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "false";
+
+EXPLAIN
+SELECT VALUE d
+FROM DeltalakeDataset1 d
+ORDER BY d.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.query.sqlpp
new file mode 100644
index 0000000..fa60442
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "false";
+
+
+SELECT d1.integer_type, d2.string_type, d2.timestamp_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.query.sqlpp
new file mode 100644
index 0000000..e1ecedd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "false";
+
+EXPLAIN
+SELECT d1.integer_type, d2.string_type, d2.timestamp_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.query.sqlpp
new file mode 100644
index 0000000..1e440bf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+
+SELECT d1.integer_type, d2.string_type, d2.timestamp_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.query.sqlpp
new file mode 100644
index 0000000..786618a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+
+EXPLAIN
+SELECT d1.integer_type, d2.string_type, d2.timestamp_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.query.sqlpp
new file mode 100644
index 0000000..d2c9153
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+
+SELECT d1.integer_type, d1.date_type, d2.decimal_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.query.sqlpp
new file mode 100644
index 0000000..bb4701a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+
+EXPLAIN
+SELECT d1.integer_type, d1.date_type, d2.decimal_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp
new file mode 100644
index 0000000..c57de93
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
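+-- The definition points at a path where no Delta table was generated, exercising the table-not-exists error.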
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE DeltalakeTableType as {
+};
+
+CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/s1"),
+ ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.01.adm
new file mode 100644
index 0000000..e5f99a3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.01.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 123, "string_type": "FirstPerson", "decimal_type": 1.25432, "double_type": 100.34, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-05-08") }
+{ "integer_type": 124, "string_type": "SecondPerson", "decimal_type": 2666.223, "double_type": 200.055, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-24") }
+{ "integer_type": 125, "string_type": "ThirdPerson", "decimal_type": 1245.2421, "double_type": 300.02, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-12") }
+{ "integer_type": 126, "string_type": "FourthPerson", "decimal_type": 23731.2, "double_type": 400.21014, "timestamp_type": datetime("1970-01-20T17:06:50.403"), "date_type": date("1971-04-02") }
+{ "integer_type": 127, "string_type": "FifthPerson", "decimal_type": 80911.22245, "double_type": 500.219, "timestamp_type": datetime("1970-01-20T17:06:50.401"), "date_type": date("1970-01-24") }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.02.adm
new file mode 100644
index 0000000..9cc21d6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.02.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 123, "string_type": "FirstPerson", "decimal_type": 1.25432, "double_type": 100.34, "timestamp_type": 1732010400, "date_type": 127 }
+{ "integer_type": 124, "string_type": "SecondPerson", "decimal_type": 2666.223, "double_type": 200.055, "timestamp_type": 1732010400, "date_type": 23 }
+{ "integer_type": 125, "string_type": "ThirdPerson", "decimal_type": 1245.2421, "double_type": 300.02, "timestamp_type": 1732010400, "date_type": 11 }
+{ "integer_type": 126, "string_type": "FourthPerson", "decimal_type": 23731.2, "double_type": 400.21014, "timestamp_type": 1732010403, "date_type": 456 }
+{ "integer_type": 127, "string_type": "FifthPerson", "decimal_type": 80911.22245, "double_type": 500.219, "timestamp_type": 1732010401, "date_type": 23 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.adm
new file mode 100644
index 0000000..e5f99a3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 123, "string_type": "FirstPerson", "decimal_type": 1.25432, "double_type": 100.34, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-05-08") }
+{ "integer_type": 124, "string_type": "SecondPerson", "decimal_type": 2666.223, "double_type": 200.055, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-24") }
+{ "integer_type": 125, "string_type": "ThirdPerson", "decimal_type": 1245.2421, "double_type": 300.02, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-12") }
+{ "integer_type": 126, "string_type": "FourthPerson", "decimal_type": 23731.2, "double_type": 400.21014, "timestamp_type": datetime("1970-01-20T17:06:50.403"), "date_type": date("1971-04-02") }
+{ "integer_type": 127, "string_type": "FifthPerson", "decimal_type": 80911.22245, "double_type": 500.219, "timestamp_type": datetime("1970-01-20T17:06:50.401"), "date_type": date("1970-01-24") }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.plan
new file mode 100644
index 0000000..6f01ee7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.plan
@@ -0,0 +1,22 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$14(ASC) ] |PARTITIONED|
+ order (ASC, $$14) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$14(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ assign [$$14] <- [$$d.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d] <- test.DeltalakeDataset1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.adm
new file mode 100644
index 0000000..e5f99a3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 123, "string_type": "FirstPerson", "decimal_type": 1.25432, "double_type": 100.34, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-05-08") }
+{ "integer_type": 124, "string_type": "SecondPerson", "decimal_type": 2666.223, "double_type": 200.055, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-24") }
+{ "integer_type": 125, "string_type": "ThirdPerson", "decimal_type": 1245.2421, "double_type": 300.02, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-12") }
+{ "integer_type": 126, "string_type": "FourthPerson", "decimal_type": 23731.2, "double_type": 400.21014, "timestamp_type": datetime("1970-01-20T17:06:50.403"), "date_type": date("1971-04-02") }
+{ "integer_type": 127, "string_type": "FifthPerson", "decimal_type": 80911.22245, "double_type": 500.219, "timestamp_type": datetime("1970-01-20T17:06:50.401"), "date_type": date("1970-01-24") }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.plan
new file mode 100644
index 0000000..6f01ee7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.plan
@@ -0,0 +1,22 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$14(ASC) ] |PARTITIONED|
+ order (ASC, $$14) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$14(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ assign [$$14] <- [$$d.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d] <- test.DeltalakeDataset1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.adm
new file mode 100644
index 0000000..1f480e4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 125, "string_type": "ThirdPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
+{ "integer_type": 126, "string_type": "FourthPerson", "timestamp_type": datetime("1970-01-20T17:06:50.403") }
+{ "integer_type": 127, "string_type": "FifthPerson", "timestamp_type": datetime("1970-01-20T17:06:50.401") }
+{ "integer_type": 124, "string_type": "SecondPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
+{ "integer_type": 123, "string_type": "FirstPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.plan
new file mode 100644
index 0000000..848bc04
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.plan
@@ -0,0 +1,50 @@
+distribute result [$$32] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$32]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$32] <- [{"integer_type": $$34, "string_type": $$37, "timestamp_type": $$38}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ project ([$$34, $$37, $$38]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$36(ASC) ] |PARTITIONED|
+ order (ASC, $$36) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$36(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$34, $$37, $$38, $$36]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ join (eq($$34, $$35)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- HYBRID_HASH_JOIN [$$34][$$35] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- HASH_PARTITION_EXCHANGE [$$34] |PARTITIONED|
+ project ([$$34]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$34] <- [$$d1.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d1] <- test.DeltalakeDataset1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- HASH_PARTITION_EXCHANGE [$$35] |PARTITIONED|
+ project ([$$37, $$38, $$36, $$35]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$38, $$37, $$36, $$35] <- [$$d2.getField("timestamp_type"), $$d2.getField("string_type"), $$d2.getField("id"), $$d2.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d2] <- test.DeltalakeDataset2 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.adm
new file mode 100644
index 0000000..1f480e4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 125, "string_type": "ThirdPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
+{ "integer_type": 126, "string_type": "FourthPerson", "timestamp_type": datetime("1970-01-20T17:06:50.403") }
+{ "integer_type": 127, "string_type": "FifthPerson", "timestamp_type": datetime("1970-01-20T17:06:50.401") }
+{ "integer_type": 124, "string_type": "SecondPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
+{ "integer_type": 123, "string_type": "FirstPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.plan
new file mode 100644
index 0000000..34671dd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.plan
@@ -0,0 +1,50 @@
+distribute result [$$32] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$32]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$32] <- [{"integer_type": $$34, "string_type": $$37, "timestamp_type": $$38}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ project ([$$34, $$37, $$38]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$36(ASC) ] |PARTITIONED|
+ order (ASC, $$36) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$36(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$34, $$37, $$38, $$36]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ join (eq($$34, $$35)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- HYBRID_HASH_JOIN [$$34][$$35] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- HASH_PARTITION_EXCHANGE [$$34] |PARTITIONED|
+ project ([$$34]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$34] <- [$$d1.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d1] <- test.DeltalakeDataset1 project ({integer_type:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- HASH_PARTITION_EXCHANGE [$$35] |PARTITIONED|
+ project ([$$37, $$38, $$36, $$35]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$38, $$37, $$36, $$35] <- [$$d2.getField("timestamp_type"), $$d2.getField("string_type"), $$d2.getField("id"), $$d2.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d2] <- test.DeltalakeDataset2 project ({string_type:any,timestamp_type:any,id:any,integer_type:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.adm
new file mode 100644
index 0000000..5e1edce
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 123, "date_type": date("1970-05-08"), "decimal_type": 1.25432 }
+{ "integer_type": 124, "date_type": date("1970-01-24"), "decimal_type": 2666.223 }
+{ "integer_type": 125, "date_type": date("1970-01-12"), "decimal_type": 1245.2421 }
+{ "integer_type": 126, "date_type": date("1971-04-02"), "decimal_type": 23731.2 }
+{ "integer_type": 127, "date_type": date("1970-01-24"), "decimal_type": 80911.22245 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.adm
new file mode 100644
index 0000000..f706ba2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.adm
@@ -0,0 +1,46 @@
+distribute result [$$32] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$32]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$32] <- [{"integer_type": $$34, "date_type": $$37, "decimal_type": $$38}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ project ([$$34, $$37, $$38]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$35(ASC) ] |PARTITIONED|
+ order (ASC, $$35) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$35(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ join (eq($$34, $$35)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- HYBRID_HASH_JOIN [$$34][$$35] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- HASH_PARTITION_EXCHANGE [$$34] |PARTITIONED|
+ project ([$$34, $$37]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$37, $$34] <- [$$d1.getField("date_type"), $$d1.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d1] <- test.DeltalakeDataset1 project ({date_type:any,integer_type:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- HASH_PARTITION_EXCHANGE [$$35] |PARTITIONED|
+ project ([$$38, $$35]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$38, $$35] <- [$$d2.getField("decimal_type"), $$d2.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d2] <- test.DeltalakeDataset2 project ({decimal_type:any,integer_type:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 7aaa8b2..5ae2fc5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -525,6 +525,18 @@
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset">
+ <compilation-unit name="common/deltalake-all-type">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/deltalake-all-type</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/deltalake-field-access-pushdown">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/deltalake-field-access-pushdown</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
<compilation-unit name="common/deltalake-invalid-file-format">
<placeholder name="adapter" value="S3" />
<output-dir compare="Text">none</output-dir>
@@ -532,6 +544,13 @@
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset">
+ <compilation-unit name="common/deltalake-table-not-exists">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">none</output-dir>
+ <expected-error>ASX1108: External source error. io.delta.kernel.exceptions.TableNotFoundException: Delta table at path `s3a://playground/delta-data/s1` is not found.</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
<compilation-unit name="common/avro/avro-types/avro-map">
<placeholder name="adapter" value="S3" />
<output-dir compare="Text">common/avro/avro-types/avro-map</output-dir>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 915f3b3..3ab86c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -47,6 +47,11 @@
}
@Override
+ public void beforeExit(boolean success) throws HyracksDataException {
+ // No Op
+ }
+
+ @Override
public void close() throws IOException {
// No Op
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java
new file mode 100644
index 0000000..9dd2000
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AsterixDeltaRuntimeException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+ private final HyracksDataException hyracksDataException;
+
+ public AsterixDeltaRuntimeException(HyracksDataException e) {
+ super(e); // keep the checked exception as the cause for logging and stack traces
+ this.hyracksDataException = e;
+ }
+
+ public HyracksDataException getHyracksDataException() {
+ return hyracksDataException;
+ }
+}
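
Note on the wrapper above: visitor callbacks such as IATypeVisitor.visit cannot declare checked exceptions, so a HyracksDataException raised mid-traversal is wrapped in this unchecked exception and unwrapped again at the call boundary. A minimal, hedged sketch of that wrap/unwrap pattern (WrapUnwrapSketch, convert, visitField, and clip are illustrative names, not part of the patch):

import org.apache.hyracks.api.exceptions.HyracksDataException;

class WrapUnwrapSketch {
    // Stand-in for any conversion step that throws the checked exception.
    static String convert(String field) throws HyracksDataException {
        if (field == null) {
            throw HyracksDataException.create(new IllegalArgumentException("null field"));
        }
        return field;
    }

    // Visitor-style callback: no throws clause allowed, so wrap as unchecked.
    static String visitField(String field) {
        try {
            return convert(field);
        } catch (HyracksDataException e) {
            throw new AsterixDeltaRuntimeException(e);
        }
    }

    // Boundary method: unwrap back to the checked exception.
    static String clip(String field) throws HyracksDataException {
        try {
            return visitField(field);
        } catch (AsterixDeltaRuntimeException e) {
            throw e.getHyracksDataException();
        }
    }
}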
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java
new file mode 100644
index 0000000..3ca704f
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
+
+import java.util.Map;
+
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import org.apache.asterix.external.parser.DeltaDataParser;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.IATypeVisitor;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.StructType;
+
+/**
+ * This visitor clips the fileSchema stored in the Delta table metadata using the expected type.
+ */
+public class AsterixTypeToDeltaTypeVisitor implements IATypeVisitor<DataType, DataType> {
+
+ private final DeltaConverterContext context;
+ private Map<String, FunctionCallInformation> funcInfo;
+
+ public AsterixTypeToDeltaTypeVisitor(DeltaConverterContext context) {
+ this.context = context;
+ }
+
+ public StructType clipType(ARecordType rootType, StructType fileSchema,
+ Map<String, FunctionCallInformation> funcInfo) {
+ if (rootType == EMPTY_TYPE) {
+ return new StructType();
+ } else if (rootType == ALL_FIELDS_TYPE) {
+ return fileSchema;
+ }
+ StructType builder = new StructType();
+ this.funcInfo = funcInfo;
+ return clipObjectChildren(builder, rootType, fileSchema);
+ }
+
+ @Override
+ public DataType visit(ARecordType recordType, DataType arg) {
+ if (isNotCompatibleType(arg, recordType)) {
+ return null;
+ }
+ StructType builder = new StructType();
+ builder = clipObjectChildren(builder, recordType, (StructType) arg);
+ if (builder.fields().size() == 0) {
+ return null;
+ }
+ return builder;
+ }
+
+ @Override
+ public DataType visit(AbstractCollectionType collectionType, DataType arg) {
+ if (isNotCompatibleType(arg, collectionType)) {
+ return null;
+ }
+ DataType elementSchema = ((ArrayType) arg).getElementType();
+ DataType requestedChildType = collectionType.getItemType().accept(this, elementSchema);
+ return new ArrayType(requestedChildType, true);
+ }
+
+ private StructType clipObjectChildren(StructType builder, ARecordType recordType, StructType arg) {
+ String[] fieldNames = recordType.getFieldNames();
+ IAType[] fieldTypes = recordType.getFieldTypes();
+ for (int i = 0; i < fieldNames.length; i++) {
+ // If the field is not present in the file schema, we skip it
+ if (arg.fieldNames().contains(fieldNames[i])) {
+ DataType type = arg.get(fieldNames[i]).getDataType();
+ DataType childType = fieldTypes[i].accept(this, type);
+ if (childType != null) {
+ // We only add non-MISSING children
+ builder = builder.add(fieldNames[i], childType);
+ }
+ }
+ }
+ return builder;
+ }
+
+ @Override
+ public DataType visit(AUnionType unionType, DataType arg) {
+ if (arg instanceof ArrayType) {
+ return unionType.getType(ATypeTag.ARRAY).accept(this, arg);
+ } else {
+ return unionType.getType(ATypeTag.OBJECT).accept(this, arg);
+ }
+ }
+
+ @Override
+ public DataType visitFlat(IAType node, DataType arg) {
+ return arg;
+ }
+
+ private boolean isNotCompatibleType(DataType type, IAType node) {
+ // typeName is unique, so it safely keys the function-call information map
+ FunctionCallInformation info = funcInfo.get(node.getTypeName());
+ ATypeTag actualType = null;
+ try {
+ actualType = DeltaDataParser.getTypeTag(type, false, context);
+ } catch (HyracksDataException e) {
+ throw new AsterixDeltaRuntimeException(e);
+ }
+ ATypeTag expectedType = node.getTypeTag();
+
+ boolean isNotExpected = actualType != expectedType;
+ if (isNotExpected) {
+ // If no warning was created, this mismatch has already been reported
+ Warning warning = null;
+ if (actualType != ATypeTag.SYSTEM_NULL) {
+ warning = info.createWarning(expectedType, actualType);
+ }
+ if (warning != null) {
+ // A warning seen for the first time; report it
+ context.getWarnings().add(warning);
+ }
+ }
+ return isNotExpected;
+ }
+}
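
Note on clipType/clipObjectChildren above: the visitor keeps only the requested fields that the Delta file schema actually contains, recursing into nested records and arrays. A dependency-free sketch of the same clipping idea over plain maps (toy types; the real code walks an ARecordType against an io.delta.kernel StructType):

import java.util.LinkedHashMap;
import java.util.Map;

class ClipSketch {
    // Keep requested fields that exist in the file schema; recurse into nested records.
    @SuppressWarnings("unchecked")
    static Map<String, Object> clip(Map<String, Object> requested, Map<String, Object> file) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : requested.entrySet()) {
            Object fileType = file.get(e.getKey());
            if (fileType == null) {
                continue; // field missing in the file schema: skip it
            }
            if (e.getValue() instanceof Map && fileType instanceof Map) {
                Map<String, Object> child = clip((Map<String, Object>) e.getValue(), (Map<String, Object>) fileType);
                if (!child.isEmpty()) {
                    out.put(e.getKey(), child); // drop records that clipped down to nothing
                }
            } else {
                out.put(e.getKey(), fileType); // flat field: take the file's type
            }
        }
        return out;
    }
}

Requesting {integer_type, string_type} against a file schema {integer_type, string_type, timestamp_type, id} yields {integer_type, string_type}, which is what the project ({...}) annotations in the plans above reflect.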
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 9909cc3..8dc820b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -19,23 +19,33 @@
package org.apache.asterix.external.input.record.reader.aws.delta;
import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.Set;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -43,6 +53,7 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,7 +63,11 @@
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
@@ -61,10 +76,9 @@
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
private static final Logger LOGGER = LogManager.getLogger();
private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
- private Map<Integer, List<String>> schedule;
private String scanState;
private Map<String, String> configuration;
- private List<String> scanFiles;
+ protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
@Override
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
@@ -77,16 +91,7 @@
throws AlgebricksException, HyracksDataException {
this.configuration = configuration;
Configuration conf = new Configuration();
- conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
- conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
- if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
- conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
- }
- conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
- String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
- if (serviceEndpoint != null) {
- conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
- }
+ configurationBuilder(configuration, conf);
String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
+ configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+ configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
@@ -95,26 +100,61 @@
Engine engine = DefaultEngine.create(conf);
io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
- Snapshot snapshot = table.getLatestSnapshot(engine);
- Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, snapshot.getSchema(engine)).build();
+ Snapshot snapshot;
+ try {
+ snapshot = table.getLatestSnapshot(engine);
+ } catch (KernelException e) {
+ LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
+ throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+ }
+
+ List<Warning> warnings = new ArrayList<>();
+ DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
+ AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext);
+ StructType requiredSchema;
+ try {
+ ARecordType expectedType = HDFSUtils.getExpectedType(conf);
+ Map<String, FunctionCallInformation> functionCallInformationMap =
+ HDFSUtils.getFunctionCallInformationMap(conf);
+ StructType fileSchema = snapshot.getSchema(engine);
+ requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ } catch (AsterixDeltaRuntimeException e) {
+ throw e.getHyracksDataException();
+ }
+ Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
- scanFiles = new ArrayList<>();
+ List<Row> scanFiles = new ArrayList<>();
while (iter.hasNext()) {
FilteredColumnarBatch batch = iter.next();
CloseableIterator<Row> rowIter = batch.getRows();
while (rowIter.hasNext()) {
Row row = rowIter.next();
- scanFiles.add(RowSerDe.serializeRowToJson(row));
+ scanFiles.add(row);
}
}
- locationConstraints = configureLocationConstraints(appCtx);
+ locationConstraints = configureLocationConstraints(appCtx, scanFiles);
configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
- distributeFiles();
+ distributeFiles(scanFiles);
+ issueWarnings(warnings, warningCollector);
}
- private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) {
+ private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
+ if (!warnings.isEmpty()) {
+ for (Warning warning : warnings) {
+ if (warningCollector.shouldWarn()) {
+ warningCollector.warn(warning);
+ }
+ }
+ }
+ warnings.clear();
+ }
+
+ private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx,
+ List<Row> scanFiles) {
IClusterStateManager csm = appCtx.getClusterStateManager();
String[] locations = csm.getClusterLocations().getLocations();
@@ -131,24 +171,47 @@
return new AlgebricksAbsolutePartitionConstraint(locations);
}
- private void distributeFiles() {
- final int numComputePartitions = getPartitionConstraint().getLocations().length;
- schedule = new HashMap<>();
- for (int i = 0; i < numComputePartitions; i++) {
- schedule.put(i, new ArrayList<>());
+ private void distributeFiles(List<Row> scanFiles) {
+ final int partitionsCount = getPartitionConstraint().getLocations().length;
+ PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+ Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
+ // Prepare the workloads based on the number of partitions
+ for (int i = 0; i < partitionsCount; i++) {
+ workloadQueue.add(new PartitionWorkLoadBasedOnSize());
}
- int i = 0;
- for (String scanFile : scanFiles) {
- schedule.get(i).add(scanFile);
- i = (i + 1) % numComputePartitions;
+ for (Row scanFileRow : scanFiles) {
+ PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+ FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
+ workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), fileStatus.getSize());
+ workloadQueue.add(workload);
}
+ partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
+ }
+
+ public static void configurationBuilder(Map<String, String> configuration, Configuration conf) {
+ conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+ conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+ if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
+ conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+ }
+ conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
+ String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+ if (serviceEndpoint != null) {
+ conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
+ }
+ conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
+ configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
+ conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
+ configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
}
@Override
public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
try {
int partition = context.getPartition();
- return new DeltaFileRecordReader(schedule.get(partition), scanState, configuration, context);
+ return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
+ configuration, context);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -169,4 +232,31 @@
return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
}
+ public static class PartitionWorkLoadBasedOnSize implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final List<String> scanFiles = new ArrayList<>();
+ private long totalSize = 0;
+
+ public PartitionWorkLoadBasedOnSize() {
+ }
+
+ public List<String> getScanFiles() {
+ return scanFiles;
+ }
+
+ public void addScanFile(String scanFile, long size) {
+ this.scanFiles.add(scanFile);
+ this.totalSize += size;
+ }
+
+ public long getTotalSize() {
+ return totalSize;
+ }
+
+ @Override
+ public String toString() {
+ return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
+ }
+ }
+
}
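
Note on distributeFiles above: the old round-robin schedule is replaced by a greedy, size-aware assignment in which a min-heap keyed on accumulated size always hands the next scan file to the currently lightest partition. A standalone sketch of the same policy (Load is a simplified stand-in for PartitionWorkLoadBasedOnSize; file sizes are plain longs):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class DistributeSketch {
    static class Load {
        final List<Long> files = new ArrayList<>();
        long total;
    }

    // Greedy balancing: each file lands on the partition with the smallest total so far.
    static List<Load> distribute(long[] sizes, int partitions) {
        PriorityQueue<Load> heap = new PriorityQueue<>(partitions, Comparator.comparingLong((Load l) -> l.total));
        for (int i = 0; i < partitions; i++) {
            heap.add(new Load());
        }
        for (long size : sizes) {
            Load lightest = heap.poll();
            lightest.files.add(size);
            lightest.total += size;
            heap.add(lightest); // re-insert so the heap sees the updated total
        }
        return new ArrayList<>(heap);
    }
}

For sizes {100, 90, 50, 40, 10} over two partitions this yields totals 150/140, where round-robin would give 160/130.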
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 558f8a9..a5b21b6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -133,9 +133,13 @@
scanFile = scanFiles.get(fileIndex);
fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
- physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
- physicalReadSchema, Optional.empty());
- dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+ try {
+ physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
+ physicalReadSchema, Optional.empty());
+ dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
return this.hasNext();
} else {
return false;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
index 81e465c..a404e2f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.input.record.reader.aws.delta.converter;
import java.io.DataOutput;
+import java.util.List;
import java.util.Map;
import java.util.TimeZone;
@@ -32,6 +33,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
public class DeltaConverterContext extends ParserContext {
@SuppressWarnings("unchecked")
@@ -47,8 +49,10 @@
private final int timeZoneOffset;
private final AMutableDate mutableDate = new AMutableDate(0);
private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+ private final List<Warning> warnings;
- public DeltaConverterContext(Map<String, String> configuration) {
+ public DeltaConverterContext(Map<String, String> configuration, List<Warning> warnings) {
+ this.warnings = warnings;
decimalToDouble = Boolean.parseBoolean(configuration
.getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE));
timestampAsLong = Boolean.parseBoolean(configuration
@@ -96,4 +100,8 @@
public boolean isDateAsInt() {
return dateAsInt;
}
+
+ public List<Warning> getWarnings() {
+ return warnings;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
index ea02d77..adb846d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
@@ -23,6 +23,8 @@
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -42,6 +44,7 @@
import org.apache.asterix.om.types.ATypeTag;
import org.apache.avro.AvroRuntimeException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IValueReference;
@@ -68,7 +71,8 @@
private final IExternalFilterValueEmbedder valueEmbedder;
public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) {
- parserContext = new DeltaConverterContext(conf);
+ List<Warning> warnings = new ArrayList<>();
+ parserContext = new DeltaConverterContext(conf, warnings);
valueEmbedder = context.getValueEmbedder();
}
@@ -91,7 +95,7 @@
for (int i = 0; i < schema.fields().size(); i++) {
DataType fieldSchema = schema.fields().get(i).getDataType();
String fieldName = schema.fieldNames().get(i);
- ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i));
+ ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i), parserContext);
IValueReference value = null;
if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
value = valueEmbedder.getEmbeddedValue();
@@ -120,7 +124,7 @@
for (int i = 0; i < schema.fields().size(); i++) {
DataType fieldSchema = schema.fields().get(i).getDataType();
String fieldName = schema.fieldNames().get(i);
- ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index));
+ ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index), parserContext);
IValueReference value = null;
if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
value = valueEmbedder.getEmbeddedValue();
@@ -157,7 +161,8 @@
parserContext.exitCollection(valueBuffer, arrayBuilder);
}
- private ATypeTag getTypeTag(DataType schema, boolean isNull) throws HyracksDataException {
+ public static ATypeTag getTypeTag(DataType schema, boolean isNull, DeltaConverterContext parserContext)
+ throws HyracksDataException {
if (isNull) {
return ATypeTag.NULL;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 1d5f2ed..f06638d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -35,6 +35,7 @@
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import static org.msgpack.core.MessagePack.Code.ARRAY16;
import java.io.ByteArrayOutputStream;
@@ -71,6 +72,7 @@
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
+import org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory;
import org.apache.asterix.external.library.JavaLibrary;
import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
@@ -109,6 +111,12 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
public class ExternalDataUtils {
@@ -117,6 +125,8 @@
private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
private static final int HEADER_FUDGE = 64;
+ private static final Logger LOGGER = LogManager.getLogger();
+
static {
valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
@@ -505,6 +515,26 @@
}
}
+ public static void validateDeltaTableExists(Map<String, String> configuration) throws CompilationException {
+ Configuration conf = new Configuration();
+ String tableMetadataPath = null;
+ if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
+ .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
+ AwsS3DeltaReaderFactory.configurationBuilder(configuration, conf);
+ tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
+ + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+ + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+ }
+ Engine engine = DefaultEngine.create(conf);
+ io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
+ try {
+ table.getLatestSnapshot(engine);
+ } catch (KernelException e) {
+ LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
+ throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+ }
+ }
+
public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf,
String tableMetadataPath) throws AlgebricksException {
HadoopTables tables = new HadoopTables(conf);
@@ -936,8 +966,8 @@
}
public static boolean supportsPushdown(Map<String, String> properties) {
- //Currently, only Apache Parquet format is supported
- return isParquetFormat(properties);
+ //Currently, only the Apache Parquet and Delta table formats are supported
+ return isParquetFormat(properties) || isDeltaTable(properties);
}
/**
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 2ba1844..f36d25d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -24,6 +24,7 @@
import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
@@ -468,5 +469,8 @@
if (!response.sdkHttpResponse().isSuccessful()) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
}
+ if (isDeltaTable(configuration)) {
+ validateDeltaTableExists(configuration);
+ }
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
index 6314ce8..2613f34 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -45,4 +45,16 @@
public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url";
public static final String HADOOP_MAX_REQUESTS_PER_BATCH = "fs.gs.max.requests.per.batch";
public static final String HADOOP_BATCH_THREADS = "fs.gs.batch.threads";
+
+ public static class JSON_CREDENTIALS_FIELDS {
+ public static final String PRIVATE_KEY_ID = "private_key_id";
+ public static final String PRIVATE_KEY = "private_key";
+ public static final String CLIENT_EMAIL = "client_email";
+ }
+
+ public static class HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS {
+ public static final String PRIVATE_KEY_ID = "fs.gs.auth.service.account.private.key.id";
+ public static final String PRIVATE_KEY = "fs.gs.auth.service.account.private.key";
+ public static final String CLIENT_EMAIL = "fs.gs.auth.service.account.email";
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 5274c44..74a664d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -25,8 +25,6 @@
import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT;
@@ -37,7 +35,6 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -59,6 +56,10 @@
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.json.JsonReadFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.gax.paging.Page;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.BaseServiceException;
@@ -73,6 +74,11 @@
}
+ private static final ObjectMapper JSON_CREDENTIALS_OBJECT_MAPPER = new ObjectMapper();
+ static {
+ JSON_CREDENTIALS_OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
+ }
+
/**
* Builds the client using the provided configuration
*
@@ -218,7 +224,8 @@
* @param configuration properties
* @param numberOfPartitions number of partitions in the cluster
*/
- public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions) {
+ public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions)
+ throws AlgebricksException {
String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
@@ -234,15 +241,25 @@
// conf.set(GCSConstants.HADOOP_BATCH_THREADS, String.valueOf(numberOfPartitions));
// authentication method
- // TODO(htowaileb): find a way to pass the content instead of the path to keyfile, this line is temporary
- Path credentials = Path.of("credentials.json");
if (jsonCredentials == null) {
// anonymous access
conf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED);
} else {
- // TODO(htowaileb) need to pass the file content
- conf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE);
- conf.set(HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH, credentials.toAbsolutePath().toString());
+ try {
+ JsonNode jsonCreds = JSON_CREDENTIALS_OBJECT_MAPPER.readTree(jsonCredentials);
+ // Setting these values directly (instead of HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH) is
+ // supported in com.google.cloud.bigdataoss:util-hadoop only up to version hadoop3-2.2.x; version
+ // 3.x.y removed it, along with support for hadoop-2
+ conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.PRIVATE_KEY_ID,
+ jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.PRIVATE_KEY_ID).asText());
+ conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.PRIVATE_KEY,
+ jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.PRIVATE_KEY).asText());
+ conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.CLIENT_EMAIL,
+ jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.CLIENT_EMAIL).asText());
+ } catch (JsonProcessingException e) {
+ throw CompilationException.create(EXTERNAL_SOURCE_ERROR, "Unable to parse Json Credentials",
+ getMessageOrToString(e));
+ }
}
// set endpoint if provided, default is https://storage.googleapis.com/
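
Note on the credentials change above: instead of pointing Hadoop at a key file on disk, the patch parses the service-account JSON in memory and copies three fields onto the fs.gs.auth.service.account.* keys. A minimal sketch of that extraction with Jackson (the JSON literal is illustrative; real credentials documents carry more fields, and the patch additionally enables ALLOW_UNESCAPED_CONTROL_CHARS for keys pasted with literal newlines):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

class GcsCredentialsSketch {
    public static void main(String[] args) throws Exception {
        String json = "{\"private_key_id\":\"abc123\","
                + "\"private_key\":\"-----BEGIN PRIVATE KEY-----\\n...\","
                + "\"client_email\":\"svc@project.iam.gserviceaccount.com\"}";
        JsonNode creds = new ObjectMapper().readTree(json);
        // The same three fields the patch maps onto the Hadoop configuration.
        System.out.println(creds.get("private_key_id").asText());
        System.out.println(creds.get("private_key").asText());
        System.out.println(creds.get("client_email").asText());
    }
}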
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 072a9b6..9b8b103 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.runtime.operators;
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -58,6 +60,7 @@
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -95,12 +98,13 @@
private final LSMTreeIndexAccessor[] lsmAccessorForUniqunessChecks;
private final IFrameOperationCallback[] frameOpCallbacks;
- private boolean flushedPartialTuples;
private final PermutingFrameTupleReference keyTuple;
private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
private final IntSet processedTuples = new IntOpenHashSet();
private final IntSet flushedTuples = new IntOpenHashSet();
private final SourceLocation sourceLoc;
+ private boolean flushedPartialTuples;
+ private IBatchController batchController;
public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, IIndexDataflowHelperFactory keyIndexHelperFactory,
@@ -284,6 +288,7 @@
searchPred = new RangePredicate(frameTuple, frameTuple, true, true, keySearchCmp, keySearchCmp, null, null);
appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
frameTuple = new FrameTupleReference();
+ batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
} catch (Throwable e) { // NOSONAR: Re-thrown
throw HyracksDataException.create(e);
}
@@ -305,7 +310,8 @@
LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx];
IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
IFrameTupleProcessor processor = processors[pIdx];
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue());
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController,
+ p2tuplesMapEntry.getValue());
}
writeBuffer.ensureFrameSize(buffer.capacity());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 8bc2b1c..dd9f4070d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.runtime.operators;
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -71,6 +73,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
@@ -140,11 +143,12 @@
private final ITracer tracer;
private final long traceCategory;
private final ITupleProjector tupleProjector;
- private long lastRecordInTimeStamp = 0L;
private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
private final boolean hasSecondaries;
private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory;
private final int[] fieldPermutation;
+ private long lastRecordInTimeStamp = 0L;
+ private IBatchController batchController;
public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@ -362,6 +366,11 @@
}
@Override
+ public void beforeExit(boolean success) throws HyracksDataException {
+ callback.beforeExit(success);
+ }
+
+ @Override
public void close() throws IOException {
callback.close();
}
@@ -378,6 +387,7 @@
};
frameOpCallbacks[i].open();
}
+ batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
}
protected void resetSearchPredicate(int tupleIndex) {
@@ -437,7 +447,8 @@
LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx];
IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
IFrameTupleProcessor processor = processors[pIdx];
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue());
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController,
+ p2tuplesMapEntry.getValue());
}
if (itemCount > 0) {
lastRecordInTimeStamp = System.currentTimeMillis();
@@ -624,4 +635,5 @@
}
}
}
+
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
new file mode 100644
index 0000000..40465c6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+
+class StandardBatchController implements IBatchController {
+ static final IBatchController INSTANCE = new StandardBatchController();
+
+ private StandardBatchController() {
+ }
+
+ @Override
+ public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+ throws HyracksDataException {
+ lsmHarness.enter(ctx, LSMOperationType.MODIFICATION);
+ }
+
+ @Override
+ public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+ boolean batchSuccessful) throws HyracksDataException {
+ lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION);
+ }
+}
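
Note on StandardBatchController above: it preserves the previous behavior (enter and exit the LSM harness once per batch as a MODIFICATION); the IBatchController indirection exists so other machinery can install a different policy through the task context, which the operators above fetch via TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE). A hedged sketch of an alternative controller that adds simple bookkeeping around the same harness calls (CountingBatchController is illustrative, not part of the patch):

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;

class CountingBatchController implements IBatchController {
    private long batches; // hypothetical bookkeeping; any per-batch state would do

    @Override
    public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
            throws HyracksDataException {
        batches++;
        lsmHarness.enter(ctx, LSMOperationType.MODIFICATION);
    }

    @Override
    public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
            boolean batchSuccessful) throws HyracksDataException {
        lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION);
    }
}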
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 2413ed2..f8fb6c2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -181,17 +181,24 @@
}
public static void substituteVarRec(AbstractLogicalOperator op, LogicalVariable v1, LogicalVariable v2,
- boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
+ boolean goThroughNts, ITypingContext ctx, Set<ILogicalOperator> visited) throws AlgebricksException {
VariableUtilities.substituteVariables(op, v1, v2, goThroughNts, ctx);
for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) {
- substituteVarRec((AbstractLogicalOperator) opRef2.getValue(), v1, v2, goThroughNts, ctx);
+ if (visited.contains(opRef2.getValue())) {
+ continue;
+ }
+ visited.add(opRef2.getValue());
+ substituteVarRec((AbstractLogicalOperator) opRef2.getValue(), v1, v2, goThroughNts, ctx, visited);
}
if (op.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE && goThroughNts) {
NestedTupleSourceOperator nts = (NestedTupleSourceOperator) op;
if (nts.getDataSourceReference() != null) {
AbstractLogicalOperator op2 =
(AbstractLogicalOperator) nts.getDataSourceReference().getValue().getInputs().get(0).getValue();
- substituteVarRec(op2, v1, v2, goThroughNts, ctx);
+ if (!visited.contains(op2)) {
+ visited.add(op2);
+ substituteVarRec(op2, v1, v2, goThroughNts, ctx, visited);
+ }
}
}
if (op.hasNestedPlans()) {
@@ -199,7 +206,11 @@
for (ILogicalPlan p : aonp.getNestedPlans()) {
for (Mutable<ILogicalOperator> ref : p.getRoots()) {
AbstractLogicalOperator aop = (AbstractLogicalOperator) ref.getValue();
- substituteVarRec(aop, v1, v2, goThroughNts, ctx);
+ if (visited.contains(aop)) {
+ continue;
+ }
+ visited.add(aop);
+ substituteVarRec(aop, v1, v2, goThroughNts, ctx, visited);
}
}
}
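
Note on the visited set threaded through substituteVarRec above: operator trees here are really DAGs (an operator can be reachable through several input paths and nested plans), so memoizing visited operators both avoids redundant rewrites and prevents exponential re-traversal; callers clear the set between substitutions. The same memoized-DFS shape in isolation (Node is a toy stand-in; this sketch uses Set.add's boolean return where the patch spells out contains-then-add):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class VisitedDfsSketch {
    static class Node {
        final List<Node> inputs = new ArrayList<>();
        String var = "v1";
    }

    // Rewrite each node once, no matter how many paths reach it.
    static void substituteRec(Node n, String from, String to, Set<Node> visited) {
        if (n.var.equals(from)) {
            n.var = to; // the local rewrite step
        }
        for (Node in : n.inputs) {
            if (visited.add(in)) { // add() returns false for an already-seen node
                substituteRec(in, from, to, visited);
            }
        }
    }

    static void substitute(Node root, String from, String to) {
        substituteRec(root, from, to, new HashSet<>());
    }
}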
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
index 55c8400..dbdf6c4 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
@@ -92,6 +92,7 @@
protected void buildVarExprList(Collection<LogicalVariable> vars, IOptimizationContext context, GroupByOperator g,
List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> outVeList) throws AlgebricksException {
SourceLocation sourceLoc = g.getSourceLocation();
+ Set<ILogicalOperator> visited = new HashSet<>();
for (LogicalVariable ov : vars) {
LogicalVariable newVar = context.newVar();
VariableReferenceExpression varExpr = new VariableReferenceExpression(newVar);
@@ -101,7 +102,8 @@
for (ILogicalPlan p : g.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), ov, newVar, true,
- context);
+ context, visited);
+ visited.clear();
}
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
index 391d03a..36cad77 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
@@ -331,6 +331,7 @@
List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> outVeList) throws AlgebricksException {
SourceLocation sourceLoc = g.getSourceLocation();
Map<LogicalVariable, LogicalVariable> m = new HashMap<LogicalVariable, LogicalVariable>();
+ Set<ILogicalOperator> visited = new HashSet<>();
for (LogicalVariable ov : vars) {
LogicalVariable newVar = context.newVar();
ILogicalExpression varExpr = new VariableReferenceExpression(newVar);
@@ -340,11 +341,13 @@
for (ILogicalPlan p : g.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), ov, newVar, true,
- context);
+ context, visited);
+ visited.clear();
}
}
AbstractLogicalOperator opUnder = (AbstractLogicalOperator) g.getInputs().get(0).getValue();
- OperatorManipulationUtil.substituteVarRec(opUnder, ov, newVar, true, context);
+ OperatorManipulationUtil.substituteVarRec(opUnder, ov, newVar, true, context, visited);
+ visited.clear();
m.put(ov, newVar);
}
return m;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
index 0fc24c7..98215bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
@@ -63,7 +63,7 @@
* @param writer the FrameWriter to write to and flush
* @throws HyracksDataException
*/
- public default void flush(IFrameWriter writer) throws HyracksDataException {
+ default void flush(IFrameWriter writer) throws HyracksDataException {
write(writer, true);
writer.flush();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
new file mode 100644
index 0000000..7e3d599
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface HyracksThrowingAction {
+ void run() throws HyracksDataException; // NOSONAR
+}
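Being a @FunctionalInterface, HyracksThrowingAction accepts any lambda whose body may throw the checked HyracksDataException; a small illustrative sketch (the failure itself is hypothetical):

    import java.io.IOException;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.api.util.HyracksThrowingAction;

    final class ThrowingActionSketch {
        static void demo() throws HyracksDataException {
            // The lambda body may throw the checked HyracksDataException directly.
            HyracksThrowingAction action = () -> {
                throw HyracksDataException.create(new IOException("illustrative failure"));
            };
            action.run();
        }
    }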
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 3ed17b1..ad5e919 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -288,6 +288,42 @@
}
}
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
+ public static void tryHyracksWithCleanups(HyracksThrowingAction action, HyracksThrowingAction... cleanups)
+ throws HyracksDataException {
+ Throwable savedT = null;
+ boolean suppressedInterrupted = false;
+ try {
+ action.run();
+ } catch (Throwable t) {
+ savedT = t;
+ } finally {
+ for (HyracksThrowingAction cleanup : cleanups) {
+ try {
+ cleanup.run();
+ } catch (Throwable t) {
+ if (savedT != null) {
+ savedT.addSuppressed(t);
+ suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+ } else {
+ savedT = t;
+ }
+ }
+ }
+ }
+ if (savedT == null) {
+ return;
+ }
+ if (suppressedInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ if (savedT instanceof Error) {
+ throw (Error) savedT;
+ } else {
+ throw HyracksDataException.create(savedT);
+ }
+ }
+
public static void tryWithCleanupsUnchecked(Runnable action, Runnable... cleanups) {
Throwable savedT = null;
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index c27a7e6..6c073f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -172,13 +172,4 @@
++tupleCount;
IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
}
-
- /*
- * Always write and then flush to send out the message if exists
- */
- @Override
- public void flush(IFrameWriter writer) throws HyracksDataException {
- write(writer, true);
- writer.flush();
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
index 75c95b0..b77883d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
@@ -74,4 +74,18 @@
Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, false);
return sharedMap == null ? null : (T) sharedMap.get(key);
}
+
+ /**
+ * Gets a <T> object from the shared map of the task, or returns the given default value
+ * if the shared map is absent or contains no mapping for the key
+ *
+ * @param key the key to look up
+ * @param ctx the task context whose shared map is consulted
+ * @param defaultValue the value to return when no mapping exists
+ * @return the value associated with the key, cast to T, or defaultValue
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T getOrDefault(String key, IHyracksTaskContext ctx, T defaultValue) {
+ Map<String, T> sharedMap = (Map<String, T>) TaskUtil.getSharedMap(ctx, false);
+ return sharedMap == null ? defaultValue : sharedMap.getOrDefault(key, defaultValue);
+ }
}
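A usage sketch for the new accessor; the key and the fallback value are hypothetical:

    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.dataflow.common.utils.TaskUtil;

    final class SharedMapSketch {
        // Returns the batch size published by an upstream operator, or 8 when the
        // task has no shared map yet or nothing was put under the (hypothetical) key.
        static int batchSize(IHyracksTaskContext ctx) {
            return TaskUtil.getOrDefault("BATCH_SIZE", ctx, 8);
        }
    }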
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
new file mode 100644
index 0000000..e061589
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IBatchController {
+ String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER";
+
+ void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+ throws HyracksDataException;
+
+ void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+ boolean batchSuccessful) throws HyracksDataException;
+}
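Given the enter/exit methods this change adds to ILSMHarness (below), the simplest controller preserves the previously fixed behavior: enter the components when a batch starts and exit when it ends. A sketch, assuming MODIFICATION is the intended operation type:

    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
    import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
    import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
    import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
    import org.apache.hyracks.storage.am.lsm.common.impls.LSMOperationType;

    // Sketch of a pass-through controller: enter on batch start, exit on batch end,
    // mirroring the fixed enter/exit behavior this change makes pluggable.
    final class PerBatchController implements IBatchController {
        @Override
        public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness harness,
                IFrameOperationCallback callback) throws HyracksDataException {
            harness.enter(ctx, LSMOperationType.MODIFICATION);
        }

        @Override
        public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness harness,
                IFrameOperationCallback callback, boolean batchSuccessful) throws HyracksDataException {
            harness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION);
        }
    }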
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index 1f89af2..2fbc0c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -28,14 +28,24 @@
public interface IFrameOperationCallback extends Closeable {
/**
* Called once processing the frame is done before calling nextFrame on the next IFrameWriter in
- * the pipeline
+ * the pipeline. If completing this frame also exits the component, this is called prior to
+ * {@link #beforeExit(boolean)}.
*
* @throws HyracksDataException
*/
void frameCompleted() throws HyracksDataException;
/**
- * Called when the task has failed.
+ * Called just prior to exiting the component on batch completion. Not every batch results in
+ * a component exit; that decision is made by the {@link IBatchController}.
+ *
+ * @throws HyracksDataException
+ */
+ void beforeExit(boolean success) throws HyracksDataException;
+
+ /**
+ * Called when batch processing, or a {@link #frameCompleted()} or {@link #beforeExit(boolean)}
+ * invocation, has failed.
*
* @param th
*/
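Taken together, the callback lifecycle is: frameCompleted() after each frame, beforeExit(success) only when the controller decides the batch exits the component, and fail(th) if any of those phases throws. A no-op sketch, assuming the methods shown in this hunk (plus the inherited close()) are the interface's full surface:

    import java.io.IOException;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;

    // No-op callback illustrating the call order implied by the javadoc above.
    final class NoOpFrameOperationCallback implements IFrameOperationCallback {
        @Override
        public void frameCompleted() throws HyracksDataException {
            // e.g. record progress once a frame has been fully processed
        }

        @Override
        public void beforeExit(boolean success) throws HyracksDataException {
            // e.g. commit per-component state when success is true
        }

        @Override
        public void fail(Throwable th) {
            // record the failure; invoked for batch, frameCompleted, or beforeExit failures
        }

        @Override
        public void close() throws IOException {
            // release any resources held by the callback
        }
    }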
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 214d9dc..dbf34c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -227,10 +227,17 @@
* the callback at the end of the frame
* @param tuples
* the indexes of tuples to process
+ * @param batchController
+ * the controller of the batch lifecycle
* @throws HyracksDataException
*/
void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
- IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController,
+ Set<Integer> tuples) throws HyracksDataException;
+
+ void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException;
+
+ void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op)
throws HyracksDataException;
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 2d04020..d019a08 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -692,15 +693,27 @@
lsmIndex.updateFilter(ctx, tuple);
}
- private void enter(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ @Override
+ public void enter(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException {
if (!lsmIndex.isMemoryComponentsAllocated()) {
lsmIndex.allocateMemoryComponents();
}
- getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
+ getAndEnterComponents(ctx, op, false);
}
- private void exit(ILSMIndexOperationContext ctx) throws HyracksDataException {
- getAndExitComponentsAndComplete(ctx, LSMOperationType.MODIFICATION);
+ @Override
+ public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success,
+ LSMOperationType op) throws HyracksDataException {
+ try {
+ callback.beforeExit(success);
+ } catch (Throwable th) {
+ // TODO(mblow): we don't distinguish between the three distinct phases of the callback API
+ // in which failures can occur; we might need to eventually
+ callback.fail(th);
+ throw th;
+ } finally {
+ getAndExitComponentsAndComplete(ctx, op);
+ }
}
private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op)
@@ -715,13 +728,15 @@
@Override
public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
- IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
- throws HyracksDataException {
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController,
+ Set<Integer> tuples) throws HyracksDataException {
processor.start();
- enter(ctx);
+ batchController.batchEnter(ctx, this, frameOpCallback);
+ boolean success = false;
try {
try {
processFrame(accessor, tuple, processor, tuples);
+ success = true;
frameOpCallback.frameCompleted();
} catch (Throwable th) {
processor.fail(th);
@@ -733,7 +748,7 @@
LOGGER.warn("Failed to process frame", e);
throw e;
} finally {
- exit(ctx);
+ batchController.batchExit(ctx, this, frameOpCallback, success);
ctx.logPerformanceCounters(accessor.getTupleCount());
}
}
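Worth noting in the rewritten batchOperate: success is set after processFrame returns but before frameOpCallback.frameCompleted() is invoked, so a failure inside frameCompleted() still reaches batchExit with batchSuccessful == true, while a processing failure reports false. In both cases batchExit runs from the finally block, preserving the old guarantee that the components are always exited.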
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index fb5984d..c768768 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -211,8 +212,9 @@
}
public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
- IFrameOperationCallback frameOpCallback, Set<Integer> tuples) throws HyracksDataException {
- lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, tuples);
+ IFrameOperationCallback frameOpCallback, IBatchController batchController, Set<Integer> tuples)
+ throws HyracksDataException {
+ lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController, tuples);
}
@Override