[ASTERIXDB-3503][EXT] Introduce Delta Lake Support for Google Cloud Storage (GCS)
- user model changes: no
- storage format changes: no
- interface changes: yes
Ext-ref: MB-64376
Change-Id: I4cd44ba31a22cc124e346b077a1c2798ba9ab747
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19140
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index d29cd40..ba0d0f4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -18,245 +18,34 @@
*/
package org.apache.asterix.external.input.record.reader.aws.delta;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.api.IExternalDataRuntimeContext;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.projection.FunctionCallInformation;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.hadoop.mapred.JobConf;
-import io.delta.kernel.Scan;
-import io.delta.kernel.Snapshot;
-import io.delta.kernel.data.FilteredColumnarBatch;
-import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
-import io.delta.kernel.engine.Engine;
-import io.delta.kernel.exceptions.KernelException;
-import io.delta.kernel.internal.InternalScanFileUtils;
-import io.delta.kernel.types.StructType;
-import io.delta.kernel.utils.CloseableIterator;
-import io.delta.kernel.utils.FileStatus;
-
-public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
-
+public class AwsS3DeltaReaderFactory extends DeltaReaderFactory {
private static final long serialVersionUID = 1L;
- private static final List<String> recordReaderNames =
+ private static final List<String> RECORD_READER_NAMES =
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
- private static final Logger LOGGER = LogManager.getLogger();
- private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
- private String scanState;
- private Map<String, String> configuration;
- protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- return locationConstraints;
+ protected void configureJobConf(JobConf conf, Map<String, String> configuration) {
+ configureAwsS3HdfsJobConf(conf, configuration);
}
@Override
- public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
- throws AlgebricksException, HyracksDataException {
- this.configuration = configuration;
- Configuration conf = new Configuration();
- applyConfiguration(configuration, conf);
- String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
- + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
- + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
-
- ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
-
- Engine engine = DefaultEngine.create(conf);
- io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
- Snapshot snapshot;
- try {
- snapshot = table.getLatestSnapshot(engine);
- } catch (KernelException e) {
- LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
- throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
- }
-
- List<Warning> warnings = new ArrayList<>();
- DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
- AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext);
- StructType requiredSchema;
- try {
- ARecordType expectedType = HDFSUtils.getExpectedType(conf);
- Map<String, FunctionCallInformation> functionCallInformationMap =
- HDFSUtils.getFunctionCallInformationMap(conf);
- StructType fileSchema = snapshot.getSchema(engine);
- requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (AsterixDeltaRuntimeException e) {
- throw e.getHyracksDataException();
- }
- Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
- scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
- CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
-
- List<Row> scanFiles = new ArrayList<>();
- while (iter.hasNext()) {
- FilteredColumnarBatch batch = iter.next();
- CloseableIterator<Row> rowIter = batch.getRows();
- while (rowIter.hasNext()) {
- Row row = rowIter.next();
- scanFiles.add(row);
- }
- }
- locationConstraints = configureLocationConstraints(appCtx, scanFiles);
- configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
- distributeFiles(scanFiles);
- issueWarnings(warnings, warningCollector);
- }
-
- private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
- if (!warnings.isEmpty()) {
- for (Warning warning : warnings) {
- if (warningCollector.shouldWarn()) {
- warningCollector.warn(warning);
- }
- }
- }
- warnings.clear();
- }
-
- private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx,
- List<Row> scanFiles) {
- IClusterStateManager csm = appCtx.getClusterStateManager();
-
- String[] locations = csm.getClusterLocations().getLocations();
- if (scanFiles.size() == 0) {
- return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
- } else if (locations.length > scanFiles.size()) {
- LOGGER.debug(
- "analytics partitions ({}) exceeds total partition count ({}); limiting ingestion partitions to total partition count",
- locations.length, scanFiles.size());
- final String[] locationCopy = locations.clone();
- ArrayUtils.shuffle(locationCopy);
- locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
- }
- return new AlgebricksAbsolutePartitionConstraint(locations);
- }
-
- private void distributeFiles(List<Row> scanFiles) {
- final int partitionsCount = getPartitionConstraint().getLocations().length;
- PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
- Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
-
- // Prepare the workloads based on the number of partitions
- for (int i = 0; i < partitionsCount; i++) {
- workloadQueue.add(new PartitionWorkLoadBasedOnSize());
- }
- for (Row scanFileRow : scanFiles) {
- PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
- FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
- workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), fileStatus.getSize());
- workloadQueue.add(workload);
- }
- partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
- }
-
- public static void applyConfiguration(Map<String, String> configuration, Configuration conf) {
- conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
- conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
- if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
- conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
- }
- conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
- String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
- if (serviceEndpoint != null) {
- conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
- }
- conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
- configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
- conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
- configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
- }
-
- @Override
- public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
- try {
- int partition = context.getPartition();
- return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
- configuration, context);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- }
-
- @Override
- public Class<?> getRecordClass() throws AsterixException {
- return Row.class;
+ protected String getTablePath(Map<String, String> configuration) {
+ return S3Utils.getPath(configuration);
}
@Override
public List<String> getRecordReaderNames() {
- return recordReaderNames;
- }
-
- @Override
- public Set<String> getReaderSupportedFormats() {
- return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
- }
-
- public static class PartitionWorkLoadBasedOnSize implements Serializable {
- private static final long serialVersionUID = 1L;
- private final List<String> scanFiles = new ArrayList<>();
- private long totalSize = 0;
-
- public PartitionWorkLoadBasedOnSize() {
- }
-
- public List<String> getScanFiles() {
- return scanFiles;
- }
-
- public void addScanFile(String scanFile, long size) {
- this.scanFiles.add(scanFile);
- this.totalSize += size;
- }
-
- public long getTotalSize() {
- return totalSize;
- }
-
- @Override
- public String toString() {
- return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
- }
+ return RECORD_READER_NAMES;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index a5b21b6..a094c22 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,24 +19,21 @@
package org.apache.asterix.external.input.record.reader.aws.delta;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.IFeedLogManager;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
import io.delta.kernel.Scan;
import io.delta.kernel.data.ColumnarBatch;
@@ -69,20 +66,10 @@
private Row scanFile;
private CloseableIterator<Row> rows;
- public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, Map<String, String> conf,
- IExternalDataRuntimeContext context) {
- Configuration config = new Configuration();
- config.set(S3Constants.HADOOP_ACCESS_KEY_ID, conf.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
- config.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, conf.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
- if (conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
- config.set(S3Constants.HADOOP_SESSION_TOKEN, conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
- }
- config.set(S3Constants.HADOOP_REGION, conf.get(S3Constants.REGION_FIELD_NAME));
- String serviceEndpoint = conf.get(SERVICE_END_POINT_FIELD_NAME);
- if (serviceEndpoint != null) {
- config.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
- }
- this.engine = DefaultEngine.create(config);
+ public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config)
+ throws HyracksDataException {
+ JobConf conf = config.getConf();
+ this.engine = DefaultEngine.create(conf);
this.scanFiles = new ArrayList<>();
for (String scanFile : serScanFiles) {
this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
new file mode 100644
index 0000000..dc4c310
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = LogManager.getLogger();
+ private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
+ private String scanState;
+ protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
+ protected ConfFactory confFactory;
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ return locationConstraints;
+ }
+
+ protected abstract void configureJobConf(JobConf conf, Map<String, String> configuration)
+ throws AlgebricksException;
+
+ protected abstract String getTablePath(Map<String, String> configuration) throws AlgebricksException;
+
+ @Override
+ public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AlgebricksException, HyracksDataException {
+ JobConf conf = new JobConf();
+ ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+ configureJobConf(conf, configuration);
+ confFactory = new ConfFactory(conf);
+ String tableMetadataPath = getTablePath(configuration);
+ Engine engine = DefaultEngine.create(conf);
+ io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
+ Snapshot snapshot;
+ try {
+ snapshot = table.getLatestSnapshot(engine);
+ } catch (KernelException e) {
+ LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
+ throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+ }
+
+ List<Warning> warnings = new ArrayList<>();
+ DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
+ AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext);
+ StructType requiredSchema;
+ try {
+ ARecordType expectedType = HDFSUtils.getExpectedType(conf);
+ Map<String, FunctionCallInformation> functionCallInformationMap =
+ HDFSUtils.getFunctionCallInformationMap(conf);
+ StructType fileSchema = snapshot.getSchema(engine);
+ requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (AsterixDeltaRuntimeException e) {
+ throw e.getHyracksDataException();
+ }
+ Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+ scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
+ CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
+
+ List<Row> scanFiles = new ArrayList<>();
+ while (iter.hasNext()) {
+ FilteredColumnarBatch batch = iter.next();
+ CloseableIterator<Row> rowIter = batch.getRows();
+ while (rowIter.hasNext()) {
+ Row row = rowIter.next();
+ scanFiles.add(row);
+ }
+ }
+ locationConstraints = configureLocationConstraints(appCtx, scanFiles);
+ configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
+ distributeFiles(scanFiles);
+ issueWarnings(warnings, warningCollector);
+ }
+
+ private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
+ if (!warnings.isEmpty()) {
+ for (Warning warning : warnings) {
+ if (warningCollector.shouldWarn()) {
+ warningCollector.warn(warning);
+ }
+ }
+ }
+ warnings.clear();
+ }
+
+ private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx,
+ List<Row> scanFiles) {
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+
+ String[] locations = csm.getClusterLocations().getLocations();
+ if (scanFiles.size() == 0) {
+ return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
+ } else if (locations.length > scanFiles.size()) {
+ LOGGER.debug(
+ "configured partitions ({}) exceeds total partition count ({}); limiting configured partitions to total partition count",
+ locations.length, scanFiles.size());
+ final String[] locationCopy = locations.clone();
+ ArrayUtils.shuffle(locationCopy);
+ locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
+ }
+ return new AlgebricksAbsolutePartitionConstraint(locations);
+ }
+
+ private void distributeFiles(List<Row> scanFiles) {
+ final int partitionsCount = getPartitionConstraint().getLocations().length;
+ PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+ Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
+ // Prepare the workloads based on the number of partitions
+ for (int i = 0; i < partitionsCount; i++) {
+ workloadQueue.add(new PartitionWorkLoadBasedOnSize());
+ }
+ for (Row scanFileRow : scanFiles) {
+ PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+ FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
+ workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), fileStatus.getSize());
+ workloadQueue.add(workload);
+ }
+ partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
+ }
+
+ @Override
+ public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
+ try {
+ int partition = context.getPartition();
+ return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
+ confFactory);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public Class<?> getRecordClass() throws AsterixException {
+ return Row.class;
+ }
+
+ @Override
+ public Set<String> getReaderSupportedFormats() {
+ return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
+ }
+
+ public static class PartitionWorkLoadBasedOnSize implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final List<String> scanFiles = new ArrayList<>();
+ private long totalSize = 0;
+
+ public PartitionWorkLoadBasedOnSize() {
+ }
+
+ public List<String> getScanFiles() {
+ return scanFiles;
+ }
+
+ public void addScanFile(String scanFile, long size) {
+ this.scanFiles.add(scanFile);
+ this.totalSize += size;
+ }
+
+ public long getTotalSize() {
+ return totalSize;
+ }
+
+ @Override
+ public String toString() {
+ return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
+ }
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
new file mode 100644
index 0000000..ee88569
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.gcs.delta;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public class GCSDeltaReaderFactory extends DeltaReaderFactory {
+ private static final long serialVersionUID = 1L;
+ private static final List<String> RECORD_READER_NAMES =
+ Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
+
+ @Override
+ protected void configureJobConf(JobConf conf, Map<String, String> configuration) throws AlgebricksException {
+ GCSUtils.configureHdfsJobConf(conf, configuration);
+ }
+
+ @Override
+ protected String getTablePath(Map<String, String> configuration) throws AlgebricksException {
+ return GCSUtils.getPath(configuration);
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return RECORD_READER_NAMES;
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index c7deb7c..6767f93 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -29,8 +29,10 @@
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
+import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
+import static org.apache.asterix.external.util.google.gcs.GCSUtils.configureHdfsJobConf;
import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
@@ -72,14 +74,15 @@
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
-import org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory;
import org.apache.asterix.external.library.JavaLibrary;
import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;
import org.apache.asterix.external.util.google.gcs.GCSConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
@@ -90,6 +93,7 @@
import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -515,21 +519,22 @@
}
}
- public static void validateDeltaTableExists(Map<String, String> configuration) throws CompilationException {
- Configuration conf = new Configuration();
+ public static void validateDeltaTableExists(Map<String, String> configuration) throws AlgebricksException {
String tableMetadataPath = null;
+ JobConf conf = new JobConf();
if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
.equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
- AwsS3DeltaReaderFactory.applyConfiguration(configuration, conf);
- tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
- + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
- + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+ configureAwsS3HdfsJobConf(conf, configuration);
+ tableMetadataPath = S3Utils.getPath(configuration);
+ } else if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
+ .equals(ExternalDataConstants.KEY_ADAPTER_NAME_GCS)) {
+ configureHdfsJobConf(conf, configuration);
+ tableMetadataPath = GCSUtils.getPath(configuration);
} else {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
"Delta format is not supported for the external source type: "
+ configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE));
}
-
Engine engine = DefaultEngine.create(conf);
io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
try {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index f36d25d..45988e8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -333,6 +333,10 @@
* @param configuration properties
* @param numberOfPartitions number of partitions in the cluster
*/
+ public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration) {
+ configureAwsS3HdfsJobConf(conf, configuration, 0);
+ }
+
public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
int numberOfPartitions) {
String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
@@ -371,7 +375,9 @@
/*
* Set the size of S3 connection pool to be the number of partitions
*/
- conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
+ if (numberOfPartitions != 0) {
+ conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
+ }
if (serviceEndpoint != null) {
// Validation of the URL should be done at hadoop-aws level
@@ -470,7 +476,11 @@
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
}
if (isDeltaTable(configuration)) {
- validateDeltaTableExists(configuration);
+ try {
+ validateDeltaTableExists(configuration);
+ } catch (AlgebricksException e) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e);
+ }
}
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index a2b50e1..d8dd478 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -292,4 +292,10 @@
allObjects.put("folders", folders);
return allObjects;
}
+
+ public static String getPath(Map<String, String> configuration) {
+ return S3Constants.HADOOP_S3_PROTOCOL + "://"
+ + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+ + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 74a664d..bfd35fc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -22,6 +22,8 @@
import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
@@ -140,9 +142,11 @@
*/
public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
IWarningCollector collector) throws CompilationException {
-
+ if (isDeltaTable(configuration)) {
+ validateDeltaTableProperties(configuration);
+ }
// check if the format property is present
- if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
}
@@ -224,6 +228,11 @@
* @param configuration properties
* @param numberOfPartitions number of partitions in the cluster
*/
+ public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration)
+ throws AlgebricksException {
+ configureHdfsJobConf(conf, configuration, 0);
+ }
+
public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions)
throws AlgebricksException {
String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
@@ -267,4 +276,10 @@
conf.set(HADOOP_ENDPOINT, endpoint);
}
}
+
+ public static String getPath(Map<String, String> configuration) {
+ return GCSConstants.HADOOP_GCS_PROTOCOL + "://"
+ + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+ + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 2c15b5a..1f25c4b 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -28,4 +28,5 @@
org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
org.apache.asterix.external.input.record.reader.azure.parquet.AzureDataLakeParquetReaderFactory
-org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory
\ No newline at end of file
+org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory
+org.apache.asterix.external.input.record.reader.gcs.delta.GCSDeltaReaderFactory
\ No newline at end of file