Merge "merge branch gerrit/trinity into gerrit/ionic" into ionic
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index a101bed..bd9e329 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -35,6 +35,7 @@
import org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.ParquetFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -120,6 +121,7 @@
// Performs prefix pushdowns
pushdownProcessorsExecutor.add(new ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
pushdownProcessorsExecutor.add(new DeltaTableFilterPushdownProcessor(pushdownContext, context));
+ pushdownProcessorsExecutor.add(new ParquetFilterPushdownProcessor(pushdownContext, context));
pushdownProcessorsExecutor
.add(new ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context));
// Inlines AND/OR expression (must be last to run)
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
index 1622c6e..c4210a3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
@@ -26,7 +26,9 @@
import java.util.Set;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.optimizer.rules.pushdown.descriptor.DefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
import org.apache.asterix.optimizer.rules.pushdown.visitor.FilterExpressionInlineVisitor;
@@ -84,8 +86,15 @@
public void registerScan(Dataset dataset, List<LogicalVariable> pkList, LogicalVariable recordVariable,
LogicalVariable metaVariable, AbstractScanOperator scanOperator) {
- ScanDefineDescriptor scanDefDesc =
- new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable, scanOperator);
+ // Parquet scans need a descriptor that can also carry a row-group filter expression
+ ScanDefineDescriptor scanDefDesc;
+ if (DatasetUtil.isParquetFormat(dataset)) {
+ scanDefDesc = new ParquetDatasetScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable,
+ metaVariable, scanOperator);
+ } else {
+ scanDefDesc = new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable,
+ scanOperator);
+ }
defineChain.put(recordVariable, scanDefDesc);
useChain.put(recordVariable, new ArrayList<>());
if (metaVariable != null) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
index 023e4da..cf438d8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
@@ -31,12 +31,14 @@
import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
import org.apache.asterix.optimizer.rules.pushdown.processor.IPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -121,6 +123,12 @@
Map<String, String> configuration = ((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties();
boolean embedFilterValues = ExternalDataPrefix.containsComputedFields(configuration) && Boolean.parseBoolean(
configuration.getOrDefault(ExternalDataConstants.KEY_EMBED_FILTER_VALUES, ExternalDataConstants.TRUE));
+ if (DatasetUtil.isParquetFormat(dataset)) {
+ return new ParquetExternalDatasetProjectionFiltrationInfo(recordRequestedType, pathLocations,
+ scanDefineDescriptor.getFilterPaths(), scanDefineDescriptor.getFilterExpression(),
+ ((ParquetDatasetScanDefineDescriptor) scanDefineDescriptor).getRowGroupFilterExpression(),
+ embedFilterValues);
+ }
return new ExternalDatasetProjectionFiltrationInfo(recordRequestedType, pathLocations,
scanDefineDescriptor.getFilterPaths(), scanDefineDescriptor.getFilterExpression(), embedFilterValues);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java
new file mode 100644
index 0000000..4b4d742
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import java.util.List;
+
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+/**
+ * A {@link ScanDefineDescriptor} for scans over datasets stored in Parquet format. In addition to
+ * the information kept by its parent, it carries an optional row-group filter expression.
+ */
+public class ParquetDatasetScanDefineDescriptor extends ScanDefineDescriptor {
+
+ /** Row-group filter expression; {@code null} until one is set. */
+ private ILogicalExpression rowGroupFilterExpression;
+
+ /**
+ * Creates a descriptor without a row-group filter. All arguments are passed unchanged to
+ * {@link ScanDefineDescriptor}.
+ */
+ public ParquetDatasetScanDefineDescriptor(int scope, Dataset dataset, List<LogicalVariable> primaryKeyVariables,
+ LogicalVariable recordVariable, LogicalVariable metaRecordVariable, ILogicalOperator operator) {
+ super(scope, dataset, primaryKeyVariables, recordVariable, metaRecordVariable, operator);
+ }
+
+ /**
+ * @return the row-group filter expression, or {@code null} if none has been set
+ */
+ public ILogicalExpression getRowGroupFilterExpression() {
+ return rowGroupFilterExpression;
+ }
+
+ /**
+ * Replaces the row-group filter expression of this scan.
+ *
+ * @param rowGroupFilterExpression the new filter expression ({@code null} clears it)
+ */
+ public void setRowGroupFilterExpression(ILogicalExpression rowGroupFilterExpression) {
+ this.rowGroupFilterExpression = rowGroupFilterExpression;
+ }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
index 1ec64d5..0d79767 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
@@ -61,7 +61,7 @@
protected final ExpressionToExpectedSchemaNodeVisitor exprToNodeVisitor;
protected final ColumnFilterPathBuilderVisitor pathBuilderVisitor;
protected final Map<ILogicalExpression, ARecordType> paths;
- private final ArrayPathCheckerVisitor checkerVisitor;
+ protected final ArrayPathCheckerVisitor checkerVisitor;
public ColumnFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
super(pushdownContext, context);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java
new file mode 100644
index 0000000..25db9f9
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.RANGE_FILTER_PUSHABLE_FUNCTIONS;
+
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+/**
+ * Pushes filters of scans over Parquet datasets into the scan as a row-group filter expression
+ * (see {@link ParquetDatasetScanDefineDescriptor#getRowGroupFilterExpression()}).
+ */
+public class ParquetFilterPushdownProcessor extends ColumnFilterPushdownProcessor {
+
+ public ParquetFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
+ super(pushdownContext, context);
+ }
+
+ /**
+ * Skips scans over non-Parquet datasets and any scan whose descriptor cannot hold a row-group
+ * filter, so the cast in {@link #putFilterInformation} cannot fail (assuming the base class only calls it for non-skipped scans).
+ */
+ @Override
+ protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException {
+ return !DatasetUtil.isParquetFormat(scanDefineDescriptor.getDataset())
+ || !(scanDefineDescriptor instanceof ParquetDatasetScanDefineDescriptor);
+ }
+
+ /**
+ * Only functions listed in {@code RANGE_FILTER_PUSHABLE_FUNCTIONS} are pushed into row-group filters.
+ */
+ @Override
+ protected boolean isNotPushable(AbstractFunctionCallExpression expression) {
+ FunctionIdentifier fid = expression.getFunctionIdentifier();
+ return !RANGE_FILTER_PUSHABLE_FUNCTIONS.contains(fid);
+ }
+
+ /**
+ * Records the path accessed by {@code expression} if it maps to an expected-schema node of type
+ * {@code ANY}.
+ *
+ * @return {@code true} if the path was recorded, {@code false} otherwise
+ */
+ @Override
+ protected boolean handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException {
+ IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+ if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+ return false;
+ }
+
+ // The inferred path from the provided expression
+ ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+ paths.put(expression, expressionPath);
+ return true;
+ }
+
+ /**
+ * ANDs {@code inlinedExpr} into the scan's row-group filter expression, unless the collected
+ * paths contain multiple array paths.
+ */
+ @Override
+ protected void putFilterInformation(ScanDefineDescriptor scanDefineDescriptor, ILogicalExpression inlinedExpr)
+ throws AlgebricksException {
+ if (checkerVisitor.containsMultipleArrayPaths(paths.values())) {
+ // Cannot pushdown a filter with multiple unnest
+ // TODO allow rewindable column readers for filters
+ // TODO this is a bit conservative (maybe too conservative) as we can push part of expression down
+ return;
+ }
+ ParquetDatasetScanDefineDescriptor scanDefDesc = (ParquetDatasetScanDefineDescriptor) scanDefineDescriptor;
+ ILogicalExpression existingFilter = scanDefDesc.getRowGroupFilterExpression();
+ // Conjoin with a row-group filter that was already pushed into this scan, if any
+ scanDefDesc.setRowGroupFilterExpression(
+ existingFilter == null ? inlinedExpr : andExpression(existingFilter, inlinedExpr));
+ }
+}
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index c801283..f74a995 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -490,7 +490,7 @@
<id>asterix-gerrit-asterix-app</id>
<properties>
<test.excludes>
- **/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
+ **/CloudStorageTest.java,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,
**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,
**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,
@@ -611,6 +611,7 @@
<properties>
<test.includes>
**/CloudStorageTest.java,
+ **/CloudStorageCancellationTest.java,
**/SqlppSinglePointLookupExecutionTest.java, **/AwsS3*.java
</test.includes>
<failIfNoTests>false</failIfNoTests>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java
new file mode 100644
index 0000000..2e1f94f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.DELTA_RESULT_PATH;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.EXCLUDED_TESTS;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.ONLY_TESTS;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.SUITE_TESTS;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
+import org.apache.asterix.test.common.CancellationTestExecutor;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the cloud storage test suites ({@code SUITE_TESTS} and {@code ONLY_TESTS} of
+ * {@link CloudStorageTest}) through a {@link CancellationTestExecutor}, using the same
+ * environment setup as {@link CloudStorageTest}.
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageCancellationTest {
+
+ private final TestCaseContext tcCtx;
+
+ public CloudStorageCancellationTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ /**
+ * Sets up the environment via {@link CloudStorageTest#setupEnv} with a cancellation-aware executor.
+ */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TestExecutor testExecutor = new CancellationTestExecutor(DELTA_RESULT_PATH);
+ CloudStorageTest.setupEnv(testExecutor);
+ }
+
+ /** Undoes {@link #setUp()}: tears down the execution environment and stops the cloud mock. */
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ LocalCloudUtilAdobeMock.shutdownSilently();
+ }
+
+ @Parameters(name = "CloudStorageCancellationTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+ }
+
+ /**
+ * Runs one test case. Test cases with a single compilation unit whose description equals
+ * {@code EXCLUDED_TESTS} are skipped.
+ */
+ @Test
+ public void test() throws Exception {
+ List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit();
+ Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+ LangExecutionUtil.test(tcCtx);
+ }
+
+ private static String getText(Description description) {
+ return description == null ? "" : description.getValue();
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
index 498f060..3405838 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
@@ -57,16 +57,16 @@
private static final Logger LOGGER = LogManager.getLogger();
private final TestCaseContext tcCtx;
- private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
- private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
- private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf";
- private static final String DELTA_RESULT_PATH = "results_cloud";
- private static final String EXCLUDED_TESTS = "MP";
+ public static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+ public static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+ public static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf";
+ public static final String DELTA_RESULT_PATH = "results_cloud";
+ public static final String EXCLUDED_TESTS = "MP";
- private static final String PLAYGROUND_CONTAINER = "playground";
- private static final String MOCK_SERVER_REGION = "us-west-2";
- private static final int MOCK_SERVER_PORT = 8001;
- private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
+ public static final String PLAYGROUND_CONTAINER = "playground";
+ public static final String MOCK_SERVER_REGION = "us-west-2";
+ public static final int MOCK_SERVER_PORT = 8001;
+ public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
public CloudStorageTest(TestCaseContext tcCtx) {
this.tcCtx = tcCtx;
@@ -74,8 +74,12 @@
@BeforeClass
public static void setUp() throws Exception {
- LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+ setupEnv(testExecutor);
+ }
+
+ public static void setupEnv(TestExecutor testExecutor) throws Exception {
+ LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
testExecutor.executorId = "cloud";
testExecutor.stripSubstring = "//DB:";
LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index b4b7352..dac71ce 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -49,6 +49,14 @@
private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ public CancellationTestExecutor() {
+ // Implicitly delegates to the superclass's no-arg constructor
+ }
+
+ public CancellationTestExecutor(String deltaPath) {
+ super(deltaPath);
+ }
+
@Override
public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri,
List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, Charset responseCharset,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index c6d7ba9..29061ec 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -17,6 +17,7 @@
"cloud.max.read.requests.per.second" : 1500,
"cloud.max.write.requests.per.second" : 250,
"cloud.profiler.log.interval" : 5,
+ "cloud.requests.max.http.connections" : 1000,
"cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 562e195..f2ea15b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -17,6 +17,7 @@
"cloud.max.read.requests.per.second" : 1500,
"cloud.max.write.requests.per.second" : 250,
"cloud.profiler.log.interval" : 5,
+ "cloud.requests.max.http.connections" : 1000,
"cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 132fa5b..685d28b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -17,6 +17,7 @@
"cloud.max.read.requests.per.second" : 1500,
"cloud.max.write.requests.per.second" : 250,
"cloud.profiler.log.interval" : 5,
+ "cloud.requests.max.http.connections" : 1000,
"cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
index b560a5b..805b9aa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true row-group-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
index 2bfad06..a57424c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true row-group-filter on: and(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
index afb74f1..5e23176 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department embed-filter-value: true row-group-filter on: or(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
index fc48056..5832f4f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") embed-filter-value: true row-group-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
index 8dcaa95..ecbd5ee 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") row-group-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
index a99cc0c..35da948 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") row-group-filter on: and(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
index a7c5727..23892f1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department row-group-filter on: or(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
index b48def6..9fb8ee1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") row-group-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
index 2b41307..9dab9c1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
@@ -14,7 +14,7 @@
-- STREAM_SELECT |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$p1] <- test.ParquetDataset1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$p1] <- test.ParquetDataset1 row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
index 3993a4d..2fca35d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
@@ -36,7 +36,7 @@
-- STREAM_SELECT |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:any},id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:any},id:any}) row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
index 9b93e19..945f2da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
@@ -36,7 +36,7 @@
-- STREAM_SELECT |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:[{indices:any,text:any}]},id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:[{indices:any,text:any}]},id:any}) row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
index cccf9a3..6941720 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
@@ -44,7 +44,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any}]}) row-group-filter on: or(eq(scan-collection($$p.getField("val1")).getField("x"), 1), eq(scan-collection($$p.getField("val1")).getField("x"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
index cf265bf..316e465 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
@@ -44,7 +44,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any,y:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any,y:any}]}) row-group-filter on: or(eq(scan-collection($$p.getField("val1")).getField("x"), 1), eq(scan-collection($$p.getField("val1")).getField("y"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 6f952b0..419ac4d 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -247,6 +247,10 @@
<artifactId>aws-crt</artifactId>
</dependency>
<dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>apache-client</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 0b9b15c..20727de 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -41,15 +41,16 @@
private final long tokenAcquireTimeout;
private final int readMaxRequestsPerSeconds;
private final int writeMaxRequestsPerSeconds;
+ private final int requestsMaxHttpConnections;
public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
long profilerLogInterval, int writeBufferSize) {
- this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0);
+ this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0, 0);
}
private S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
long profilerLogInterval, int writeBufferSize, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
- int readMaxRequestsPerSeconds) {
+ int readMaxRequestsPerSeconds, int requestsMaxHttpConnections) {
this.region = Objects.requireNonNull(region, "region");
this.endpoint = endpoint;
this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -59,6 +60,7 @@
this.tokenAcquireTimeout = tokenAcquireTimeout;
this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
+ this.requestsMaxHttpConnections = requestsMaxHttpConnections;
}
public static S3ClientConfig of(CloudProperties cloudProperties) {
@@ -66,7 +68,7 @@
cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(),
cloudProperties.getProfilerLogInterval(), cloudProperties.getWriteBufferSize(),
cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
- cloudProperties.getReadMaxRequestsPerSecond());
+ cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getRequestsMaxHttpConnections());
}
public static S3ClientConfig of(Map<String, String> configuration, int writeBufferSize) {
@@ -124,6 +126,10 @@
return readMaxRequestsPerSeconds;
}
+ public int getRequestsMaxHttpConnections() {
+ return requestsMaxHttpConnections;
+ }
+
private boolean isS3Mock() {
return endpoint != null && !endpoint.isEmpty();
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 224ede4..01a8b02 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -63,6 +63,7 @@
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
@@ -325,6 +326,10 @@
S3ClientBuilder builder = S3Client.builder();
builder.credentialsProvider(config.createCredentialsProvider());
builder.region(Region.of(config.getRegion()));
+ if (config.getRequestsMaxHttpConnections() > 0) {
+ builder.httpClientBuilder(
+ ApacheHttpClient.builder().maxConnections(config.getRequestsMaxHttpConnections()));
+ }
if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
URI uri;
try {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 7b6fb6e..1e3fe75 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -64,7 +64,8 @@
CLOUD_WRITE_BUFFER_SIZE(
getRangedIntegerType(5, Integer.MAX_VALUE),
StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.MEGABYTE)),
- CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD(POSITIVE_INTEGER, 50);
+ CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD(POSITIVE_INTEGER, 50),
+ CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS(POSITIVE_INTEGER, 1000);
private final IOptionType interpreter;
private final Object defaultValue;
@@ -96,6 +97,7 @@
case CLOUD_MAX_READ_REQUESTS_PER_SECOND:
case CLOUD_WRITE_BUFFER_SIZE:
case CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD:
+ case CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS:
return Section.COMMON;
default:
return Section.NC;
@@ -163,6 +165,8 @@
return "The write buffer size in bytes. (default: 8MB, min: 5MB)";
case CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD:
return "The number of cloud reads for re-evaluating an eviction plan. (default: 50)";
+ case CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS:
+ return "The maximum number of HTTP connections to use for cloud requests per node. (default: 1000)";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -258,4 +262,8 @@
public int getEvictionPlanReevaluationThreshold() {
return accessor.getInt(Option.CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD);
}
+
+ public int getRequestsMaxHttpConnections() {
+ return accessor.getInt(Option.CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 7f3642b..2803f21 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.api.IIOBlockingOperation;
import org.apache.asterix.common.transactions.ILogManager;
@@ -47,6 +48,8 @@
private final int datasetID;
private final ILogManager logManager;
private final LogRecord waitLog = new LogRecord();
+ private final AtomicInteger failedFlushes = new AtomicInteger();
+ private final AtomicInteger failedMerges = new AtomicInteger();
private int numActiveIOOps;
private int pendingFlushes;
private int pendingMerges;
@@ -317,4 +320,25 @@
public synchronized int getPendingReplications() {
return pendingReplications;
}
+
+ public int getFailedFlushes() {
+ return failedFlushes.get();
+ }
+
+ public int getFailedMerges() {
+ return failedMerges.get();
+ }
+
+ public void incrementFailedIoOp(ILSMIOOperation.LSMIOOperationType operation) {
+ switch (operation) {
+ case FLUSH:
+ failedFlushes.incrementAndGet();
+ break;
+ case MERGE:
+ failedMerges.incrementAndGet();
+ break;
+ default:
+ break;
+ }
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index fc27bf6..0128297 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -955,6 +955,8 @@
stats.addPendingFlushes(dsr.getDatasetInfo().getPendingFlushes());
stats.addPendingMerges(dsr.getDatasetInfo().getPendingMerges());
stats.addPendingReplications(dsr.getDatasetInfo().getPendingReplications());
+ stats.addFailedFlushes(dsr.getDatasetInfo().getFailedFlushes());
+ stats.addFailedMerges(dsr.getDatasetInfo().getFailedMerges());
}
return stats;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 52af329..6887782 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -143,6 +143,7 @@
@Override
public void afterFailure(ILSMIOOperation operation) {
+ dsInfo.incrementFailedIoOp(operation.getIOOperationType());
if (isMerge(operation)) {
try {
ioManager.delete(getOperationMaskFilePath(operation));
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
index b58f63d..2e2a4a7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
@@ -23,6 +23,8 @@
private int pendingFlushes;
private int pendingMerges;
private int pendingReplications;
+ private int failedFlushes;
+ private int failedMerges;
public void addPendingFlushes(int pending) {
pendingFlushes += pending;
@@ -36,6 +38,22 @@
pendingReplications += pending;
}
+ public void addFailedFlushes(int failed) {
+ failedFlushes += failed;
+ }
+
+ public void addFailedMerges(int failed) {
+ failedMerges += failed;
+ }
+
+ public int getFailedFlushes() {
+ return failedFlushes;
+ }
+
+ public int getFailedMerges() {
+ return failedMerges;
+ }
+
public int getPendingFlushes() {
return pendingFlushes;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java
new file mode 100644
index 0000000..774e908
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+
+public class ParquetFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory {
+ private static final long serialVersionUID = 1L;
+ private final FilterPredicate filterExpression;
+ private final IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory;
+
+ public ParquetFilterEvaluatorFactory(IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory,
+ FilterPredicate expression) {
+ this.externalFilterEvaluatorFactory = externalFilterEvaluatorFactory;
+ this.filterExpression = expression;
+ }
+
+ @Override
+ public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
+ throws HyracksDataException {
+ return externalFilterEvaluatorFactory.create(serviceContext, warningCollector);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
+ return externalFilterEvaluatorFactory.createValueEmbedder(warningCollector);
+ }
+
+ public FilterPredicate getFilterExpression() {
+ return filterExpression;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 2d92e10..f5dc7a2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -33,6 +33,7 @@
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -44,6 +45,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
import com.amazonaws.SdkBaseException;
@@ -91,6 +94,13 @@
IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
configureAwsS3HdfsJobConf(appCtx, conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
+ if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+ FilterPredicate parquetFilterPredicate =
+ ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+ if (parquetFilterPredicate != null) {
+ ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+ }
+ }
} catch (SdkException | SdkBaseException ex) {
throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
} catch (AlgebricksException ex) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index 530ce74..72c9977 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -41,6 +42,8 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobItem;
@@ -82,6 +85,13 @@
JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
+ if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+ FilterPredicate parquetFilterPredicate =
+ ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+ if (parquetFilterPredicate != null) {
+ ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+ }
+ }
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index 4dedb08..1db4445 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -40,6 +41,8 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.PathItem;
@@ -69,6 +72,13 @@
JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
+ if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+ FilterPredicate parquetFilterPredicate =
+ ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+ if (parquetFilterPredicate != null) {
+ ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+ }
+ }
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 874c3bd..9469747 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -39,6 +40,8 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
import com.google.cloud.storage.Blob;
@@ -76,6 +79,13 @@
int numberOfPartitions = getPartitionConstraint().getLocations().length;
GCSAuthUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
+ if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+ FilterPredicate parquetFilterPredicate =
+ ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+ if (parquetFilterPredicate != null) {
+ ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+ }
+ }
}
@Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 2551b86..0cd150e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -854,4 +854,9 @@
return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils
.isDeltaTable(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
}
+
+ public static boolean isParquetFormat(Dataset dataset) {
+ return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils
+ .isParquetFormat(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 5939290..dfefb8f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -19,6 +19,7 @@
package org.apache.asterix.metadata.utils;
import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.isParquetFormat;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
@@ -43,6 +44,7 @@
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.input.filter.NoOpDeltaTableFilterEvaluatorFactory;
import org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.metadata.dataset.DatasetFormatInfo;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -53,6 +55,7 @@
import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder;
import org.apache.asterix.metadata.utils.filter.DeltaTableFilterBuilder;
import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder;
+import org.apache.asterix.metadata.utils.filter.ParquetFilterBuilder;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.types.ARecordType;
@@ -61,6 +64,7 @@
import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -346,6 +350,25 @@
(ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv);
return builder.build();
}
+ } else if (isParquetFormat(properties)) {
+ if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
+ return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
+ } else {
+ ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+ ExternalDatasetProjectionFiltrationInfo pfi =
+ (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo;
+ IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory =
+ NoOpExternalFilterEvaluatorFactory.INSTANCE;
+ if (!prefix.getPaths().isEmpty()) {
+ ExternalFilterBuilder externalFilterBuilder =
+ new ExternalFilterBuilder(pfi, context, typeEnv, prefix);
+ externalFilterEvaluatorFactory = externalFilterBuilder.build();
+ }
+ ParquetFilterBuilder builder = new ParquetFilterBuilder(
+ (ParquetExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv);
+ return new ParquetFilterEvaluatorFactory(externalFilterEvaluatorFactory,
+ builder.buildFilterPredicate());
+ }
} else {
if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
return NoOpExternalFilterEvaluatorFactory.INSTANCE;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java
new file mode 100644
index 0000000..38ad119
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt16;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AInt8;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.io.api.Binary;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Predicate;
+
+public class ParquetFilterBuilder extends AbstractFilterBuilder {
+
+    private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger();
+
+    /**
+     * @param projectionFiltrationInfo supplies the filter paths and the Parquet row-group filter expression
+     * @param context job-generation context, forwarded to {@link AbstractFilterBuilder}
+     * @param typeEnv variable type environment, forwarded to {@link AbstractFilterBuilder}
+     */
+    public ParquetFilterBuilder(ParquetExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo,
+            JobGenContext context, IVariableTypeEnvironment typeEnv) {
+        super(projectionFiltrationInfo.getFilterPaths(), projectionFiltrationInfo.getParquetRowGroupFilterExpression(),
+                context, typeEnv);
+    }
+
+    /**
+     * Translates the row-group filter expression into a Parquet {@link FilterPredicate}.
+     * <p>
+     * Translation is best-effort: if there is no filter expression, or any part of it cannot be translated (the
+     * failure is logged), {@code null} is returned and no row-group predicate should be installed.
+     *
+     * @return the translated predicate, or {@code null}
+     */
+    public FilterPredicate buildFilterPredicate() throws AlgebricksException {
+        if (filterExpression == null) {
+            return null;
+        }
+        try {
+            // The translated result is already a Parquet predicate and is returned as-is. It must not be tested
+            // against io.delta.kernel.expressions.Predicate: no Parquet predicate implements that Delta type, so
+            // such a check would silently discard every row-group filter.
+            return createFilterExpression(filterExpression);
+        } catch (Exception e) {
+            LOGGER.error("Error creating Parquet row-group filter expression ", e);
+            return null;
+        }
+    }
+
+    /**
+     * Builds the predicate {@code column <fid> constant}.
+     * <p>
+     * The Parquet column kind follows the constant's type:
+     * <ul>
+     * <li>STRING uses a binary column.</li>
+     * <li>TINYINT, SMALLINT, INTEGER and DATE (days) use an int column.</li>
+     * <li>BIGINT and DATETIME use a long column.</li>
+     * <li>DOUBLE uses a double column.</li>
+     * <li>BOOLEAN uses a boolean column and supports equality only.</li>
+     * </ul>
+     * NOTE(review): Parquet validates a predicate's column type against the file schema. A literal whose type
+     * differs from the stored column (e.g. a BIGINT literal on an INT32 column) may therefore fail at read time
+     * instead of pruning. DATETIME also assumes the column stores epoch microseconds. Confirm both.
+     *
+     * @param columnName expression whose path is looked up in {@code filterPaths}
+     * @param constValue the literal operand; must be a non-null, non-missing constant
+     * @param fid the comparison function identifier
+     * @throws UnsupportedOperationException if the operands or literal type cannot be translated
+     */
+    private FilterPredicate createComparisonExpression(ILogicalExpression columnName, ILogicalExpression constValue,
+            FunctionIdentifier fid) throws AlgebricksException {
+        if (constValue.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+            // Only "column <op> constant" is translated; other shapes (e.g. "constant <op> column") are rejected.
+            throw new UnsupportedOperationException("Unsupported comparison operand: " + constValue);
+        }
+        ConstantExpression constExpr = (ConstantExpression) constValue;
+        if (constExpr.getValue().isNull() || constExpr.getValue().isMissing()) {
+            throw new UnsupportedOperationException("Unsupported literal type: " + constExpr.getValue());
+        }
+        AsterixConstantValue constantValue = (AsterixConstantValue) constExpr.getValue();
+        String fieldName = createColumnExpression(columnName);
+        switch (constantValue.getObject().getType().getTypeTag()) {
+            case STRING:
+                return createComparisonFunction(FilterApi.binaryColumn(fieldName),
+                        Binary.fromString(((AString) constantValue.getObject()).getStringValue()), fid);
+            case TINYINT:
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        (int) ((AInt8) constantValue.getObject()).getByteValue(), fid);
+            case SMALLINT:
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        (int) ((AInt16) constantValue.getObject()).getShortValue(), fid);
+            case INTEGER:
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        ((AInt32) constantValue.getObject()).getIntegerValue(), fid);
+            case BOOLEAN:
+                if (!fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+                    throw new UnsupportedOperationException("Unsupported comparison function: " + fid);
+                }
+                return FilterApi.eq(FilterApi.booleanColumn(fieldName), constantValue.isTrue());
+            case BIGINT:
+                return createComparisonFunction(FilterApi.longColumn(fieldName),
+                        ((AInt64) constantValue.getObject()).getLongValue(), fid);
+            case DOUBLE:
+                return createComparisonFunction(FilterApi.doubleColumn(fieldName),
+                        ((ADouble) constantValue.getObject()).getDoubleValue(), fid);
+            case DATE:
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        ((ADate) constantValue.getObject()).getChrononTimeInDays(), fid);
+            case DATETIME: {
+                // The chronon time is treated as milliseconds and converted to microseconds for the column.
+                long millis = ((ADateTime) constantValue.getObject()).getChrononTime();
+                return createComparisonFunction(FilterApi.longColumn(fieldName),
+                        TimeUnit.MILLISECONDS.toMicros(millis), fid);
+            }
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported literal type: " + constantValue.getObject().getType());
+        }
+    }
+
+    /**
+     * Not used by Parquet predicate translation; nothing in this class calls it, so no accessor is created.
+     */
+    @Override
+    protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression expression) {
+        return null;
+    }
+
+    /**
+     * Translates a function-call expression. AND/OR of any arity goes through {@code createAndOrPredicate}; a
+     * two-argument comparison goes through {@code createComparisonExpression}.
+     *
+     * @throws UnsupportedOperationException if the expression cannot be translated
+     */
+    private FilterPredicate createFilterExpression(ILogicalExpression expr) throws AlgebricksException {
+        if (expr == null || expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            throw new UnsupportedOperationException("Unsupported expression: " + expr);
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        IFunctionDescriptor fd = resolveFunction(funcExpr);
+        FunctionIdentifier fid = fd.getIdentifier();
+        List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
+        if (fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR)) {
+            return createAndOrPredicate(fid, args, 0);
+        }
+        if (args.size() != 2) {
+            throw new UnsupportedOperationException("Unsupported function: " + funcExpr);
+        }
+        return createComparisonExpression(args.get(0).getValue(), args.get(1).getValue(), fid);
+    }
+
+    /**
+     * Maps a comparison function identifier (EQ, GE, GT, LE, LT) to the corresponding Parquet predicate.
+     *
+     * @throws UnsupportedOperationException for any other function identifier
+     */
+    private <T extends Comparable<T>, C extends Operators.Column<T> & Operators.SupportsLtGt> FilterPredicate createComparisonFunction(
+            C column, T value, FunctionIdentifier fid) {
+        if (fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+            return FilterApi.eq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) {
+            return FilterApi.gtEq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GT)) {
+            return FilterApi.gt(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LE)) {
+            return FilterApi.ltEq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LT)) {
+            return FilterApi.lt(column, value);
+        }
+        throw new UnsupportedOperationException("Unsupported function: " + fid);
+    }
+
+    /**
+     * Resolves the Parquet column path for a field-access expression from {@code filterPaths}.
+     * <p>
+     * The recorded path must be a chain of single-field records ending in an ANY-typed leaf. Nested field names
+     * are joined with '.'. NOTE(review): a field name that itself contains '.' would be mis-split by Parquet's
+     * dotted column paths; confirm whether such names can occur.
+     *
+     * @throws UnsupportedOperationException if no usable path is recorded for {@code expression}
+     */
+    protected String createColumnExpression(ILogicalExpression expression) {
+        ARecordType path = filterPaths.get(expression);
+        if (path == null || path.getFieldNames().length != 1) {
+            throw new UnsupportedOperationException("Unsupported column expression: " + expression);
+        }
+        ATypeTag fieldTag = path.getFieldTypes()[0].getTypeTag();
+        if (fieldTag == ATypeTag.OBJECT) {
+            // The field could be a nested field
+            return String.join(".", createPathExpression(path, new ArrayList<>()));
+        } else if (fieldTag == ATypeTag.ANY) {
+            return path.getFieldNames()[0];
+        }
+        throw new UnsupportedOperationException("Unsupported column expression: " + expression);
+    }
+
+    /**
+     * Appends the field names along a chain of single-field records to {@code fieldList}, stopping at the first
+     * ANY-typed field.
+     *
+     * @return {@code fieldList}, for convenience
+     * @throws UnsupportedOperationException if the chain is not made of single-field records ending in ANY
+     */
+    private List<String> createPathExpression(ARecordType path, List<String> fieldList) {
+        if (path.getFieldNames().length != 1) {
+            throw new UnsupportedOperationException("Error creating column expression");
+        }
+        fieldList.add(path.getFieldNames()[0]);
+        ATypeTag fieldTag = path.getFieldTypes()[0].getTypeTag();
+        if (fieldTag == ATypeTag.OBJECT) {
+            return createPathExpression((ARecordType) path.getFieldTypes()[0], fieldList);
+        } else if (fieldTag == ATypeTag.ANY) {
+            return fieldList;
+        }
+        throw new UnsupportedOperationException("Error creating column expression");
+    }
+
+    /**
+     * Folds the arguments from {@code index} onwards into right-nested binary Parquet predicates. For example,
+     * or(pred1, pred2, pred3) becomes or(pred1, or(pred2, pred3)).
+     *
+     * @param function AND or OR; anything other than AND is treated as OR
+     * @param args the AND/OR call's arguments
+     * @param index first argument to fold
+     * @throws UnsupportedOperationException if fewer than two arguments remain at {@code index}
+     */
+    private FilterPredicate createAndOrPredicate(FunctionIdentifier function, List<Mutable<ILogicalExpression>> args,
+            int index) throws AlgebricksException {
+        if (args.size() - index < 2) {
+            throw new UnsupportedOperationException("AND/OR needs at least two arguments: " + args);
+        }
+        FilterPredicate left = createFilterExpression(args.get(index).getValue());
+        // Pair args[index] with either args[index + 1] (last pair) or the folded tail, so every argument is
+        // included exactly once.
+        FilterPredicate right = index == args.size() - 2 ? createFilterExpression(args.get(index + 1).getValue())
+                : createAndOrPredicate(function, args, index + 1);
+        return function.equals(AlgebricksBuiltinFunctions.AND) ? FilterApi.and(left, right)
+                : FilterApi.or(left, right);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
index e0ea99a..f95483e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
@@ -43,7 +43,7 @@
protected final ARecordType projectedType;
protected final ILogicalExpression filterExpression;
protected final Map<String, FunctionCallInformation> functionCallInfoMap;
- private final boolean embedFilterValues;
+ protected final boolean embedFilterValues;
protected final Map<ILogicalExpression, ARecordType> filterPaths;
public ExternalDatasetProjectionFiltrationInfo(ARecordType projectedType,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java
new file mode 100644
index 0000000..323d3ed
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.projection;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksStringBuilderWriter;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+public class ParquetExternalDatasetProjectionFiltrationInfo extends ExternalDatasetProjectionFiltrationInfo {
+
+    /** Expression to be translated into a Parquet row-group filter; {@code null} when there is none. */
+    private final ILogicalExpression parquetRowGroupFilterExpression;
+
+    /**
+     * @param parquetRowGroupFilterExpression expression to push down as a Parquet row-group filter, or
+     *            {@code null}; all other arguments are passed unchanged to the superclass constructor
+     */
+    public ParquetExternalDatasetProjectionFiltrationInfo(ARecordType projectedType,
+            Map<String, FunctionCallInformation> sourceInformationMap, Map<ILogicalExpression, ARecordType> filterPaths,
+            ILogicalExpression filterExpression, ILogicalExpression parquetRowGroupFilterExpression,
+            boolean embedFilterValues) {
+        super(projectedType, sourceInformationMap, filterPaths, filterExpression, embedFilterValues);
+        this.parquetRowGroupFilterExpression = parquetRowGroupFilterExpression;
+    }
+
+    /** Copy constructor used by {@link #createCopy()}; the row-group filter expression is shared, not cloned. */
+    private ParquetExternalDatasetProjectionFiltrationInfo(ParquetExternalDatasetProjectionFiltrationInfo other) {
+        super(other.projectedType, other.functionCallInfoMap, other.filterPaths, other.filterExpression,
+                other.embedFilterValues);
+        this.parquetRowGroupFilterExpression = other.parquetRowGroupFilterExpression;
+    }
+
+    /** @return the row-group filter expression, or {@code null} if there is none */
+    public ILogicalExpression getParquetRowGroupFilterExpression() {
+        return parquetRowGroupFilterExpression;
+    }
+
+    /** Appends the row-group filter, if any, after the superclass's textual output. */
+    @Override
+    public void print(AlgebricksStringBuilderWriter writer) {
+        super.print(writer);
+        if (parquetRowGroupFilterExpression != null) {
+            writer.append(" row-group-filter on: ");
+            writer.append(parquetRowGroupFilterExpression.toString());
+        }
+    }
+
+    /** Adds a {@code row-group-filter-on} string field, if any, after the superclass's JSON output. */
+    @Override
+    public void print(JsonGenerator generator) throws IOException {
+        super.print(generator);
+        if (parquetRowGroupFilterExpression != null) {
+            generator.writeStringField("row-group-filter-on", parquetRowGroupFilterExpression.toString());
+        }
+    }
+
+    @Override
+    public ParquetExternalDatasetProjectionFiltrationInfo createCopy() {
+        return new ParquetExternalDatasetProjectionFiltrationInfo(this);
+    }
+
+    /**
+     * Two instances are equal when their classes match, {@code super.equals} holds, and their row-group filter
+     * expressions are equal according to {@code filterExpressionEquals}.
+     * <p>
+     * NOTE(review): hashCode is inherited. It stays consistent with this equals only if the superclass's hashCode
+     * agrees with its own equals; confirm.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ParquetExternalDatasetProjectionFiltrationInfo otherInfo = (ParquetExternalDatasetProjectionFiltrationInfo) o;
+        return super.equals(o)
+                && filterExpressionEquals(parquetRowGroupFilterExpression, otherInfo.parquetRowGroupFilterExpression);
+    }
+}
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index e90283d..35f9219 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -1635,6 +1635,11 @@
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
+ <artifactId>apache-client</artifactId>
+ <version>${awsjavasdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
<artifactId>http-client-spi</artifactId>
<version>${awsjavasdk.version}</version>
</dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
index 7a272cc..8f95e34 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
@@ -49,6 +49,11 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
<artifactId>algebricks-common</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index f6d9cd1..560f817 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
import org.apache.hyracks.api.job.profiling.OperatorStats;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -170,25 +171,21 @@
PipelineAssembler pa =
new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor);
startOfPipeline = pa.assemblePipeline(writer, ctx, new HashMap<>());
- HyracksDataException exception = null;
+ Exception exception = null;
try {
startOfPipeline.open();
} catch (Exception e) {
- startOfPipeline.fail();
- exception = HyracksDataException.create(e);
+ exception = e;
} finally {
- try {
- startOfPipeline.close();
- } catch (Exception e) {
- if (exception == null) {
- exception = HyracksDataException.create(e);
- } else {
- exception.addSuppressed(e);
- }
+ if (exception != null) {
+ exception = InvokeUtil.tryUninterruptibleWithCleanups(exception, startOfPipeline::fail,
+ startOfPipeline::close);
+ } else {
+ exception = InvokeUtil.runUninterruptible(exception, startOfPipeline::close);
}
}
if (exception != null) {
- throw exception;
+ throw HyracksDataException.create(exception);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index ff0a3c5..9326427 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -285,6 +285,57 @@
}
}
+    /**
+     * Variant of {@link #tryUninterruptibleWithCleanups(ThrowingAction, ThrowingAction...)} for callers that
+     * accumulate failures instead of throwing them.
+     * <p>
+     * Runs {@code action} and then every cleanup. Any {@link Exception} they raise is combined with {@code root}
+     * via {@link ExceptionUtils#suppress} rather than thrown. {@link Error}s are not caught and propagate to the
+     * caller.
+     *
+     * @param root the exception already pending for the caller, or {@code null}
+     * @param action the primary action
+     * @param cleanups actions run after {@code action} regardless of its outcome
+     * @return {@code root} unchanged if nothing failed; otherwise {@code root} combined with the raised exception
+     */
+    public static Exception tryUninterruptibleWithCleanups(Exception root, ThrowingAction action,
+            ThrowingAction... cleanups) {
+        try {
+            tryUninterruptibleWithCleanups(action, cleanups);
+            return root;
+        } catch (Exception e) {
+            return ExceptionUtils.suppress(root, e);
+        }
+    }
+
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
+ // catching Throwable, instanceofs, false-positive unreachable code
+ public static void tryUninterruptibleWithCleanups(ThrowingAction action, ThrowingAction... cleanups)
+ throws Exception {
+ Throwable savedT = null;
+ boolean suppressedInterrupted = false;
+ try {
+ runUninterruptible(action);
+ } catch (Throwable t) {
+ savedT = t;
+ } finally {
+ for (ThrowingAction cleanup : cleanups) {
+ try {
+ runUninterruptible(cleanup);
+ } catch (Throwable t) {
+ if (savedT != null) {
+ savedT.addSuppressed(t);
+ suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+ } else {
+ savedT = t;
+ }
+ }
+ }
+ }
+ if (savedT == null) {
+ return;
+ }
+ if (suppressedInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ if (savedT instanceof Error) {
+ throw (Error) savedT;
+ } else if (savedT instanceof Exception) {
+ throw (Exception) savedT;
+ } else {
+ throw HyracksDataException.create(savedT);
+ }
+ }
+
// catching Throwable, instanceofs, false-positive unreachable code
public static void tryWithCleanups(ThrowingAction action, ThrowingConsumer<Throwable>... cleanups)
throws Exception {
@@ -426,6 +477,19 @@
* Runs the supplied action, after suspending any pending interruption. An error will be logged if
* the action is itself interrupted.
*/
+    public static Exception runUninterruptible(Exception root, ThrowingAction action) {
+        // Accumulating overload: runs the action via runUninterruptible(ThrowingAction). Instead of throwing, it
+        // folds any Exception raised into root using ExceptionUtils.suppress and returns the result. Errors are
+        // not caught. Returns root unchanged when the action succeeds.
+        try {
+            runUninterruptible(action);
+        } catch (Exception e) {
+            return ExceptionUtils.suppress(root, e);
+        }
+        return root;
+    }
+
+ /**
+ * Runs the supplied action, after suspending any pending interruption. An error will be logged if
+ * the action is itself interrupted.
+ */
public static void runUninterruptible(ThrowingAction action) throws Exception {
boolean interrupted = Thread.interrupted();
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index c36e4a7..da9c8fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -166,8 +166,12 @@
LOGGER.warn("Lost suppressed interrupt during ICloudReturnableRequest", e);
Thread.currentThread().interrupt();
}
- if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) {
- throw HyracksDataException.create(e);
+ try {
+ if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) {
+ throw HyracksDataException.create(e);
+ }
+ } catch (InterruptedException interruptedEx) {
+ throw HyracksDataException.create(interruptedEx);
}
attempt++;
retry.beforeRetry();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 0bf74ee..614a226 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -63,8 +63,8 @@
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.profiling.StatsCollector;
@@ -420,30 +420,29 @@
IFrameReader reader = collector.getReader();
reader.open();
try {
- try {
- writer.open();
- VSizeFrame frame = new VSizeFrame(this);
- while (reader.nextFrame(frame)) {
- if (aborted) {
- return;
- }
- ByteBuffer buffer = frame.getBuffer();
- writer.nextFrame(buffer);
- buffer.compact();
+ writer.open();
+ VSizeFrame frame = new VSizeFrame(this);
+ while (reader.nextFrame(frame)) {
+ if (aborted) {
+ return;
}
- } catch (Exception e) {
- originalEx = e;
- CleanupUtils.fail(writer, originalEx);
- } finally {
- originalEx = CleanupUtils.closeSilently(writer, originalEx);
+ ByteBuffer buffer = frame.getBuffer();
+ writer.nextFrame(buffer);
+ buffer.compact();
}
+ } catch (Exception e) {
+ originalEx = e;
} finally {
- originalEx = CleanupUtils.closeSilently(reader, originalEx);
+ if (originalEx != null) {
+ InvokeUtil.tryUninterruptibleWithCleanups(writer::fail, writer::close, reader::close);
+ } else {
+ InvokeUtil.tryUninterruptibleWithCleanups(writer::close, reader::close);
+ }
}
} catch (Exception e) {
originalEx = ExceptionUtils.suppress(originalEx, e);
} finally {
- originalEx = CleanupUtils.closeSilently(collector, originalEx);
+ InvokeUtil.runUninterruptible(collector::close);
}
} catch (Exception e) {
originalEx = ExceptionUtils.suppress(originalEx, e);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
index 9740bc4..9dd8611 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
@@ -288,11 +289,15 @@
protected void closeCursors() throws HyracksDataException {
+ // Closes the first numBTrees entries of btreeCursors, skipping null slots.
if (btreeCursors != null) {
+ // Keep closing after a failing close(). ResourceReleaseUtils.close presumably catches the cursor's failure
+ // and returns it combined with 'failure' (TODO confirm). The accumulated failure is rethrown as a
+ // HyracksDataException once the loop has visited every slot.
+ Throwable failure = null;
for (int i = 0; i < numBTrees; ++i) {
if (btreeCursors[i] != null) {
- btreeCursors[i].close();
+ failure = ResourceReleaseUtils.close(btreeCursors[i], failure);
}
}
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 2c97221..d53e686 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
@@ -127,8 +128,12 @@
}
if (rangeCursors != null) {
+ Throwable failure = null;
for (int i = 0; i < rangeCursors.length; i++) {
- rangeCursors[i].close();
+ failure = ResourceReleaseUtils.close(rangeCursors[i], failure);
+ }
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
}
}
} finally {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 52bd07e..7ec4f86 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -127,12 +127,17 @@
if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
IRetryPolicy policy = new ExponentialRetryPolicy();
while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
- if (policy.retry(operation.getFailure())) {
- operation.setFailure(null);
- operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
- lsmHarness.flush(operation);
- } else {
- break;
+ try {
+ if (policy.retry(operation.getFailure())) {
+ operation.setFailure(null);
+ operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+ lsmHarness.flush(operation);
+ } else {
+ break;
+ }
+ } catch (InterruptedException e) {
+ // in reality, this thread won't be interrupted
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index d8900ad..ee63547 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -98,12 +98,17 @@
if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
IRetryPolicy policy = new ExponentialRetryPolicy();
while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
- if (policy.retry(operation.getFailure())) {
- operation.setFailure(null);
- operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
- lsmHarness.flush(operation);
- } else {
- break;
+ try {
+ if (policy.retry(operation.getFailure())) {
+ operation.setFailure(null);
+ operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+ lsmHarness.flush(operation);
+ } else {
+ break;
+ }
+ } catch (InterruptedException e) {
+ // in reality, this thread won't be interrupted
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
index 080b9ea..72a3bb0 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
@@ -84,17 +84,13 @@
}
@Override
- public boolean retry(Throwable failure) {
+ public boolean retry(Throwable failure) throws InterruptedException {
if (attempt < maxRetries) {
- try {
- long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay);
- if (printDebugLines) {
- LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, attempt + 1, maxRetries);
- }
- TimeUnit.MILLISECONDS.sleep(sleepTime);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay);
+ if (printDebugLines) {
+ LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, attempt + 1, maxRetries);
}
+ TimeUnit.MILLISECONDS.sleep(sleepTime);
attempt++;
delay = delay > maxDelay / 2 ? maxDelay : delay * 2;
return true;
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
index 0d18a2b..2fd0191 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
@@ -24,6 +24,7 @@
* @param failure
* the cause of the failure (this cannot be null)
* @return true if one more attempt should be done
+ * @throws InterruptedException if the retry policy can be interrupted
*/
- boolean retry(Throwable failure);
+ boolean retry(Throwable failure) throws InterruptedException;
}