Merge branch 'gerrit/neo'
Change-Id: Ib75414ead94a7da9d8b5216c7b28d8ee198f31b5
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index f532352..1a89168 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -224,7 +224,7 @@
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
final String nodeId = getServiceContext().getNodeId();
- final Set<Integer> nodePartitions = metadataProperties.getNodeActivePartitions(nodeId);
+ final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId);
replicaManager = new ReplicaManager(this, nodePartitions);
isShuttingdown = false;
activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 7c4b59c..7b52df6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -19,7 +19,6 @@
package org.apache.asterix.app.nc;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -31,7 +30,6 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
@@ -188,11 +186,8 @@
}
private void setNodeOwnedPartitions(INcApplicationContext appCtx) {
- ClusterPartition[] clusterPartitions =
- appCtx.getMetadataProperties().getNodePartitions().get(appCtx.getServiceContext().getNodeId());
- if (clusterPartitions != null) {
- nodeOwnedPartitions.addAll(Arrays.stream(clusterPartitions).map(ClusterPartition::getPartitionId)
- .collect(Collectors.toList()));
- }
+ Set<Integer> nodePartitions =
+ appCtx.getMetadataProperties().getNodePartitions(appCtx.getServiceContext().getNodeId());
+ nodeOwnedPartitions.addAll(nodePartitions);
}
}
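The replaced block derived the node's partition ids from ClusterPartition objects by hand; MetadataProperties#getNodePartitions(nodeId) returns the id set directly, and the null check disappears, presumably because the accessor never returns null. A minimal sketch of the two shapes (stand-in ClusterPartition, not the real MetadataProperties API):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.stream.Collectors;

    class PartitionIdsSketch {
        static final class ClusterPartition { // stand-in for the real ClusterPartition
            private final int partitionId;
            ClusterPartition(int partitionId) { this.partitionId = partitionId; }
            int getPartitionId() { return partitionId; }
        }

        // Old shape: map partition objects to their ids, guarding against null.
        static Set<Integer> fromPartitions(ClusterPartition[] partitions) {
            Set<Integer> ids = new HashSet<>();
            if (partitions != null) {
                ids.addAll(Arrays.stream(partitions).map(ClusterPartition::getPartitionId)
                        .collect(Collectors.toList()));
            }
            return ids;
        }

        // New shape: the properties object already exposes the ids as a Set<Integer>.
        static Set<Integer> fromIdSet(Set<Integer> nodePartitions) {
            return new HashSet<>(nodePartitions);
        }
    }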
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
index fe579ad..17eff4a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
@@ -36,7 +36,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final long serialVersionUID = 2L;
private final NodeStatus status;
- private Set<Integer> activePartitions;
+ private final Set<Integer> activePartitions;
public UpdateNodeStatusTask(NodeStatus status, Set<Integer> activePartitions) {
this.status = status;
@@ -61,6 +61,6 @@
@Override
public String toString() {
- return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
+ return "UpdateNodeStatusTask{" + "status=" + status + ", activePartitions=" + activePartitions + '}';
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 22a0a84..06005a9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -168,8 +168,9 @@
protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
Set<Integer> activePartitions) {
- LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
- state);
+ LOGGER.info(
+ "Building registration tasks for node {} with status {} and system state: {} and active partitions {}",
+ nodeId, nodeStatus, state, activePartitions);
final boolean isMetadataNode = nodeId.equals(metadataNodeId);
switch (nodeStatus) {
case ACTIVE:
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 1309369..b2a2dd7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -76,4 +76,10 @@
public Set<Integer> getActivePartitions() {
return activePartitions;
}
+
+ @Override
+ public String toString() {
+ return "NCLifecycleTaskReportMessage{" + "nodeId='" + nodeId + '\'' + ", success=" + success + ", exception="
+ + exception + ", localCounters=" + localCounters + ", activePartitions=" + activePartitions + '}';
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index fb50b3e..9e95ac6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.app.replication.message;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -39,18 +38,18 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final long serialVersionUID = 2L;
- protected final SystemState state;
- protected final String nodeId;
- protected final NodeStatus nodeStatus;
- protected final Map<String, Object> secrets;
- protected final Set<Integer> activePartitions;
+ private final SystemState state;
+ private final String nodeId;
+ private final NodeStatus nodeStatus;
+ private final Map<String, Object> secrets;
+ private final Set<Integer> activePartitions;
public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
this.state = state;
this.nodeId = nodeId;
this.nodeStatus = nodeStatus;
- this.secrets = new HashMap<>(secretsEphemeral);
+ this.secrets = secretsEphemeral;
this.activePartitions = activePartitions;
}
@@ -59,6 +58,7 @@
try {
RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
systemState, secretsEphemeral, activePartitions);
+ LOGGER.info("sending {} to CC", msg);
((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -95,4 +95,10 @@
public Set<Integer> getActivePartitions() {
return activePartitions;
}
+
+ @Override
+ public String toString() {
+ return "RegistrationTasksRequestMessage{" + "state=" + state + ", nodeId='" + nodeId + '\'' + ", nodeStatus="
+ + nodeStatus + ", activePartitions=" + activePartitions + '}';
+ }
}
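The constructor no longer copies secretsEphemeral into a fresh HashMap. Judging from the call site updated in this patch (the NCApplication hunk below), the map passed in is Collections.singletonMap(...) or Collections.emptyMap(), both unmodifiable, so the defensive copy bought nothing; a small sketch of that reasoning, with placeholder key/value strings:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    class SecretsCopySketch {
        public static void main(String[] args) {
            // Shape of the map the call site builds (see the NCApplication hunk below):
            Map<String, Object> httpSecrets = Collections.singletonMap("sys-auth-header", "token");
            Map<String, Object> copied = new HashMap<>(httpSecrets); // old behavior
            Map<String, Object> direct = httpSecrets;                // new behavior
            // Both carry the same entries into serialization; the source map cannot
            // change after construction, so the copy is redundant.
            System.out.println(copied.equals(direct)); // true
        }
    }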
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index f0a4a7c..f0f0470 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -59,13 +59,9 @@
Throwable exception = null;
try {
for (INCLifecycleTask task : tasks) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO, "Starting startup task: " + task);
- }
+ LOGGER.log(Level.INFO, "Starting startup task: {}", task);
task.perform(getCcId(), cs);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO, "Completed startup task: " + task);
- }
+ LOGGER.log(Level.INFO, "Completed startup task: {}", task);
}
} catch (Throwable e) { //NOSONAR all startup failures should be reported to CC
LOGGER.log(Level.ERROR, "Failed during startup task", e);
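This hunk swaps string concatenation guarded by isInfoEnabled() for Log4j2's parameterized form: with {} placeholders the message, including task.toString(), is only rendered when INFO is enabled, so the guard becomes unnecessary. A self-contained sketch of the two styles:

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    class LoggingSketch {
        private static final Logger LOGGER = LogManager.getLogger(LoggingSketch.class);

        void run(Object task) {
            // Old style: concatenation builds the message string even when INFO is
            // disabled, hence the explicit guard.
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Starting startup task: " + task);
            }
            // New style: formatting (and task.toString()) is deferred until the
            // level check inside the logger passes, so no guard is needed.
            LOGGER.info("Starting startup task: {}", task);
        }
    }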
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 438397c..eb6c779 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1003,7 +1003,7 @@
throws AlgebricksException {
ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
Map<String, String> properties = externalDetails.getProperties();
- ExternalDataUtils.validateType(properties, (ARecordType) itemType.getDatatype());
+ ExternalDataUtils.validateParquetTypeAndConfiguration(properties, (ARecordType) itemType.getDatatype());
return properties;
}
@@ -1331,11 +1331,8 @@
List<List<String>> keyFieldNames = new ArrayList<>(indexedElementsCount);
List<IAType> keyFieldTypes = new ArrayList<>(indexedElementsCount);
List<Integer> keyFieldSourceIndicators = new ArrayList<>(indexedElementsCount);
- if (isSecondaryPrimary) {
- // BACK-COMPAT: secondary primary index has one source indicator
- // which is set to META_RECORD_INDICATOR
- keyFieldSourceIndicators.add(Index.META_RECORD_INDICATOR);
- } else {
+ // secondary primary indexes do not have search keys (SKs), and thus no source indicators
+ if (!isSecondaryPrimary) {
for (int i = 0; i < indexedElementsCount; i++) {
CreateIndexStatement.IndexedElement indexedElement = indexedElements.get(i);
keyFieldNames.add(indexedElement.getProjectList().get(0).first);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index be1cc7c..f60349f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -300,12 +300,12 @@
final NodeStatus currentStatus = ncs.getNodeStatus();
final SystemState systemState = isPendingStartupTasks(currentStatus, ncs.getPrimaryCcId(), ccId)
? getCurrentSystemState() : SystemState.HEALTHY;
- final Map httpSecrets =
+ final Map<String, Object> httpSecrets =
apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
: Collections.emptyMap();
RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
currentStatus, systemState, httpSecrets,
- runtimeContext.getMetadataProperties().getNodeActivePartitions(nodeId));
+ runtimeContext.getMetadataProperties().getNodePartitions(nodeId));
}
@Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index 094c1db..316d261 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -18,9 +18,9 @@
*/
package org.apache.asterix.test.external_dataset;
-import static org.apache.asterix.test.external_dataset.BinaryFileConverterUtil.BINARY_GEN_BASEDIR;
import static org.apache.asterix.test.external_dataset.aws.AwsS3ExternalDatasetTest.BOM_FILE_CONTAINER;
import static org.apache.asterix.test.external_dataset.aws.AwsS3ExternalDatasetTest.FIXED_DATA_CONTAINER;
+import static org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.BINARY_GEN_BASEDIR;
import java.io.BufferedWriter;
import java.io.File;
@@ -30,6 +30,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
+import org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.commons.io.FilenameUtils;
import org.apache.logging.log4j.LogManager;
@@ -258,18 +259,19 @@
}
private static void loadParquetFiles() {
- String dataBasePath = BINARY_GEN_BASEDIR;
+ String generatedDataBasePath = BINARY_GEN_BASEDIR;
String definition = PARQUET_DEFINITION;
// Normal format
String definitionSegment = "";
- loadData(dataBasePath, "", "dummy_tweet.parquet", definition, definitionSegment, false, false);
- loadData(dataBasePath, "", "id_age.parquet", definition, definitionSegment, false, false);
- loadData(dataBasePath, "", "id_age-string.parquet", definition, definitionSegment, false, false);
- loadData(dataBasePath, "", "id_name.parquet", definition, definitionSegment, false, false);
- loadData(dataBasePath, "", "id_name_comment.parquet", definition, definitionSegment, false, false);
- loadData(dataBasePath, "", "heterogeneous_1.parquet", definition, definitionSegment, false, false);
- loadData(dataBasePath, "", "heterogeneous_2.parquet", definition, definitionSegment, false, false);
+ loadData(generatedDataBasePath, "", "dummy_tweet.parquet", definition, definitionSegment, false, false);
+ loadData(generatedDataBasePath, "", "id_age.parquet", definition, definitionSegment, false, false);
+ loadData(generatedDataBasePath, "", "id_age-string.parquet", definition, definitionSegment, false, false);
+ loadData(generatedDataBasePath, "", "id_name.parquet", definition, definitionSegment, false, false);
+ loadData(generatedDataBasePath, "", "id_name_comment.parquet", definition, definitionSegment, false, false);
+ loadData(generatedDataBasePath, "", "heterogeneous_1.parquet", definition, definitionSegment, false, false);
+ loadData(generatedDataBasePath, "", "heterogeneous_2.parquet", definition, definitionSegment, false, false);
+ loadData(generatedDataBasePath, "", "parquetTypes.parquet", definition, definitionSegment, false, false);
}
private static void loadData(String fileBasePath, String filePathSegment, String filename, String definition,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
index 05b0d0b..785e676 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -18,10 +18,10 @@
*/
package org.apache.asterix.test.external_dataset.aws;
-import static org.apache.asterix.test.external_dataset.BinaryFileConverterUtil.DEFAULT_PARQUET_SRC_PATH;
import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFiles;
import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setDataPaths;
import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setUploaders;
+import static org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.DEFAULT_PARQUET_SRC_PATH;
import static org.apache.hyracks.util.file.FileUtil.joinPath;
import java.io.ByteArrayOutputStream;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java
index 894b4bc..7de2d7e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java
@@ -22,8 +22,8 @@
import static org.apache.asterix.test.common.TestConstants.Azure.AZURITE_ACCOUNT_NAME_DEFAULT;
import static org.apache.asterix.test.common.TestConstants.Azure.BLOB_ENDPOINT_PLACEHOLDER;
import static org.apache.asterix.test.common.TestConstants.Azure.sasToken;
-import static org.apache.asterix.test.external_dataset.BinaryFileConverterUtil.BINARY_GEN_BASEDIR;
import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.PARQUET_DEFINITION;
+import static org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.BINARY_GEN_BASEDIR;
import static org.apache.hyracks.util.file.FileUtil.joinPath;
import java.io.ByteArrayInputStream;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/BinaryFileConverterUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/BinaryFileConverterUtil.java
similarity index 87%
rename from asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/BinaryFileConverterUtil.java
rename to asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/BinaryFileConverterUtil.java
index d3865d3..96a8703 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/BinaryFileConverterUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/BinaryFileConverterUtil.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.test.external_dataset;
+package org.apache.asterix.test.external_dataset.parquet;
import java.io.File;
import java.io.FileInputStream;
@@ -64,13 +64,15 @@
Path outputPath = new Path(destPath.getAbsolutePath(), fileName);
writeParquetFile(jsonFile, outputPath);
}
+ //Write a Parquet example file that contains the specialized types
+ ParquetFileExampleGeneratorUtil.writeExample();
}
private static void writeParquetFile(File jsonInputPath, Path parquetOutputPath) throws IOException {
- final FileInputStream schemaInputStream = new FileInputStream(jsonInputPath);
- final FileInputStream jsonInputStream = new FileInputStream(jsonInputPath);
+ FileInputStream schemaInputStream = new FileInputStream(jsonInputPath);
+ FileInputStream jsonInputStream = new FileInputStream(jsonInputPath);
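+ //Two streams over the same JSON file: schema inference consumes the first, and records are then read from the second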
//Infer Avro schema
- final Schema inputSchema = JsonUtil.inferSchema(schemaInputStream, "parquet_schema", NUM_OF_RECORDS_SCHEMA);
+ Schema inputSchema = JsonUtil.inferSchema(schemaInputStream, "parquet_schema", NUM_OF_RECORDS_SCHEMA);
try (JSONFileReader<Record> reader = new JSONFileReader<>(jsonInputStream, inputSchema, Record.class)) {
reader.initialize();
try (AvroParquetWriter<Record> writer = new AvroParquetWriter<>(parquetOutputPath, inputSchema)) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/ParquetFileExampleGeneratorUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/ParquetFileExampleGeneratorUtil.java
new file mode 100644
index 0000000..501fb27
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/ParquetFileExampleGeneratorUtil.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.external_dataset.parquet;
+
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AUUIDSerializerDeserializer;
+import org.apache.asterix.om.base.AMutableUUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.NanoTime;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * A generator of a Parquet file that contains different specialized types.
+ * Adapted from:
+ *
+ * @see <a href="https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java">TestParquetWriter</a>
+ */
+public class ParquetFileExampleGeneratorUtil {
+ //Jan 1st 2022 01:00:00 UTC
+ private static final long TIME_MILLIS = TimeUnit.SECONDS.toMillis(1640998800);
+ private static final int TIME_DAYS = (int) TimeUnit.MILLISECONDS.toDays(TIME_MILLIS);
+ private static final int SINCE_MIDNIGHT_MILLIS = getMillisSinceMidnight();
+
+ private static final int PST_OFFSET = TimeZone.getTimeZone("PST").getRawOffset();
+ private static final long PST_TIME_MILLIS = TimeUnit.SECONDS.toMillis(1640998800) + PST_OFFSET;
+ private static final int PST_TIME_DAYS = (int) TimeUnit.MILLISECONDS.toDays(PST_TIME_MILLIS);
+ private static final int PST_SINCE_MIDNIGHT_MILLIS = SINCE_MIDNIGHT_MILLIS + PST_OFFSET;
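+ //int96 timestamps encode (Julian day, nanos within the day); 2440588 is the Julian day number of the Unix epoch (1970-01-01)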
+ private static final int JULIAN_DAY_OF_EPOCH = 2440588;
+
+ private static final String FILE_NAME = "parquetTypes.parquet";
+
+ private static final String SCHEMA = "message test { \n" + " required boolean boolean_field;\n"
+ + " required int32 int8_field (INTEGER(8,true));\n"
+ + " required int32 int16_field (INTEGER(16,true));\n" + " required int32 int32_field;\n"
+ + " required int64 int64_field;\n" + " required int32 uint8_field (INTEGER(8,false));\n"
+ + " required int32 uint16_field (INTEGER(16,false));\n"
+ + " required int32 uint32_field (INTEGER(32,false));\n"
+ + " required int64 uint64_field (INTEGER(64,false));\n"
+ + " required int64 overflowed_uint64_field (INTEGER(64,false));\n" + " required float float_field;\n"
+ + " required double double_field;\n" + " required int32 decimal32_field (DECIMAL(5, 4));\n"
+ + " required int64 decimal64_field (DECIMAL(12, 9));\n"
+ + " required fixed_len_byte_array(10) decimal_fixed80_field (DECIMAL(22,21));\n"
+ + " required binary decimal_arbitrary_length_field (DECIMAL(22,21));\n"
+ + " required binary binary_field;\n" + " required binary string_field (UTF8);\n"
+ + " required binary enum_field (ENUM);\n" + " required binary json_field (JSON);\n"
+ + " required int32 date_field (DATE);\n" + " required int32 time32_millis_field (TIME(MILLIS, true));\n"
+ + " required int64 time64_micros_field (TIME(MICROS, true));\n"
+ + " required int64 time64_nanos_field (TIME(NANOS, true));\n"
+ + " required int32 time32_millis_pst_field (TIME(MILLIS, false));\n"
+ + " required int64 time64_micros_pst_field (TIME(MICROS, false));\n"
+ + " required int64 time64_nanos_pst_field (TIME(NANOS, false));\n"
+ + " required int64 timestamp64_millis_field (TIMESTAMP(MILLIS, true));\n"
+ + " required int64 timestamp64_micros_field (TIMESTAMP(MICROS, true));\n"
+ + " required int64 timestamp64_nanos_field (TIMESTAMP(NANOS, true));\n"
+ + " required int64 timestamp64_millis_pst_field (TIMESTAMP(MILLIS, false));\n"
+ + " required int64 timestamp64_micros_pst_field (TIMESTAMP(MICROS, false));\n"
+ + " required int64 timestamp64_nanos_pst_field (TIMESTAMP(NANOS, false));\n"
+ + " required int96 timestamp96_field;\n" + " required fixed_len_byte_array(16) uuid_field (UUID);"
+ + " required group mapField (MAP) {\n" + " repeated group key_value {\n"
+ + " required int32 key;\n" + " required int32 value;\n" + " }\n" + " }" + "}";
+
+ private ParquetFileExampleGeneratorUtil() {
+ }
+
+ public static void writeExample() throws IOException {
+ Configuration conf = new Configuration();
+ Path root = new Path(BinaryFileConverterUtil.BINARY_GEN_BASEDIR);
+ MessageType schema = parseMessageType(SCHEMA);
+ GroupWriteSupport.setSchema(schema, conf);
+ Path file = new Path(root, FILE_NAME);
+ ParquetWriter<Group> writer = ExampleParquetWriter.builder(new TestOutputFile(file, conf))
+ .withCompressionCodec(UNCOMPRESSED).withRowGroupSize(1024).withPageSize(1024)
+ .withDictionaryPageSize(512).enableDictionaryEncoding().withValidation(false)
+ .withWriterVersion(WriterVersion.PARQUET_2_0).withConf(conf).build();
+ SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
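+ //Note: "Integer.MAX_VALUE + 1" and "Long.MAX_VALUE + 1" below wrap around on purpose; the two's-complement bit patterns encode the unsigned values 2^31 and 2^63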
+ Group message = groupFactory.newGroup().append("boolean_field", true).append("int8_field", 8)
+ .append("int16_field", 16).append("int32_field", 32).append("int64_field", 64L)
+ .append("uint8_field", Byte.MAX_VALUE + 1).append("uint16_field", Short.MAX_VALUE + 1)
+ .append("uint32_field", Integer.MAX_VALUE + 1).append("uint64_field", 151L)
+ .append("overflowed_uint64_field", Long.MAX_VALUE + 1).append("float_field", 1.0F)
+ .append("double_field", 1.0D).append("decimal32_field", getDecimal32())
+ .append("decimal64_field", getDecimal64()).append("decimal_fixed80_field", getDecimal80())
+ .append("decimal_arbitrary_length_field", getDecimal80()).append("binary_field", createConstantBinary())
+ .append("string_field", "stringVal").append("enum_field", "enumVal").append("json_field", "[1,2,3]")
+ .append("date_field", TIME_DAYS).append("time32_millis_field", SINCE_MIDNIGHT_MILLIS)
+ .append("time64_micros_field", TimeUnit.MILLISECONDS.toMicros(SINCE_MIDNIGHT_MILLIS))
+ .append("time64_nanos_field", TimeUnit.MILLISECONDS.toNanos(SINCE_MIDNIGHT_MILLIS))
+ .append("time32_millis_pst_field", PST_SINCE_MIDNIGHT_MILLIS)
+ .append("time64_micros_pst_field", TimeUnit.MILLISECONDS.toMicros(PST_SINCE_MIDNIGHT_MILLIS))
+ .append("time64_nanos_pst_field", TimeUnit.MILLISECONDS.toNanos(PST_SINCE_MIDNIGHT_MILLIS))
+ .append("timestamp64_millis_field", TIME_MILLIS)
+ .append("timestamp64_micros_field", TimeUnit.MILLISECONDS.toMicros(TIME_MILLIS))
+ .append("timestamp64_nanos_field", TimeUnit.MILLISECONDS.toNanos(TIME_MILLIS))
+ .append("timestamp64_millis_pst_field", PST_TIME_MILLIS)
+ .append("timestamp64_micros_pst_field", TimeUnit.MILLISECONDS.toMicros(PST_TIME_MILLIS))
+ .append("timestamp64_nanos_pst_field", TimeUnit.MILLISECONDS.toNanos(PST_TIME_MILLIS))
+ .append("timestamp96_field",
+ new NanoTime(PST_TIME_DAYS + JULIAN_DAY_OF_EPOCH,
+ TimeUnit.MILLISECONDS.toNanos(PST_SINCE_MIDNIGHT_MILLIS)))
+ .append("uuid_field", createUUIDBinary());
+ Group mapField = message.addGroup("mapField");
+ mapField.addGroup("key_value").append("key", 1).append("value", 1);
+ writer.write(message);
+ writer.close();
+ }
+
+ private static int getMillisSinceMidnight() {
+ Instant instant = Instant.ofEpochMilli(TIME_MILLIS);
+ Instant midnight = LocalDate.ofInstant(instant, ZoneOffset.UTC).atStartOfDay().toInstant(ZoneOffset.UTC);
+ return (int) Duration.between(midnight, instant).toMillis();
+ }
+
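+ //The decimal helpers below return unscaled values, e.g. 1.1000 at scale 4 (DECIMAL(5,4)) is the int32 11000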
+ private static int getDecimal32() {
+ BigDecimal decimal = new BigDecimal("1.1000");
+ return decimal.unscaledValue().intValue();
+ }
+
+ private static long getDecimal64() {
+ BigDecimal decimal = new BigDecimal("154.000000001");
+ return decimal.unscaledValue().longValue();
+ }
+
+ private static Binary getDecimal80() {
+ BigDecimal decimal = new BigDecimal("9.223372036854775800001");
+ return Binary.fromConstantByteArray(decimal.unscaledValue().toByteArray());
+ }
+
+ private static Binary createConstantBinary() {
+ byte[] binaryBytes = { 0x00, 0x01, 0x02 };
+ return Binary.fromConstantByteArray(binaryBytes);
+ }
+
+ private static Binary createUUIDBinary() throws HyracksDataException {
+ char[] digit = "123e4567-e89b-12d3-a456-426614174000".toCharArray();
+ AMutableUUID uuid = new AMutableUUID();
+ uuid.parseUUIDString(digit, 0, digit.length);
+ ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
+ AUUIDSerializerDeserializer.INSTANCE.serialize(uuid, storage.getDataOutput());
+ return Binary.fromConstantByteArray(storage.getByteArray(), 0, storage.getLength());
+ }
+
+ private static class TestOutputFile implements OutputFile {
+
+ private final OutputFile outputFile;
+
+ TestOutputFile(Path path, Configuration conf) throws IOException {
+ outputFile = HadoopOutputFile.fromPath(path, conf);
+ }
+
+ @Override
+ public PositionOutputStream create(long blockSizeHint) throws IOException {
+ return outputFile.create(blockSizeHint);
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+ return outputFile.createOrOverwrite(blockSizeHint);
+ }
+
+ @Override
+ public boolean supportsBlockSize() {
+ return outputFile.supportsBlockSize();
+ }
+
+ @Override
+ public long defaultBlockSize() {
+ return outputFile.defaultBlockSize();
+ }
+ }
+}
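The generated file can be sanity-checked outside the test harness with Parquet's example Group API; a sketch under the assumption that the path argument points at the generated parquetTypes.parquet (not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;

    class ReadBackSketch {
        public static void main(String[] args) throws Exception {
            Path file = new Path(args[0]); // e.g. <BINARY_GEN_BASEDIR>/parquetTypes.parquet
            try (ParquetReader<Group> reader =
                    ParquetReader.builder(new GroupReadSupport(), file)
                            .withConf(new Configuration()).build()) {
                for (Group g = reader.read(); g != null; g = reader.read()) {
                    System.out.println(g); // Group#toString prints field/value pairs
                }
            }
        }
    }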
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java
index 4c30d2b..19a9253 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/HDFSCluster.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.test.runtime;
-import static org.apache.asterix.test.external_dataset.BinaryFileConverterUtil.BINARY_GEN_BASEDIR;
-import static org.apache.asterix.test.external_dataset.BinaryFileConverterUtil.DEFAULT_PARQUET_SRC_PATH;
+import static org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.BINARY_GEN_BASEDIR;
+import static org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.DEFAULT_PARQUET_SRC_PATH;
import java.io.File;
import java.io.IOException;
import org.apache.asterix.external.dataset.adapter.GenericAdapter;
-import org.apache.asterix.test.external_dataset.BinaryFileConverterUtil;
+import org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/window/win_opt_02/win_opt_02_1.sqlpp
similarity index 64%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/window/win_opt_02/win_opt_02_1.sqlpp
index 946b10c..def3a02 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/window/win_opt_02/win_opt_02_1.sqlpp
@@ -16,19 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
-
-import org.apache.hyracks.data.std.api.IValueReference;
-
-/**
- * This interface is intended to extend {@link org.apache.parquet.io.api.Converter} to get the field name or the
- * index of associated with a value.
+/*
+ * Description : Test fix for ASTERIXDB-3007
+ * Expected Res : SUCCESS
*/
-interface IFieldValue {
- IValueReference getFieldName();
- /**
- * @return the index of the value as appeared in the schema
- */
- int getIndex();
-}
+with ds1 as (
+ select r as t, r*r as x
+ from range(1, 10) r
+)
+
+select t, x, dt, dx, int(v) as v, int(a) as a
+from ds1
+let dt = t - lag(t) over (order by t),
+ dx = x - lag(x) over (order by t),
+ v = dx/dt,
+ a = v - lag(v) over (order by t)
+order by t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_02/win_opt_02_1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_02/win_opt_02_1.plan
new file mode 100644
index 0000000..931e417
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_02/win_opt_02_1.plan
@@ -0,0 +1,23 @@
+-- DISTRIBUTE_RESULT |LOCAL|
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- STREAM_PROJECT |LOCAL|
+ -- ASSIGN |LOCAL|
+ -- STREAM_PROJECT |LOCAL|
+ -- ASSIGN |LOCAL|
+ -- STREAM_PROJECT |LOCAL|
+ -- WINDOW |LOCAL|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- WINDOW |LOCAL|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- WINDOW_STREAM |LOCAL|
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- STABLE_SORT [$$r(ASC)] |LOCAL|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- UNNEST |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/decimal/decimal.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/decimal/decimal.1.ddl.sqlpp
new file mode 100644
index 0000000..ecf866e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/decimal/decimal.1.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE ParquetType as {
+};
+
+CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="parquet-data/reviews"),
+ ("include"="*parquetTypes.parquet"),
+ ("format" = "parquet"),
+ ("decimal-to-double" = "true")
+);
+
+CREATE EXTERNAL DATASET ParquetDataset2(ParquetType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="parquet-data/reviews"),
+ ("include"="*parquetTypes.parquet"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/decimal/decimal.2.query.sqlpp
similarity index 63%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/decimal/decimal.2.query.sqlpp
index 946b10c..1bfd2df 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/decimal/decimal.2.query.sqlpp
@@ -16,19 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+/*
+* Description : Test Parquet's decimal types
+* Expected Res : Success with a warning about precision loss
+* Date : Jan 27th 2022
+*/
-import org.apache.hyracks.data.std.api.IValueReference;
+-- param max-warnings:json=1000
-/**
- * This interface is intended to extend {@link org.apache.parquet.io.api.Converter} to get the field name or the
- * index of associated with a value.
- */
-interface IFieldValue {
- IValueReference getFieldName();
+USE test;
- /**
- * @return the index of the value as appeared in the schema
- */
- int getIndex();
-}
+SELECT p.decimal32_field,
+ p.decimal64_field,
+ p.decimal_fixed80_field,
+ p.decimal_arbitrary_length_field
+FROM ParquetDataset p
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/decimal/decimal.3.query.sqlpp
similarity index 63%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/decimal/decimal.3.query.sqlpp
index 946b10c..5cdff29 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/decimal/decimal.3.query.sqlpp
@@ -16,19 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+/*
+* Description : Test Parquet's decimal types
+* Expected Res : Error since decimal-to-double is not enabled
+* Date : Jan 27th 2022
+*/
-import org.apache.hyracks.data.std.api.IValueReference;
+-- param max-warnings:json=1000
-/**
- * This interface is intended to extend {@link org.apache.parquet.io.api.Converter} to get the field name or the
- * index of associated with a value.
- */
-interface IFieldValue {
- IValueReference getFieldName();
+USE test;
- /**
- * @return the index of the value as appeared in the schema
- */
- int getIndex();
-}
+SELECT p.decimal32_field
+FROM ParquetDataset2 p
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/disable-json-parsing/disable-json-parsing.1.ddl.sqlpp
similarity index 63%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/disable-json-parsing/disable-json-parsing.1.ddl.sqlpp
index 946b10c..03af660 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/disable-json-parsing/disable-json-parsing.1.ddl.sqlpp
@@ -16,19 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
-import org.apache.hyracks.data.std.api.IValueReference;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
-/**
- * This interface is intended to extend {@link org.apache.parquet.io.api.Converter} to get the field name or the
- * index of associated with a value.
- */
-interface IFieldValue {
- IValueReference getFieldName();
+USE test;
- /**
- * @return the index of the value as appeared in the schema
- */
- int getIndex();
-}
+
+CREATE TYPE ParquetType as {
+};
+
+CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="parquet-data/reviews"),
+ ("include"="*parquetTypes.parquet"),
+ ("format" = "parquet"),
+ ("parse-json-string" = "false")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/disable-json-parsing/disable-json-parsing.2.query.sqlpp
similarity index 63%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/disable-json-parsing/disable-json-parsing.2.query.sqlpp
index 946b10c..310c9ed 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/disable-json-parsing/disable-json-parsing.2.query.sqlpp
@@ -16,19 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+/*
+* Description : Test Parquet's json type
+* Expected Res : Success
+* Date : Jan 27th 2022
+*/
-import org.apache.hyracks.data.std.api.IValueReference;
+-- param max-warnings:json=1000
-/**
- * This interface is intended to extend {@link org.apache.parquet.io.api.Converter} to get the field name or the
- * index of associated with a value.
- */
-interface IFieldValue {
- IValueReference getFieldName();
+USE test;
- /**
- * @return the index of the value as appeared in the schema
- */
- int getIndex();
-}
+SELECT p.json_field
+FROM ParquetDataset p
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/invalid-timezone/temporal.1.ddl.sqlpp
similarity index 63%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/invalid-timezone/temporal.1.ddl.sqlpp
index 946b10c..5c202c3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/invalid-timezone/temporal.1.ddl.sqlpp
@@ -16,19 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
-import org.apache.hyracks.data.std.api.IValueReference;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
-/**
- * This interface is intended to extend {@link org.apache.parquet.io.api.Converter} to get the field name or the
- * index of associated with a value.
- */
-interface IFieldValue {
- IValueReference getFieldName();
+USE test;
- /**
- * @return the index of the value as appeared in the schema
- */
- int getIndex();
-}
+
+CREATE TYPE ParquetType as {
+};
+
+CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="parquet-data/reviews"),
+ ("include"="*parquetTypes.parquet"),
+ ("format" = "parquet"),
+ ("timezone" = "invalid-timezone")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/temporal/temporal.1.ddl.sqlpp
similarity index 63%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/temporal/temporal.1.ddl.sqlpp
index 946b10c..220eb91 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/temporal/temporal.1.ddl.sqlpp
@@ -16,19 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
-import org.apache.hyracks.data.std.api.IValueReference;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
-/**
- * This interface is intended to extend {@link org.apache.parquet.io.api.Converter} to get the field name or the
- * index of associated with a value.
- */
-interface IFieldValue {
- IValueReference getFieldName();
+USE test;
- /**
- * @return the index of the value as appeared in the schema
- */
- int getIndex();
-}
+
+CREATE TYPE ParquetType as {
+};
+
+CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="parquet-data/reviews"),
+ ("include"="*parquetTypes.parquet"),
+ ("format" = "parquet"),
+ ("timezone" = "PST")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/temporal/tempral.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/temporal/tempral.2.query.sqlpp
new file mode 100644
index 0000000..3d19e92
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/temporal/tempral.2.query.sqlpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Test Parquet's temporal types (UTC values are adjusted to PST)
+* Expected Res : Success
+* Date : Jan 27th 2022
+*/
+
+-- param max-warnings:json=1000
+
+USE test;
+
+SELECT p.date_field,
+ p.time32_millis_field,
+ p.time64_micros_field,
+ p.time64_nanos_field,
+ p.time32_millis_pst_field,
+ p.time64_micros_pst_field,
+ p.time64_nanos_pst_field,
+ p.timestamp64_millis_field,
+ p.timestamp64_micros_field,
+ p.timestamp64_nanos_field,
+ p.timestamp64_millis_pst_field,
+ p.timestamp64_micros_pst_field,
+ p.timestamp64_nanos_pst_field,
+ p.timestamp96_field
+FROM ParquetDataset p
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/unset-flags/unset-flags.1.ddl.sqlpp
similarity index 63%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/unset-flags/unset-flags.1.ddl.sqlpp
index 946b10c..5f5f661 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/unset-flags/unset-flags.1.ddl.sqlpp
@@ -16,19 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
-import org.apache.hyracks.data.std.api.IValueReference;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
-/**
- * This interface is intended to extend {@link org.apache.parquet.io.api.Converter} to get the field name or the
- * index of associated with a value.
- */
-interface IFieldValue {
- IValueReference getFieldName();
+USE test;
- /**
- * @return the index of the value as appeared in the schema
- */
- int getIndex();
-}
+
+CREATE TYPE ParquetType as {
+};
+
+CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="parquet-data/reviews"),
+ ("include"="*parquetTypes.parquet"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/unset-flags/unset-flags.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/unset-flags/unset-flags.2.query.sqlpp
new file mode 100644
index 0000000..80a4cc2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/parquet-types/unset-flags/unset-flags.2.query.sqlpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Test Parquet types except for decimals
+* Expected Res : Success with warnings
+* Date : November 1st 2021
+*/
+
+-- param max-warnings:json=1000
+
+USE test;
+
+SELECT p.boolean_field,
+ p.int8_field,
+ p.int16_field,
+ p.int32_field,
+ p.int64_field,
+ p.uint8_field,
+ p.uint16_field,
+ p.uint32_field,
+ p.uint64_field,
+ p.overflowed_uint64_field,
+ p.float_field,
+ p.double_field,
+ p.binary_field,
+ p.string_field,
+ p.enum_field,
+ p.json_field,
+ p.date_field,
+ p.time32_millis_field,
+ p.time64_micros_field,
+ p.time64_nanos_field,
+ p.time32_millis_pst_field,
+ p.time64_micros_pst_field,
+ p.time64_nanos_pst_field,
+ p.timestamp64_millis_field,
+ p.timestamp64_micros_field,
+ p.timestamp64_nanos_field,
+ p.timestamp64_millis_pst_field,
+ p.timestamp64_micros_pst_field,
+ p.timestamp64_nanos_pst_field,
+ p.timestamp96_field,
+ p.uuid_field
+FROM ParquetDataset p
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.10.query.sqlpp
similarity index 64%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.10.query.sqlpp
index 946b10c..a6f448c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.10.query.sqlpp
@@ -16,19 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
-
-import org.apache.hyracks.data.std.api.IValueReference;
-
-/**
- * This interface is intended to extend {@link org.apache.parquet.io.api.Converter} to get the field name or the
- * index of associated with a value.
+/*
+ * Description : Test fix for ASTERIXDB-3007
+ * Expected Res : SUCCESS
*/
-interface IFieldValue {
- IValueReference getFieldName();
- /**
- * @return the index of the value as appeared in the schema
- */
- int getIndex();
-}
+use test;
+
+with ds1 as (
+ select r as t, r*r as x
+ from range(1, 10) r
+)
+
+select t, x, dt, dx, int(v) as v, int(a) as a
+from ds1
+let dt = t - lag(t) over (order by t),
+ dx = x - lag(x) over (order by t),
+ v = dx/dt,
+ a = v - lag(v) over (order by t)
+order by t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/decimal/decimal.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/decimal/decimal.02.adm
new file mode 100644
index 0000000..9f8b991
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/decimal/decimal.02.adm
@@ -0,0 +1 @@
+{ "decimal32_field": 1.1, "decimal64_field": 154.000000001, "decimal_fixed80_field": 9.223372036854776, "decimal_arbitrary_length_field": 9.223372036854776 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/disable-json-parsing/disable-json-parsing.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/disable-json-parsing/disable-json-parsing.02.adm
new file mode 100644
index 0000000..86fc647
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/disable-json-parsing/disable-json-parsing.02.adm
@@ -0,0 +1 @@
+{ "json_field": "[1,2,3]" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/temporal/temporal.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/temporal/temporal.02.adm
new file mode 100644
index 0000000..b1ddb13
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/temporal/temporal.02.adm
@@ -0,0 +1 @@
+{ "date_field": date("2022-01-01"), "time32_millis_field": time("01:00:00.000"), "time64_micros_field": time("01:00:00.000"), "time64_nanos_field": time("01:00:00.000"), "time32_millis_pst_field": time("17:00:00.000"), "time64_micros_pst_field": time("17:00:00.000"), "time64_nanos_pst_field": time("17:00:00.000"), "timestamp64_millis_field": datetime("2021-12-31T17:00:00.000"), "timestamp64_micros_field": datetime("2021-12-31T17:00:00.000"), "timestamp64_nanos_field": datetime("2021-12-31T17:00:00.000"), "timestamp64_millis_pst_field": datetime("2021-12-31T17:00:00.000"), "timestamp64_micros_pst_field": datetime("2021-12-31T17:00:00.000"), "timestamp64_nanos_pst_field": datetime("2021-12-31T17:00:00.000"), "timestamp96_field": datetime("2021-12-30T17:00:00.000") }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/unset-flags/unset-flags.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/unset-flags/unset-flags.02.adm
new file mode 100644
index 0000000..c42147b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/parquet-types/unset-flags/unset-flags.02.adm
@@ -0,0 +1 @@
+{ "boolean_field": true, "int8_field": 8, "int16_field": 16, "int32_field": 32, "int64_field": 64, "uint8_field": 128, "uint16_field": 32768, "uint32_field": 2147483648, "uint64_field": 151, "float_field": 1.0, "double_field": 1.0, "binary_field": hex("000102"), "string_field": "stringVal", "enum_field": "enumVal", "json_field": [ 1, 2, 3 ], "date_field": date("2022-01-01"), "time32_millis_field": time("01:00:00.000"), "time64_micros_field": time("01:00:00.000"), "time64_nanos_field": time("01:00:00.000"), "time32_millis_pst_field": time("17:00:00.000"), "time64_micros_pst_field": time("17:00:00.000"), "time64_nanos_pst_field": time("17:00:00.000"), "timestamp64_millis_field": datetime("2022-01-01T01:00:00.000"), "timestamp64_micros_field": datetime("2022-01-01T01:00:00.000"), "timestamp64_nanos_field": datetime("2022-01-01T01:00:00.000"), "timestamp64_millis_pst_field": datetime("2021-12-31T17:00:00.000"), "timestamp64_micros_pst_field": datetime("2021-12-31T17:00:00.000"), "timestamp64_nanos_pst_field": datetime("2021-12-31T17:00:00.000"), "timestamp96_field": datetime("2021-12-30T17:00:00.000"), "uuid_field": uuid("123e4567-e89b-12d3-a456-426614174000") }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.10.adm
new file mode 100644
index 0000000..29322df
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.10.adm
@@ -0,0 +1,10 @@
+{ "t": 1, "x": 1, "dt": null, "dx": null, "v": null, "a": null }
+{ "t": 2, "x": 4, "dt": 1, "dx": 3, "v": 3, "a": null }
+{ "t": 3, "x": 9, "dt": 1, "dx": 5, "v": 5, "a": 2 }
+{ "t": 4, "x": 16, "dt": 1, "dx": 7, "v": 7, "a": 2 }
+{ "t": 5, "x": 25, "dt": 1, "dx": 9, "v": 9, "a": 2 }
+{ "t": 6, "x": 36, "dt": 1, "dx": 11, "v": 11, "a": 2 }
+{ "t": 7, "x": 49, "dt": 1, "dx": 13, "v": 13, "a": 2 }
+{ "t": 8, "x": 64, "dt": 1, "dx": 15, "v": 15, "a": 2 }
+{ "t": 9, "x": 81, "dt": 1, "dx": 17, "v": 17, "a": 2 }
+{ "t": 10, "x": 100, "dt": 1, "dx": 19, "v": 19, "a": 2 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index bacc23b..a8786e2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -172,6 +172,46 @@
<expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
</compilation-unit>
</test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/parquet/parquet-types/unset-flags">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/parquet/parquet-types/unset-flags</output-dir>
+ <source-location>false</source-location>
+ <expected-warn>Parquet file(s) contain unsigned integer that is larger than the 'bigint' range</expected-warn>
+ <expected-warn>Parquet file(s) contain values of the temporal type 'datetime' that are adjusted to UTC. Recreate the external dataset and set the option 'timezone' to get the local-adjusted 'datetime' value</expected-warn>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/parquet/parquet-types/decimal">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/parquet/parquet-types/decimal</output-dir>
+ <source-location>false</source-location>
+ <expected-error>ASX0054: Parquet type 'required int32 decimal32_field (DECIMAL(5,4))' is not supported by default. To enable type conversion, recreate the external dataset with the option 'decimal-to-double' enabled</expected-error>
+ <expected-warn>Parquet decimal precision loss: precision '22' is greater than the maximum supported precision '20'</expected-warn>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/parquet/parquet-types/temporal">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/parquet/parquet-types/temporal</output-dir>
+ <source-location>false</source-location>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/parquet/parquet-types/invalid-timezone">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/parquet/parquet-types/invalid-timezone</output-dir>
+ <source-location>false</source-location>
+ <expected-error>Provided timezone is invalid: 'invalid-timezone'</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/parquet/parquet-types/disable-json-parsing">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/parquet/parquet-types/disable-json-parsing</output-dir>
+ <source-location>false</source-location>
+ </compilation-unit>
+ </test-case>
<!-- Parquet Tests End -->
<test-case FilePath="external-dataset">
<compilation-unit name="common/empty-string-definition">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
index 31708d3..252017f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
@@ -124,8 +124,8 @@
return accessor.getClusterPartitions();
}
- public Set<Integer> getNodeActivePartitions(String nodeId) {
- return accessor.getActivePartitions(nodeId);
+ public Set<Integer> getNodePartitions(String nodeId) {
+ return accessor.getNodePartitions(nodeId);
}
public Map<String, String> getTransactionLogDirs() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index afb103d..522cabd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,8 +46,7 @@
STARTING_PARTITION_ID(
OptionTypes.INTEGER,
-1,
- "The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
- ACTIVE_PARTITIONS(OptionTypes.STRING_ARRAY, null, "List of node active partitions");
+ "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
private final IOptionType type;
private final Object defaultValue;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 5ba378d..80f9a17 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -41,6 +41,7 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.utils.PrintUtil;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -69,7 +70,7 @@
/**
* Constructor which wraps an IApplicationConfig.
*/
- private PropertiesAccessor(IApplicationConfig cfg) throws AsterixException, IOException {
+ private PropertiesAccessor(IApplicationConfig cfg) throws AsterixException {
this.cfg = cfg;
nodePartitionsMap = new ConcurrentHashMap<>();
clusterPartitions = Collections.synchronizedSortedMap(new TreeMap<>());
@@ -80,6 +81,7 @@
for (String ncName : cfg.getNCNames()) {
configureNc(configManager, ncName, uniquePartitionId);
}
+ LOGGER.info("configured partitions: {} from config {}", () -> PrintUtil.toString(nodePartitionsMap), () -> cfg);
for (String section : cfg.getSectionNames()) {
if (section.startsWith(AsterixProperties.SECTION_PREFIX_EXTENSION)) {
String className = AsterixProperties.getSectionId(AsterixProperties.SECTION_PREFIX_EXTENSION, section);
@@ -194,22 +196,16 @@
return clusterPartitions;
}
- public Set<Integer> getActivePartitions(String nodeId) {
- // by default, node actives partitions are the partitions assigned to the node
- String[] activePartitions = cfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS);
- if (activePartitions == null) {
- ClusterPartition[] nodeClusterPartitions = nodePartitionsMap.get(nodeId);
- return Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId)
- .collect(Collectors.toSet());
- }
- return Arrays.stream(activePartitions).map(Integer::parseInt).collect(Collectors.toSet());
+ public Set<Integer> getNodePartitions(String nodeId) {
+ ClusterPartition[] nodeClusterPartitions = nodePartitionsMap.get(nodeId);
+ return Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
}
public List<AsterixExtension> getExtensions() {
return extensions;
}
- public static PropertiesAccessor getInstance(IApplicationConfig cfg) throws IOException, AsterixException {
+ public static PropertiesAccessor getInstance(IApplicationConfig cfg) throws AsterixException {
PropertiesAccessor accessor = instances.get(cfg);
if (accessor == null) {
accessor = new PropertiesAccessor(cfg);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index fb001a0..b0d8e02 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -51,6 +51,7 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.annotations.NotThreadSafe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -132,6 +133,11 @@
throw new IllegalStateException(
"Can't request a flush on an index with active operations: " + numActiveOperations.get());
}
+ if (indexes.isEmpty()) {
+ LOGGER.debug("no open indexes on dataset {} and partition {}... skipping flush",
+ dsInfo.getDatasetID(), partition);
+ return;
+ }
for (ILSMIndex lsmIndex : indexes) {
if (lsmIndex.isPrimaryIndex()) {
if (lsmIndex.isCurrentMutableComponentEmpty()) {
@@ -145,8 +151,10 @@
}
}
if (primaryLsmIndex == null) {
- throw new IllegalStateException("Primary index not found in dataset " + dsInfo.getDatasetID()
- + " and partition " + partition + " open indexes " + indexes);
+ LOGGER.fatal(
+ "Primary index not found in dataset {} and partition {} open indexes {}; halting to clear memory state",
+ dsInfo.getDatasetID(), partition, indexes);
+ ExitUtil.halt(ExitUtil.EC_INCONSISTENT_STORAGE_REFERENCES);
}
for (ILSMIndex lsmIndex : indexes) {
ILSMOperationTracker opTracker = lsmIndex.getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 8939ce0..068c125 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -80,6 +80,11 @@
INVALID_PARAM(50),
INCOMPARABLE_TYPES(51),
ILLEGAL_STATE(52),
+ UNSUPPORTED_PARQUET_TYPE(53),
+ PARQUET_SUPPORTED_TYPE_WITH_OPTION(54),
+ PARQUET_DECIMAL_TO_DOUBLE_PRECISION_LOSS(55),
+ PARQUET_TIME_ZONE_ID_IS_NOT_SET(56),
+ PARQUET_CONTAINS_OVERFLOWED_BIGINT(57),
UNSUPPORTED_JRE(100),
@@ -257,6 +262,7 @@
COMPILATION_SUBQUERY_COERCION_ERROR(1169),
S3_REGION_NOT_SUPPORTED(1170),
COMPILATION_SET_OPERATION_ERROR(1171),
+ INVALID_TIMEZONE(1172),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
index 5cf6724..8956b93 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
@@ -59,4 +59,10 @@
public long getMaxJobId() {
return maxJobId;
}
+
+ @Override
+ public String toString() {
+ return "NcLocalCounters{" + "maxResourceId=" + maxResourceId + ", maxTxnId=" + maxTxnId + ", maxJobId="
+ + maxJobId + '}';
+ }
}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 5120908f..faaf8d5 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -87,6 +87,11 @@
50 = Invalid parameter \"%1$s\"
#51 is used
52 = Illegal state. %1$s
+53 = Unsupported Parquet type '%1$s'
+54 = Parquet type '%1$s' is not supported by default. To enable type conversion, recreate the external dataset with the option '%2$s' enabled
+55 = Parquet decimal precision loss: precision '%1$s' is greater than the maximum supported precision '%2$s'
+56 = Parquet file(s) contain values of the temporal type '%1$s' that are adjusted to UTC. Recreate the external dataset and set the option '%2$s' to get the local-adjusted '%1$s' value
+57 = Parquet file(s) contain unsigned integer that is larger than the '%1$s' range
100 = Unsupported JRE: %1$s
@@ -259,6 +264,7 @@
1169 = Unable to do subquery coercion. %1$s
1170 = Provided S3 region is not supported: '%1$s'
1171 = Unable to process %1$s clause. %2$s
+1172 = Provided timezone is invalid: '%1$s'
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-doc/src/site/markdown/sqlpp/parquet.md b/asterixdb/asterix-doc/src/site/markdown/sqlpp/parquet.md
new file mode 100644
index 0000000..c31ca50
--- /dev/null
+++ b/asterixdb/asterix-doc/src/site/markdown/sqlpp/parquet.md
@@ -0,0 +1,363 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+
+# Querying Parquet Files #
+
+## <a id="toc">Table of Contents</a> ##
+
+* [Overview](#Overview)
+* [DDL](#DDL)
+* [Query Parquet Files](#QueryParquetFiles)
+* [Type Compatibility](#TypeCompatibility)
+* [Parquet Type Flags](#ParquetTypeFlags)
+
+## <a id="Overview">Overview</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ##
+
+[Apache Parquet](https://parquet.apache.org/) is a columnar file format for storing semi-structured data (like JSON).
+Apache AsterixDB supports running queries against Parquet files that are stored in Amazon S3 and Microsoft Azure Blob
+Storage as [External Datasets](../aql/externaldata.html).
+
+## <a id="DDL">DDL</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ##
+
+To start, an end-user needs to create a type as follows:
+
+ -- The type should not contain any declared fields
+ CREATE TYPE ParquetType AS {
+ }
+
+Note that the created type does not have any declared fields. The reason is that Parquet files embed the schema within
+each file. Thus, no fields need to be declared, and it is up to AsterixDB to read each file's schema. If the created
+type contains any declared fields, AsterixDB will throw an error:
+
+ Type 'ParquetType' contains declared fields, which is not supported for 'parquet' format
+
+Next, the user can create an external dataset - using the declared type - as follows:
+
+### Amazon S3
+
+ CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING S3
+ (
+ -- Replace <ACCESS-KEY> with your access key
+ ("accessKeyId"="<ACCESS-KEY>"),
+
+        -- Replace <SECRET-ACCESS-KEY> with your secret access key
+ ("secretAccessKey" = "<SECRET-ACCESS-KEY>"),
+
+ -- S3 bucket
+ ("container"="parquetBucket"),
+
+ -- Path to the parquet files within the bucket
+ ("definition"="path/to/parquet/files"),
+
+ -- Specifying the format as parquet
+ ("format" = "parquet")
+ );
+
+### Microsoft Azure Blob Storage
+
+ CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING AZUREBLOB
+ (
+ -- Replace <ACCOUNT-NAME> with your account name
+ ("accountName"="<ACCOUNT-NAME>"),
+
+ -- Replace <ACCOUNT-KEY> with your account key
+ ("accountKey"="<ACCOUNT-KEY>"),
+
+ -- Azure Blob container
+ ("container"="parquetContainer"),
+
+        -- Path to the parquet files within the container
+ ("definition"="path/to/parquet/files"),
+
+ -- Specifying the format as parquet
+ ("format" = "parquet")
+ );
+
+<i><b>Additional settings/properties can be set as detailed later in [Parquet Type Flags](#ParquetTypeFlags)</b></i>
+
+## <a id="QueryParquetFiles">Query Parquet Files</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ##
+
+To query the data stored in Parquet files, one can simply write a query against the created External Dataset. For
+example:
+
+ SELECT COUNT(*)
+ FROM ParquetDataset;
+
+Another example:
+
+ SELECT pd.age, COUNT(*) cnt
+ FROM ParquetDataset pd
+ GROUP BY pd.age;
+
+## <a id="TypeCompatibility">Type Compatibility</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ##
+
+AsterixDB supports Parquet's generic types such as `STRING`, `INT` and `DOUBLE`. However, Parquet files could
+contain [additional types](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md) such as `DATE` and
+`DATETIME`. The following table shows the type mapping between Apache Parquet and AsterixDB:
+
+<table>
+ <thead>
+ <tr>
+ <th>Parquet</th>
+ <th>AsterixDB</th>
+ <th>Value Examples</th>
+ <th>Comment</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><code>BOOLEAN</code></td>
+ <td><code>BOOLEAN</code></td>
+ <td><code>true</code> / <code>false</code></td>
+ <td>-</td>
+ </tr>
+ <tr>
+ <td><code>INT_8</code></td>
+ <td rowspan="8"><code>BIGINT</code></td>
+ <td rowspan="8">
+ AsterixDB <code>BIGINT</code> Range:
+ <ul>
+ <li><b>Min</b>:-9,223,372,036,854,775,808</li>
+ <li><b>Max</b>: 9,223,372,036,854,775,807</li>
+ </ul>
+ </td>
+ <td rowspan="7">-</td>
+ </tr>
+ <tr>
+ <td><code>INT_16</code></td>
+ </tr>
+ <tr>
+ <td><code>INT_32</code></td>
+ </tr>
+ <tr>
+ <td><code>INT_64</code></td>
+ </tr>
+ <tr>
+      <td><code>UINT_8</code></td>
+ </tr>
+ <tr>
+ <td><code>UINT_16</code></td>
+ </tr>
+ <tr>
+ <td><code>UINT_32</code></td>
+ </tr>
+ <tr>
+ <td><code>UINT_64</code></td>
+      <td>There is a possibility that a <code>UINT_64</code> value overflows the <code>BIGINT</code> range. A
+       warning will be issued in case of an overflow and <code>MISSING</code> will be returned.
+      </td>
+ </tr>
+ <tr>
+ <td><code>FLOAT</code></td>
+ <td rowspan="4"><code>DOUBLE</code></td>
+ <td rowspan="4">
+ AsterixDB <code>DOUBLE</code> Range:
+ <ul>
+ <li><b>Min Positive Value</b>: 2^-1074</li>
+ <li><b>Max Positive Value</b>: 2^1023</li>
+ </ul>
+ </td>
+ <td rowspan="2">-</td>
+ </tr>
+ <tr>
+ <td><code>DOUBLE</code></td>
+ </tr>
+ <tr>
+ <td><code>FIXED_LEN_BYTE_ARRAY (DECIMAL)</code></td>
+ <td rowspan="2">
+ Parquet <code>DECIMAL</code> values are converted to doubles, with the possibility of precision loss.
+ The flag <code>decimal-to-double</code> must be set upon creating the dataset.
+ <ul><li><i>See <a href ="#ParquetTypeFlags">Parquet Type Flags</a></i></li></ul>
+ </td>
+ </tr>
+ <tr>
+ <td><code>BINARY (DECIMAL)</code></td>
+ </tr>
+ <tr>
+ <td><code>BINARY (ENUM)</code></td>
+ <td><code>"Fruit"</code></td>
+ <td>Parquet Enum values are parsed as Strings</td>
+ </tr>
+ <tr>
+ <td><code>BINARY (UTF8)</code></td>
+ <td><code>STRING</code></td>
+ <td><code>"Hello World"</code></td>
+ <td>-</td>
+ </tr>
+ <tr>
+ <td><code>FIXED_LEN_BYTE_ARRAY (UUID)</code></td>
+ <td><code>UUID</code></td>
+ <td><code>uuid("123e4567-e89b-12d3-a456-426614174000")</code></td>
+ <td>-</td>
+ </tr>
+ <tr>
+ <td><code>INT_32 (DATE)</code></td>
+ <td><code>DATE</code></td>
+ <td><code>date("2021-11-01")</code></td>
+ <td>-</td>
+ </tr>
+ <tr>
+ <td><code>INT_32 (TIME)</code></td>
+ <td><code>TIME</code></td>
+ <td rowspan="2"><code>time("00:00:00.000")</code></td>
+ <td>Time in milliseconds.</td>
+ </tr>
+ <tr>
+ <td><code>INT_64 (TIME)</code></td>
+ <td><code>TIME</code></td>
+ <td>Time in micro/nano seconds.</td>
+ </tr>
+ <tr>
+ <td><code>INT_64 (TIMESTAMP)</code></td>
+ <td rowspan="2"><code>DATETIME</code></td>
+      <td rowspan="2"><code>datetime("2021-11-01T21:37:13.738")</code></td>
+      <td>Timestamp in milli/micro/nano seconds. Parquet can also store timestamp values with the option
+       <code>isAdjustedToUTC = true</code>. To get the local <code>DATETIME</code> value, the user can set the
+       time zone ID using the option <code>timezone</code>.
+ <ul><li><i>See <a href ="#ParquetTypeFlags">Parquet Type Flags</a></i></li></ul>
+ </td>
+ </tr>
+ <tr>
+ <td><code>INT96</code></td>
+      <td>A timestamp value that stores days and time-of-day separately. INT96 is always in local time.</td>
+ </tr>
+ <tr>
+ <td><code>BINARY (JSON)</code></td>
+ <td>any type</td>
+ <td>
+ <ul>
+ <li><code>{"name": "John"}</code></li>
+ <li><code>[1, 2, 3]</code></li>
+ </ul>
+ </td>
+ <td>
+       Parses the JSON string into an internal AsterixDB value.
+       The flag <code>parse-json-string</code> is set by default. To get the string value (i.e., not parsed as an
+       AsterixDB value), unset the flag <code>parse-json-string</code>.
+ <ul><li><i>See <a href ="#ParquetTypeFlags">Parquet Type Flags</a></i></li></ul>
+ </td>
+ </tr>
+ <tr>
+ <td><code>BINARY</code></td>
+ <td rowspan="2"><code>BINARY</code></td>
+ <td><code>hex("0101FF")</code></td>
+ <td>-</td>
+ </tr>
+ <tr>
+ <td><code>BSON</code></td>
+ <td>N/A</td>
+ <td>BSON values will be returned as <code>BINARY</code></td>
+ </tr>
+ <tr>
+ <td><code>LIST</code></td>
+ <td><code>ARRAY</code></td>
+ <td><code>[1, 2, 3]</code></td>
+ <td>Parquet's <code>LIST</code> type is converted into <code>ARRAY</code></td>
+ </tr>
+ <tr>
+ <td><code>MAP</code></td>
+ <td><code>ARRAY</code> of <code>OBJECT</code></td>
+ <td><code>[{"key":1, "value":1}, {"key":2, "value":2}]</code></td>
+ <td>Parquet's <code>MAP</code> types are converted into an <code>ARRAY</code> of <code>OBJECT</code>. Each
+ <code>OBJECT</code> value consists of two fields: <code>key</code> and <code>value</code>
+ </td>
+ </tr>
+ <tr>
+ <td><code>FIXED_LEN_BYTE_ARRAY (INTERVAL)</code></td>
+ <td>-</td>
+ <td>N/A</td>
+      <td><code>INTERVAL</code> is not supported. A warning will be issued and a <code>MISSING</code> value
+       will be returned.
+ </td>
+ </tr>
+ </tbody>
+</table>
+
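+As a quick illustration of the mapping, consider a hypothetical query that projects a date field and a
+string field from the dataset (the field names here are assumptions for the example):
+
+    SELECT pd.date_field, pd.string_field
+    FROM ParquetDataset pd;
+
+A returned object could look like:
+
+    { "date_field": date("2022-01-01"), "string_field": "stringVal" }
+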
+## <a id="ParquetTypeFlags">Parquet Type Flags</a> <font size="4"><a href="#toc">[Back to TOC]</a></font> ##
+
+The table in [Type Compatibility](#TypeCompatibility) shows the type mapping between Parquet and AsterixDB. Some of the
+Parquet types are not parsed by default as those types are not natively supported in AsterixDB. However, the user can set
+a flag to convert some of those types into a supported AsterixDB type.
+
+##### DECIMAL TYPE
+
+The user can enable parsing `DECIMAL` Parquet values by enabling a certain flag as in the following example:
+
+ CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING S3
+ (
+        -- Credentials and path to Parquet files
+ ...
+
+ -- Enable converting decimal values to double
+ ("decimal-to-double" = "true")
+ );
+
+This flag will enable parsing/converting `DECIMAL` values/types into `DOUBLE`. For example, if the flag
+`decimal-to-double` is not set and a Parquet file contains a `DECIMAL` value, the following error will be thrown when
+running a query that requests a `DECIMAL` value:
+
+    Parquet type 'optional fixed_len_byte_array(16) decimalType (DECIMAL(38,18))' is not supported by default. To enable type conversion, recreate the external dataset with the option 'decimal-to-double' enabled
+
+and the query will fail. If the flag `decimal-to-double` is set, the converted `DOUBLE` value will be
+returned.
+
+##### TEMPORAL TYPES
+
+For the temporal types (namely `DATETIME`), values could be stored in Parquet with the option
+`isAdjustedToUTC = true`. Hence, the user has to provide a timezone ID via the flag `timezone` to adjust those values
+to the local time. For instance, a user can set the timezone ID to "<b>PST</b>" upon creating a dataset as
+follows:
+
+ CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING S3
+ (
+        -- Credentials and path to Parquet files
+ ...
+
+ -- Converting UTC time to PST time
+ ("timezone" = "PST")
+ );
+
+If the flag `timezone` is not set, a warning will appear when running a query:
+
+    Parquet file(s) contain values of the temporal type 'datetime' that are adjusted to UTC. Recreate the external dataset and set the option 'timezone' to get the local-adjusted 'datetime' value
+
+and the UTC `DATETIME` will be returned.
+
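+For instance, with the timezone ID set to "PST" (UTC-08:00), a timestamp stored as the UTC instant
+`2022-01-01T01:00:00.000` would be returned as the local-adjusted value `datetime("2021-12-31T17:00:00.000")`.
+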
+##### JSON TYPE
+
+By default, we parse the JSON values into AsterixDB values, where a user can process those values using `SQL++` queries.
+However, one could disable the parsing of JSON string values (which would then be returned as `STRING`) by unsetting the
+flag `parse-json-string` as in the following example:
+
+ CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING S3
+ (
+        -- Credentials and path to Parquet files
+ ...
+
+ -- Stop parsing JSON string values
+ ("parse-json-string" = "false")
+ );
+
+And the returned value will be of type `STRING`.
+
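+For example, a Parquet JSON field holding the array `[1,2,3]` would be returned as the string value
+(assuming a field named `json_field` for illustration):
+
+    { "json_field": "[1,2,3]" }
+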
+##### INTERVAL TYPE
+
+Currently, AsterixDB does not support Parquet's `INTERVAL` type. When a query requests (or projects) an `INTERVAL`
+value, a warning will be issued and a `MISSING` value will be returned instead.
+
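+For example, a hypothetical query such as the following (the field name `interval_field` is an assumption
+for illustration) would emit the warning and return objects with the field omitted, since `MISSING` fields
+do not appear in the result:
+
+    SELECT pd.interval_field
+    FROM ParquetDataset pd;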
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixParquetRuntimeException.java
similarity index 64%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixParquetRuntimeException.java
index 946b10c..0ee342a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixParquetRuntimeException.java
@@ -18,17 +18,17 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet;
-import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
-/**
- * This interface is intended to extend {@link org.apache.parquet.io.api.Converter} to get the field name or the
- * index of associated with a value.
- */
-interface IFieldValue {
- IValueReference getFieldName();
+public class AsterixParquetRuntimeException extends RuntimeException {
+ private static final long serialVersionUID = 6896076874677689992L;
+ private final HyracksDataException hyracksDataException;
- /**
- * @return the index of the value as appeared in the schema
- */
- int getIndex();
+ public AsterixParquetRuntimeException(HyracksDataException e) {
+ this.hyracksDataException = e;
+ }
+
+ public HyracksDataException getHyracksDataException() {
+ return hyracksDataException;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
index c0a47d5..7258359 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
@@ -18,10 +18,16 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet;
-import java.util.ArrayList;
-import java.util.List;
+import static org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider.MISSING;
+
import java.util.Map;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.DecimalConverter;
+import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
@@ -30,11 +36,19 @@
import org.apache.asterix.om.types.IATypeVisitor;
import org.apache.asterix.runtime.projection.DataProjectionInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
@@ -43,14 +57,12 @@
*/
public class AsterixTypeToParquetTypeVisitor implements IATypeVisitor<Type, Type> {
public static final MessageType EMPTY_PARQUET_MESSAGE = Types.buildMessage().named("EMPTY");
- public static final PrimitiveType MISSING =
- Types.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN).named("MISSING");
- private final List<Warning> warnings;
+ private final ParquetConverterContext context;
private Map<String, FunctionCallInformation> funcInfo;
- public AsterixTypeToParquetTypeVisitor() {
- warnings = new ArrayList<>();
+ public AsterixTypeToParquetTypeVisitor(ParquetConverterContext context) {
+ this.context = context;
}
public MessageType clipType(ARecordType rootType, MessageType fileSchema,
@@ -66,10 +78,6 @@
return builder.named(fileSchema.getName());
}
- public List<Warning> getWarnings() {
- return warnings;
- }
-
@Override
public Type visit(ARecordType recordType, Type arg) {
//No LogicalTypeAnnotation for Object types
@@ -86,8 +94,7 @@
/**
* There are two ways for representing arrays using ProtoBuf schema see the example in
- * {@link org.apache.asterix.external.input.record.reader.hdfs.parquet.AbstractComplexConverter} for more
- * information.
+ * {@link AbstractComplexConverter} for more information.
*/
@Override
public Type visit(AbstractCollectionType collectionType, Type arg) {
@@ -99,7 +106,7 @@
Type childType = arrayType.getType(0);
if ("array".equals(childType.getName()) || childType.asGroupType().getFieldCount() > 1) {
//Handle Avro-like schema
- return handleHandleAvroArray(collectionType, arrayType);
+ return handleAvroArray(collectionType, arrayType);
}
//Handling spark-like schema
Types.ListBuilder<GroupType> builder = Types.list(arg.getRepetition());
@@ -128,7 +135,7 @@
return numberOfAddedFields;
}
- private Type handleHandleAvroArray(AbstractCollectionType collectionType, GroupType groupType) {
+ private Type handleAvroArray(AbstractCollectionType collectionType, GroupType groupType) {
Types.GroupBuilder<GroupType> builder =
Types.buildGroup(groupType.getRepetition()).as(groupType.getLogicalTypeAnnotation());
//There is only one child
@@ -157,47 +164,63 @@
if (type == MISSING) {
return true;
}
- ATypeTag actualType = mapType(type);
+ //typeName is unique
+ FunctionCallInformation info = funcInfo.get(node.getTypeName());
+ ATypeTag actualType = mapType(type, context, info.getSourceLocation());
ATypeTag expectedType = node.getTypeTag();
boolean isNotExpected = actualType != expectedType;
if (isNotExpected) {
- //typeName is unique
- FunctionCallInformation info = funcInfo.get(node.getTypeName());
//If no warning is created, then it means it has been reported
- Warning warning = info.createTypeMismatchWarning(expectedType, actualType);
+ Warning warning = null;
+ if (actualType != ATypeTag.SYSTEM_NULL) {
+ warning = info.createTypeMismatchWarning(expectedType, actualType);
+ }
if (warning != null) {
//New warning that we saw for the first time. We should report it.
- warnings.add(warning);
+ context.getWarnings().add(warning);
}
}
return isNotExpected;
}
- private static ATypeTag mapType(Type parquetType) {
+ /* ****************************************
+ * Type checking methods
+ * ****************************************
+ */
+
+ public static ATypeTag mapType(Type parquetType, ParquetConverterContext context, SourceLocation sourceLocation) {
LogicalTypeAnnotation typeAnnotation = parquetType.getLogicalTypeAnnotation();
- if (!parquetType.isPrimitive() && typeAnnotation == null) {
- return ATypeTag.OBJECT;
- } else if (typeAnnotation == LogicalTypeAnnotation.listType()) {
- return ATypeTag.ARRAY;
- } else if (typeAnnotation == LogicalTypeAnnotation.stringType()) {
- return ATypeTag.STRING;
+ if (!parquetType.isPrimitive()) {
+ if (typeAnnotation == null) {
+ return ATypeTag.OBJECT;
+ } else if (typeAnnotation == LogicalTypeAnnotation.listType()
+ || typeAnnotation == LogicalTypeAnnotation.mapType()) {
+ return ATypeTag.ARRAY;
+ }
} else {
//Check other primitive types
- PrimitiveType.PrimitiveTypeName primitiveTypeName = parquetType.asPrimitiveType().getPrimitiveTypeName();
- switch (primitiveTypeName) {
+ PrimitiveType primitiveType = parquetType.asPrimitiveType();
+ switch (primitiveType.getPrimitiveTypeName()) {
case BOOLEAN:
return ATypeTag.BOOLEAN;
- case INT32:
- case INT64:
- return ATypeTag.BIGINT;
case FLOAT:
case DOUBLE:
return ATypeTag.DOUBLE;
- default:
- throw new IllegalStateException("Unsupported type " + parquetType);
+ case INT32:
+ case INT64:
+ return handleInt32Int64(primitiveType, context, sourceLocation);
+ case INT96:
+ return ATypeTag.DATETIME;
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return handleBinary(primitiveType, context, sourceLocation);
}
}
+
+ warnUnsupportedType(context, sourceLocation, parquetType);
+ //Use SYSTEM_NULL for unsupported types
+ return ATypeTag.SYSTEM_NULL;
}
private static Type getType(GroupType groupType, String fieldName) {
@@ -206,4 +229,93 @@
}
return MISSING;
}
+
+ private static ATypeTag handleInt32Int64(PrimitiveType type, ParquetConverterContext context,
+ SourceLocation sourceLocation) {
+ LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation();
+ ATypeTag inferredTypeTag = ATypeTag.SYSTEM_NULL;
+ if (logicalType == null || logicalType instanceof IntLogicalTypeAnnotation) {
+ inferredTypeTag = ATypeTag.BIGINT;
+ } else if (logicalType instanceof DateLogicalTypeAnnotation) {
+ inferredTypeTag = ATypeTag.DATE;
+ } else if (logicalType instanceof TimeLogicalTypeAnnotation) {
+ inferredTypeTag = ATypeTag.TIME;
+ } else if (logicalType instanceof TimestampLogicalTypeAnnotation
+ && checkDatetime(type, context, sourceLocation)) {
+ TimestampLogicalTypeAnnotation tsType = (TimestampLogicalTypeAnnotation) logicalType;
+ warnIfUTCAdjustedAndZoneIdIsNotSet(context, sourceLocation, tsType.isAdjustedToUTC());
+ inferredTypeTag = ATypeTag.DATETIME;
+ } else if (logicalType instanceof DecimalLogicalTypeAnnotation) {
+ ensureDecimalToDoubleEnabled(type, context, sourceLocation);
+ inferredTypeTag = ATypeTag.DOUBLE;
+ }
+
+        //inferredTypeTag stays SYSTEM_NULL if the logical type is unsupported
+ return inferredTypeTag;
+ }
+
+ private static ATypeTag handleBinary(PrimitiveType type, ParquetConverterContext context,
+ SourceLocation sourceLocation) {
+ LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation();
+ ATypeTag inferredTypeTag = ATypeTag.SYSTEM_NULL;
+ if (logicalType == null || logicalType == LogicalTypeAnnotation.bsonType()) {
+ inferredTypeTag = ATypeTag.BINARY;
+ } else if (logicalType == LogicalTypeAnnotation.stringType()
+ || logicalType == LogicalTypeAnnotation.enumType()) {
+ inferredTypeTag = ATypeTag.STRING;
+ } else if (logicalType == LogicalTypeAnnotation.jsonType()) {
+            //Parsing JSON could produce a value of any type. If parseJson is disabled, return it as String
+ inferredTypeTag = context.isParseJsonEnabled() ? ATypeTag.ANY : ATypeTag.STRING;
+ } else if (logicalType instanceof DecimalLogicalTypeAnnotation) {
+ ensureDecimalToDoubleEnabled(type, context, sourceLocation);
+ inferredTypeTag = ATypeTag.DOUBLE;
+ } else if (logicalType instanceof UUIDLogicalTypeAnnotation) {
+ inferredTypeTag = ATypeTag.UUID;
+ }
+
+        //inferredTypeTag stays SYSTEM_NULL if the logical type is unsupported
+ return inferredTypeTag;
+ }
+
+ private static boolean checkDatetime(PrimitiveType type, ParquetConverterContext context,
+ SourceLocation sourceLocation) {
+ if (type.getPrimitiveTypeName() == PrimitiveTypeName.INT32) {
+ //Only INT64 and INT96 are supported per parquet specification
+ warnUnsupportedType(context, sourceLocation, type);
+ return false;
+ }
+ return true;
+ }
+
+ private static void ensureDecimalToDoubleEnabled(PrimitiveType type, ParquetConverterContext context,
+ SourceLocation sourceLocation) {
+ if (!context.isDecimalToDoubleEnabled()) {
+ throw new AsterixParquetRuntimeException(
+ new RuntimeDataException(ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION, sourceLocation,
+ type.toString(), ParquetOptions.DECIMAL_TO_DOUBLE));
+ }
+
+ DecimalLogicalTypeAnnotation decimalLogicalType =
+ (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+ int precision = decimalLogicalType.getPrecision();
+ if (precision > DecimalConverter.LONG_MAX_PRECISION) {
+ context.getWarnings().add(Warning.of(null, ErrorCode.PARQUET_DECIMAL_TO_DOUBLE_PRECISION_LOSS, precision,
+ DecimalConverter.LONG_MAX_PRECISION));
+ }
+ }
+
+ public static void warnUnsupportedType(ParquetConverterContext context, SourceLocation sourceLocation,
+ Type parquetType) {
+ context.getWarnings()
+ .add(Warning.of(sourceLocation, ErrorCode.UNSUPPORTED_PARQUET_TYPE, parquetType.toString()));
+ }
+
+ private static void warnIfUTCAdjustedAndZoneIdIsNotSet(ParquetConverterContext context,
+ SourceLocation sourceLocation, boolean adjustedToUTC) {
+ if (adjustedToUTC && context.getTimeZoneId().isEmpty()) {
+ Warning warning = Warning.of(sourceLocation, ErrorCode.PARQUET_TIME_ZONE_ID_IS_NOT_SET, ATypeTag.DATETIME,
+ ParquetOptions.TIMEZONE);
+ context.getWarnings().add(warning);
+ }
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AtomicConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AtomicConverter.java
deleted file mode 100644
index 7219bdd..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AtomicConverter.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.asterix.external.input.stream.StandardUTF8ToModifiedUTF8DataOutput;
-import org.apache.asterix.external.parser.jackson.ParserContext;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.PrimitiveConverter;
-
-/**
- * Currently, only JSON types are supported (string, number, boolean)
- */
-class AtomicConverter extends PrimitiveConverter implements IFieldValue {
- private final AbstractComplexConverter parent;
- private final IValueReference fieldName;
- private final int index;
- private final ParserContext context;
-
- public AtomicConverter(AbstractComplexConverter parent, int index, ParserContext context) {
- this(parent, null, index, context);
- }
-
- public AtomicConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- ParserContext context) {
- this.parent = parent;
- this.fieldName = fieldName;
- this.index = index;
- this.context = context;
- }
-
- @Override
- public void addBinary(Binary value) {
- final DataOutput out = parent.getDataOutput();
- final StandardUTF8ToModifiedUTF8DataOutput stringOut = context.getModifiedUTF8DataOutput();
- stringOut.setDataOutput(out);
- try {
- out.writeByte(ATypeTag.STRING.serialize());
- value.writeTo(stringOut);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- parent.addValue(this);
- }
-
- @Override
- public void addBoolean(boolean value) {
- final DataOutput out = parent.getDataOutput();
- try {
- out.writeByte(ATypeTag.BOOLEAN.serialize());
- out.writeBoolean(value);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- parent.addValue(this);
- }
-
- @Override
- public void addFloat(float value) {
- addDouble(value);
- }
-
- @Override
- public void addDouble(double value) {
- final DataOutput out = parent.getDataOutput();
- try {
- out.writeByte(ATypeTag.DOUBLE.serialize());
- out.writeDouble(value);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- parent.addValue(this);
- }
-
- @Override
- public void addInt(int value) {
- addLong(value);
- }
-
- @Override
- public void addLong(long value) {
- final DataOutput out = parent.getDataOutput();
- try {
- out.writeByte(ATypeTag.BIGINT.serialize());
- out.writeLong(value);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- parent.addValue(this);
- }
-
- @Override
- public IValueReference getFieldName() {
- return fieldName;
- }
-
- @Override
- public int getIndex() {
- return index;
- }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
index cc9b34c..9c1d70a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
@@ -58,7 +58,11 @@
@SuppressWarnings("unchecked")
@Override
protected RecordReader<Void, V> getRecordReader(int splitIndex) throws IOException {
- reader = (RecordReader<Void, V>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+ try {
+ reader = (RecordReader<Void, V>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+ } catch (AsterixParquetRuntimeException e) {
+ throw e.getHyracksDataException();
+ }
if (value == null) {
value = reader.createValue();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
index aac293d..797a2b2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
@@ -19,10 +19,12 @@
package org.apache.asterix.external.input.record.reader.hdfs.parquet;
import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.RootConverter;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
@@ -39,25 +41,27 @@
@Override
public ReadContext init(InitContext context) {
MessageType requestedSchema = getRequestedSchema(context);
- return new ReadContext(requestedSchema, Collections.emptyMap());
+ return new ReadContext(requestedSchema);
}
@Override
public RecordMaterializer<IValueReference> prepareForRead(Configuration configuration,
Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
- return new ADMRecordMaterializer(readContext);
+ return new ADMRecordMaterializer(configuration, readContext);
}
- private static MessageType getRequestedSchema(InitContext context) {
- Configuration configuration = context.getConfiguration();
- MessageType fileSchema = context.getFileSchema();
- AsterixTypeToParquetTypeVisitor visitor = new AsterixTypeToParquetTypeVisitor();
+ private static MessageType getRequestedSchema(InitContext initContext) {
+ Configuration configuration = initContext.getConfiguration();
+ MessageType fileSchema = initContext.getFileSchema();
+
+ List<Warning> warnings = new ArrayList<>();
+ ParquetConverterContext context = new ParquetConverterContext(configuration, warnings);
+ AsterixTypeToParquetTypeVisitor visitor = new AsterixTypeToParquetTypeVisitor(context);
try {
ARecordType expectedType = HDFSUtils.getExpectedType(configuration);
Map<String, FunctionCallInformation> functionCallInformationMap =
HDFSUtils.getFunctionCallInformationMap(configuration);
MessageType requestedType = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
- List<Warning> warnings = visitor.getWarnings();
if (!warnings.isEmpty()) {
//New warnings were created, set the warnings in hadoop configuration to be reported
@@ -73,13 +77,26 @@
private static class ADMRecordMaterializer extends RecordMaterializer<IValueReference> {
private final RootConverter rootConverter;
+ private final List<Warning> warnings;
+ private final Configuration configuration;
- public ADMRecordMaterializer(ReadContext readContext) {
- rootConverter = new RootConverter(readContext.getRequestedSchema());
+ public ADMRecordMaterializer(Configuration configuration, ReadContext readContext) {
+ warnings = new ArrayList<>();
+ rootConverter = new RootConverter(readContext.getRequestedSchema(), configuration, warnings);
+ this.configuration = configuration;
}
@Override
public IValueReference getCurrentRecord() {
+ try {
+ if (!warnings.isEmpty()) {
+ //Issue all pending warnings
+ HDFSUtils.setWarnings(warnings, configuration);
+ warnings.clear();
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
return rootConverter.getRecord();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/IFieldValue.java
similarity index 96%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/IFieldValue.java
index 946b10c..c0ee37b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/IFieldValue.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/IFieldValue.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter;
import org.apache.hyracks.data.std.api.IValueReference;
@@ -24,7 +24,7 @@
* This interface is intended to extend {@link org.apache.parquet.io.api.Converter} to get the field name or the
* index of associated with a value.
*/
-interface IFieldValue {
+public interface IFieldValue {
IValueReference getFieldName();
/**
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java
new file mode 100644
index 0000000..4982ca5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.ABinarySerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.external.input.stream.StandardUTF8ToModifiedUTF8DataOutput;
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ABinary;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableTime;
+import org.apache.asterix.om.base.ATime;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+import org.apache.parquet.io.api.Binary;
+
+public class ParquetConverterContext extends ParserContext {
+ /*
+ * ************************************************************************
+ * Serializers/Deserializers
+ * ************************************************************************
+ */
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ABoolean> booleanSerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<AInt64> int64SerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADouble> doubleSerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ABinary> binarySerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABINARY);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADate> dateSerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ATime> timeSerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ATIME);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+
+ //Issued warnings
+ private final List<Warning> warnings;
+
+ /*
+ * ************************************************************************
+ * Binary values members
+ * ************************************************************************
+ */
+ private final StandardUTF8ToModifiedUTF8DataOutput modifiedUTF8DataOutput;
+ private byte[] lengthBytes;
+
+ /*
+ * ************************************************************************
+ * Mutable Values
+ * ************************************************************************
+ */
+
+ private final AMutableInt64 mutableInt64 = new AMutableInt64(0);
+ private final AMutableDouble mutableDouble = new AMutableDouble(0.0);
+ private final AMutableDate mutableDate = new AMutableDate(0);
+ private final AMutableTime mutableTime = new AMutableTime(0);
+ private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+
+ /*
+ * ************************************************************************
+ * Type knobs
+ * ************************************************************************
+ */
+ private final boolean parseJson;
+ private final boolean decimalToDouble;
+
+ /*
+ * ************************************************************************
+ * Temporal Configuration
+ * ************************************************************************
+ */
+ private final String timeZoneId;
+ private final int timeZoneOffset;
+
+ public ParquetConverterContext(Configuration configuration, List<Warning> warnings) {
+ this.warnings = warnings;
+ modifiedUTF8DataOutput = new StandardUTF8ToModifiedUTF8DataOutput(
+ new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader()));
+
+ parseJson = configuration.getBoolean(ParquetOptions.HADOOP_PARSE_JSON_STRING, false);
+ decimalToDouble = configuration.getBoolean(ParquetOptions.HADOOP_DECIMAL_TO_DOUBLE, false);
+
+        String configuredTimeZoneId = configuration.get(ParquetOptions.HADOOP_TIMEZONE, "");
+ if (!configuredTimeZoneId.isEmpty()) {
+ timeZoneId = configuredTimeZoneId;
+ timeZoneOffset = TimeZone.getTimeZone(timeZoneId).getRawOffset();
+ } else {
+ timeZoneId = "";
+ timeZoneOffset = 0;
+ }
+ }
+
+ public List<Warning> getWarnings() {
+ return warnings;
+ }
+
+ public boolean isParseJsonEnabled() {
+ return parseJson;
+ }
+
+ public boolean isDecimalToDoubleEnabled() {
+ return decimalToDouble;
+ }
+
+ public String getTimeZoneId() {
+ return timeZoneId;
+ }
+
+ public int getTimeZoneOffset() {
+ return timeZoneOffset;
+ }
+
+ /*
+ * ************************************************************************
+ * Serialization methods
+     * All methods throw IllegalStateException as Parquet's converter methods
+     * do not declare any checked exceptions
+ * ************************************************************************
+ */
+
+ public void serializeBoolean(boolean value, DataOutput output) {
+ try {
+ booleanSerDer.serialize(value ? ABoolean.TRUE : ABoolean.FALSE, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeInt64(long value, DataOutput output) {
+ try {
+ mutableInt64.setValue(value);
+ int64SerDer.serialize(mutableInt64, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeDouble(double value, DataOutput output) {
+ try {
+ mutableDouble.setValue(value);
+ doubleSerDer.serialize(mutableDouble, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+     * The value here is a UTF-8 encoded binary string (not a Java String, and not modified UTF-8)
+ *
+ * @param value Parquet binary value
+ * @param output output to write the converted string
+ */
+ public void serializeString(Binary value, DataOutput output) {
+ //Set the destination to where to write the final modified UTF-8
+ modifiedUTF8DataOutput.setDataOutput(output);
+ try {
+ //Write the type tag
+ output.writeByte(ATypeTag.STRING.serialize());
+            //Write the binary UTF-8 string as modified UTF-8
+ value.writeTo(modifiedUTF8DataOutput);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeUUID(Binary value, DataOutput output) {
+ try {
+ output.writeByte(ATypeTag.UUID.serialize());
+ value.writeTo(output);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * To avoid object creation when writing a binary value, we do not use {@link ABinarySerializerDeserializer}
+ * as calls to {@link Binary#getBytes()} could create a new buffer on each call
+ *
+ * @param value Parquet binary value
+ * @param output output to write the binary value
+ */
+ public void serializeBinary(Binary value, DataOutput output) {
+ try {
+ output.writeByte(ATypeTag.BINARY.serialize());
+ writeLength(value.length(), output);
+ value.writeTo(output);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeDate(int value, DataOutput output) {
+ try {
+ mutableDate.setValue(value);
+ dateSerDer.serialize(mutableDate, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeTime(int value, DataOutput output) {
+ try {
+ mutableTime.setValue(value);
+ timeSerDer.serialize(mutableTime, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeDateTime(long timestamp, DataOutput output) {
+ try {
+ mutableDateTime.setValue(timestamp);
+ datetimeSerDer.serialize(mutableDateTime, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void writeLength(int length, DataOutput out) throws IOException {
+ int requiredLength = VarLenIntEncoderDecoder.getBytesRequired(length);
+ if (lengthBytes == null || requiredLength > lengthBytes.length) {
+ lengthBytes = new byte[requiredLength];
+ }
+ VarLenIntEncoderDecoder.encode(length, lengthBytes, 0);
+ out.write(lengthBytes, 0, requiredLength);
+ }
+}
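A minimal usage sketch of the context above (illustrative only: the empty Configuration and the Hyracks buffer are assumptions of this note, not part of the change):

    Configuration conf = new Configuration();                      // assumed empty Hadoop config
    ParquetConverterContext ctx = new ParquetConverterContext(conf, new ArrayList<>());
    ArrayBackedValueStorage buffer = new ArrayBackedValueStorage();
    ctx.serializeInt64(42L, buffer.getDataOutput());               // BIGINT tag + value
    ctx.serializeBoolean(true, buffer.getDataOutput());            // BOOLEAN tag + value
    // buffer now holds the tagged ADM bytes a parent converter would collect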
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AbstractComplexConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
similarity index 73%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AbstractComplexConverter.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
index 363d2d2..e6b80d0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AbstractComplexConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
@@ -16,15 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
import java.io.DataOutput;
-import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.Type;
@@ -35,15 +37,16 @@
private final IValueReference fieldName;
private final int index;
private final Converter[] converters;
- protected final ParserContext context;
+ protected final ParquetConverterContext context;
protected IMutableValueStorage tempStorage;
- AbstractComplexConverter(AbstractComplexConverter parent, int index, GroupType parquetType, ParserContext context) {
+ AbstractComplexConverter(AbstractComplexConverter parent, int index, GroupType parquetType,
+ ParquetConverterContext context) {
this(parent, null, index, parquetType, context);
}
AbstractComplexConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- GroupType parquetType, ParserContext context) {
+ GroupType parquetType, ParquetConverterContext context) {
this.parent = parent;
this.fieldName = fieldName;
this.index = index;
@@ -51,14 +54,14 @@
converters = new Converter[parquetType.getFieldCount()];
for (int i = 0; i < parquetType.getFieldCount(); i++) {
final Type type = parquetType.getType(i);
- if (type == AsterixTypeToParquetTypeVisitor.MISSING) {
- converters[i] = MissingConverter.INSTANCE;
- } else if (type.isPrimitive()) {
+ if (type.isPrimitive()) {
converters[i] = createAtomicConverter(parquetType, i);
} else if (LogicalTypeAnnotation.listType().equals(type.getLogicalTypeAnnotation())) {
converters[i] = createArrayConverter(parquetType, i);
} else if (type.getRepetition() == Repetition.REPEATED) {
converters[i] = createRepeatedConverter(parquetType, i);
+            } else if (LogicalTypeAnnotation.mapType().equals(type.getLogicalTypeAnnotation())) {
+ converters[i] = createArrayConverter(parquetType, i);
} else {
converters[i] = createObjectConverter(parquetType, i);
}
@@ -70,13 +73,13 @@
*
* @param value Child value
*/
- protected abstract void addValue(IFieldValue value);
+ public abstract void addValue(IFieldValue value);
- protected abstract AtomicConverter createAtomicConverter(GroupType type, int index);
+ protected abstract PrimitiveConverter createAtomicConverter(GroupType type, int index);
- protected abstract ArrayConverter createArrayConverter(GroupType type, int index);
+ protected abstract AbstractComplexConverter createArrayConverter(GroupType type, int index);
- protected abstract ObjectConverter createObjectConverter(GroupType type, int index);
+ protected abstract AbstractComplexConverter createObjectConverter(GroupType type, int index);
/**
* Parquet file created by (old) Avro writer treat repeated values differently from files created by Spark.
@@ -104,12 +107,22 @@
* }
* }
*
+ * Map type:
+ * required group mapField (MAP) {
+ * repeated group key_value {
+ * required int32 key;
+ * required int32 value;
+ * }
+ * }
+ *
* @formatter:on
*/
- private AbstractComplexConverter createRepeatedConverter(GroupType type, int index) {
+ protected AbstractComplexConverter createRepeatedConverter(GroupType type, int index) {
GroupType repeatedType = type.getType(index).asGroupType();
- //The name "array" is used by Avro to represent group element (array of objects)
- if (repeatedType.getFieldCount() > 1 || "array".equals(repeatedType.getName())) {
+ String name = repeatedType.getName();
+ if (repeatedType.getFieldCount() > 1 || "array".equals(name) || "key_value".equals(name)) {
+                //The names "array" and "key_value" are reserved to represent arrays of objects;
+                //"key_value" is used for the MAP type
return new ObjectConverter(this, index, repeatedType, context);
}
return new RepeatedConverter(this, index, repeatedType, context);
@@ -130,7 +143,7 @@
return converters[fieldIndex];
}
- protected DataOutput getDataOutput() {
+ public DataOutput getDataOutput() {
tempStorage.reset();
return tempStorage.getDataOutput();
}
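For reference, a sketch of a MAP schema built with parquet-mr's Types builder (the builder calls are an assumption of this note, not part of the change); its repeated inner group is named "key_value", which createRepeatedConverter above routes to an ObjectConverter:

    GroupType mapField = Types.requiredGroup()
            .as(LogicalTypeAnnotation.mapType())
            .addField(Types.repeatedGroup()
                    .addField(Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("key"))
                    .addField(Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("value"))
                    .named("key_value"))
            .named("mapField");
    // repeatedType.getName() is "key_value" -> ObjectConverter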
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ArrayConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
similarity index 76%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ArrayConverter.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
index 9e8da77..7eacc87 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ArrayConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
@@ -16,26 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
import java.io.IOException;
import org.apache.asterix.builders.IAsterixListBuilder;
-import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.PrimitiveType;
class ArrayConverter extends AbstractComplexConverter {
private IAsterixListBuilder builder;
- public ArrayConverter(AbstractComplexConverter parent, int index, GroupType parquetType, ParserContext context) {
+ public ArrayConverter(AbstractComplexConverter parent, int index, GroupType parquetType,
+ ParquetConverterContext context) {
super(parent, index, parquetType, context);
}
public ArrayConverter(AbstractComplexConverter parent, IValueReference fieldName, int index, GroupType parquetType,
- ParserContext context) {
+ ParquetConverterContext context) {
super(parent, fieldName, index, parquetType, context);
}
@@ -59,7 +64,7 @@
}
@Override
- protected void addValue(IFieldValue value) {
+ public void addValue(IFieldValue value) {
try {
builder.addItem(tempStorage);
} catch (HyracksDataException e) {
@@ -68,8 +73,9 @@
}
@Override
- protected AtomicConverter createAtomicConverter(GroupType type, int index) {
- return new AtomicConverter(this, index, context);
+ protected PrimitiveConverter createAtomicConverter(GroupType type, int index) {
+ PrimitiveType primitiveType = type.getType(index).asPrimitiveType();
+ return PrimitiveConverterProvider.createPrimitiveConverter(primitiveType, this, index, context);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ObjectConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
similarity index 77%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ObjectConverter.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
index 8736e7d..3c8bfcc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ObjectConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
@@ -16,26 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
import java.io.IOException;
import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.PrimitiveType;
class ObjectConverter extends AbstractComplexConverter {
private IARecordBuilder builder;
- public ObjectConverter(AbstractComplexConverter parent, int index, GroupType parquetType, ParserContext context) {
+ public ObjectConverter(AbstractComplexConverter parent, int index, GroupType parquetType,
+ ParquetConverterContext context) {
super(parent, index, parquetType, context);
}
public ObjectConverter(AbstractComplexConverter parent, IValueReference fieldName, int index, GroupType parquetType,
- ParserContext context) {
+ ParquetConverterContext context) {
super(parent, fieldName, index, parquetType, context);
}
@@ -59,7 +64,7 @@
}
@Override
- protected void addValue(IFieldValue value) {
+ public void addValue(IFieldValue value) {
try {
builder.addField(value.getFieldName(), getValue());
} catch (HyracksDataException e) {
@@ -68,9 +73,11 @@
}
@Override
- protected AtomicConverter createAtomicConverter(GroupType type, int index) {
+ protected PrimitiveConverter createAtomicConverter(GroupType type, int index) {
try {
- return new AtomicConverter(this, context.getSerializedFieldName(type.getFieldName(index)), index, context);
+ PrimitiveType primitiveType = type.getType(index).asPrimitiveType();
+ IValueReference fieldName = context.getSerializedFieldName(type.getFieldName(index));
+ return PrimitiveConverterProvider.createPrimitiveConverter(primitiveType, this, fieldName, index, context);
} catch (IOException e) {
throw new IllegalStateException(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/RepeatedConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
similarity index 67%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/RepeatedConverter.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
index d46d84c..09a104b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/RepeatedConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
@@ -16,15 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
import java.io.DataOutput;
-import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
+import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.PrimitiveType;
class RepeatedConverter extends AbstractComplexConverter {
- public RepeatedConverter(AbstractComplexConverter parent, int index, GroupType parquetType, ParserContext context) {
+ public RepeatedConverter(AbstractComplexConverter parent, int index, GroupType parquetType,
+ ParquetConverterContext context) {
super(parent, index, parquetType, context);
}
@@ -39,13 +44,14 @@
}
@Override
- protected void addValue(IFieldValue value) {
+ public void addValue(IFieldValue value) {
parent.addValue(value);
}
@Override
- protected AtomicConverter createAtomicConverter(GroupType type, int index) {
- return new AtomicConverter(this, index, context);
+ protected PrimitiveConverter createAtomicConverter(GroupType type, int index) {
+ PrimitiveType primitiveType = type.getType(index).asPrimitiveType();
+ return PrimitiveConverterProvider.createPrimitiveConverter(primitiveType, this, index, context);
}
@Override
@@ -60,7 +66,7 @@
}
@Override
- protected DataOutput getDataOutput() {
+ public DataOutput getDataOutput() {
return getParentDataOutput();
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/RootConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RootConverter.java
similarity index 71%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/RootConverter.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RootConverter.java
index 76f4342..24a531a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/RootConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RootConverter.java
@@ -16,20 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
import java.io.DataOutput;
+import java.util.List;
-import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.parquet.schema.GroupType;
-class RootConverter extends ObjectConverter {
+public class RootConverter extends ObjectConverter {
private final ArrayBackedValueStorage rootBuffer;
- public RootConverter(GroupType parquetType) {
- super(null, -1, parquetType, new ParserContext(true));
+ public RootConverter(GroupType parquetType, Configuration configuration, List<Warning> warnings) {
+ super(null, -1, parquetType, new ParquetConverterContext(configuration, warnings));
this.rootBuffer = new ArrayBackedValueStorage();
}
@@ -39,7 +42,7 @@
return rootBuffer.getDataOutput();
}
- protected IValueReference getRecord() {
+ public IValueReference getRecord() {
return rootBuffer;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MissingConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
similarity index 60%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MissingConverter.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
index e38056b..15c1d2e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MissingConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
@@ -16,45 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.PrimitiveConverter;
-class MissingConverter extends PrimitiveConverter {
- protected static final MissingConverter INSTANCE = new MissingConverter();
+public class BinaryConverter extends GenericPrimitiveConverter {
- private MissingConverter() {
+ BinaryConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
+ ParquetConverterContext context) {
+ super(parent, fieldName, index, context);
}
@Override
public void addBinary(Binary value) {
- //NoOp
- }
-
- @Override
- public void addBoolean(boolean value) {
- //NoOp
- }
-
- @Override
- public void addFloat(float value) {
- //NoOp
- }
-
- @Override
- public void addDouble(double value) {
- //NoOp
- }
-
- @Override
- public void addInt(int value) {
- //NoOp
- }
-
- @Override
- public void addLong(long value) {
- //NoOp
+ context.serializeBinary(value, parent.getDataOutput());
+ parent.addValue(this);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
new file mode 100644
index 0000000..c8737cd
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+class DateConverter extends GenericPrimitiveConverter {
+ DateConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
+ ParquetConverterContext context) {
+ super(parent, fieldName, index, context);
+ }
+
+ @Override
+ public void addInt(int value) {
+ context.serializeDate(value, parent.getDataOutput());
+ parent.addValue(this);
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
new file mode 100644
index 0000000..e93bcf7
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * The decimal converter relies on Java's {@link BigDecimal} to convert decimal values. The converter could pressure
+ * the GC, as a new {@link BigDecimal} object is created for each value
+ */
+public class DecimalConverter extends GenericPrimitiveConverter {
+ public static final int LONG_MAX_PRECISION = 20;
+ private final int precision;
+ private final int scale;
+
+ DecimalConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
+ ParquetConverterContext context, int precision, int scale) {
+ super(parent, fieldName, index, context);
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ @Override
+ public void addInt(int value) {
+ addLong(value);
+ }
+
+ @Override
+ public void addLong(long value) {
+ addConvertedDouble(BigDecimal.valueOf(value, scale).doubleValue());
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ if (precision <= LONG_MAX_PRECISION) {
+ addLong(getUnscaledLong(value.toByteBuffer()));
+ } else {
+            //Precision is too large for a long; use BigDecimal
+ addConvertedDouble(new BigDecimal(new BigInteger(value.getBytes()), scale).doubleValue());
+ }
+ }
+
+ private void addConvertedDouble(double value) {
+ context.serializeDouble(value, parent.getDataOutput());
+ parent.addValue(this);
+ }
+
+ private static long getUnscaledLong(ByteBuffer buffer) {
+ byte[] bytes = buffer.array();
+ int start = buffer.arrayOffset() + buffer.position();
+ int end = buffer.arrayOffset() + buffer.limit();
+
+ long value = 0L;
+ for (int i = start; i < end; i++) {
+ value = (value << 8) | (bytes[i] & 0xFF);
+ }
+ int bits = 8 * (end - start);
+ return (value << (64 - bits)) >> (64 - bits);
+ }
+}
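A worked check of the sign extension in getUnscaledLong (values illustrative): the two bytes 0xFF 0x38 encode the big-endian two's-complement unscaled value -200, which with scale 2 is -2.00.

    byte[] raw = { (byte) 0xFF, (byte) 0x38 };
    long v = 0L;
    for (byte b : raw) {
        v = (v << 8) | (b & 0xFF);                     // v == 0xFF38 == 65336
    }
    int bits = 8 * raw.length;                         // 16 significant bits
    long unscaled = (v << (64 - bits)) >> (64 - bits); // sign-extends to -200
    // BigDecimal.valueOf(unscaled, 2).doubleValue() == -2.0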
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
new file mode 100644
index 0000000..e0b0392
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.PrimitiveConverter;
+
+public class GenericPrimitiveConverter extends PrimitiveConverter implements IFieldValue {
+
+ protected final AbstractComplexConverter parent;
+ protected final IValueReference fieldName;
+ protected final int index;
+ protected final ParquetConverterContext context;
+
+ GenericPrimitiveConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
+ ParquetConverterContext context) {
+ this.parent = parent;
+ this.fieldName = fieldName;
+ this.index = index;
+ this.context = context;
+ }
+
+ @Override
+ public final IValueReference getFieldName() {
+ return fieldName;
+ }
+
+ @Override
+ public final int getIndex() {
+ return index;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ context.serializeString(value, parent.getDataOutput());
+ parent.addValue(this);
+ }
+
+ @Override
+ public void addBoolean(boolean value) {
+ context.serializeBoolean(value, parent.getDataOutput());
+ parent.addValue(this);
+ }
+
+ @Override
+ public void addFloat(float value) {
+ addDouble(value);
+ }
+
+ @Override
+ public void addDouble(double value) {
+ context.serializeDouble(value, parent.getDataOutput());
+ parent.addValue(this);
+ }
+
+ @Override
+ public void addInt(int value) {
+ addLong(value);
+ }
+
+ @Override
+ public void addLong(long value) {
+ context.serializeInt64(value, parent.getDataOutput());
+ parent.addValue(this);
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
new file mode 100644
index 0000000..258a10a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.asterix.external.parser.JSONDataParser;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ByteArrayAccessibleInputStream;
+import org.apache.parquet.io.api.Binary;
+
+import com.fasterxml.jackson.core.JsonFactory;
+
+class JsonStringConverter extends GenericPrimitiveConverter {
+ private static final byte[] EMPTY = new byte[0];
+ private final JSONDataParser parser;
+ private final ByteArrayAccessibleInputStream in;
+
+ JsonStringConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
+ ParquetConverterContext context) {
+ super(parent, fieldName, index, context);
+ parser = new JSONDataParser(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE, new JsonFactory());
+ in = new ByteArrayAccessibleInputStream(EMPTY, 0, 0);
+ try {
+ parser.setInputStream(in);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ byte[] bytes = value.getBytes();
+ in.setContent(bytes, 0, value.length());
+
+ DataOutput out = parent.getDataOutput();
+ try {
+ if (parser.parseAnyValue(out)) {
+ parent.addValue(this);
+ } else {
+ resetParser();
+ }
+ } catch (HyracksDataException e) {
+ resetParser();
+ }
+ }
+
+ private void resetParser() {
+ try {
+ parser.reset(in);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+    }
+}
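How a value reaches this converter (a sketch; the wiring shown is inferred from ParquetOptions and PrimitiveConverterProvider in this change): parse-json-string must be enabled in the Hadoop configuration the context reads, after which a string column mapped to ATypeTag.ANY is parsed as an open ADM value.

    Configuration conf = new Configuration();
    // Key defined in ExternalDataConstants.ParquetOptions (see the hunk below)
    conf.setBoolean(ExternalDataConstants.ParquetOptions.HADOOP_PARSE_JSON_STRING, true);
    // PrimitiveConverterProvider then returns a JsonStringConverter for ANY fields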
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MissingConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/MissingConverter.java
similarity index 98%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MissingConverter.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/MissingConverter.java
index e38056b..996731d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MissingConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/MissingConverter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.PrimitiveConverter;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
new file mode 100644
index 0000000..38c441a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.AsterixTypeToParquetTypeVisitor;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+public class PrimitiveConverterProvider {
+ public static final PrimitiveType MISSING =
+ Types.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN).named("MISSING");
+
+ private PrimitiveConverterProvider() {
+ }
+
+ public static PrimitiveConverter createPrimitiveConverter(PrimitiveType type, AbstractComplexConverter parent,
+ int index, ParquetConverterContext context) {
+ return createPrimitiveConverter(type, parent, null, index, context);
+ }
+
+ public static PrimitiveConverter createPrimitiveConverter(PrimitiveType type, AbstractComplexConverter parent,
+ IValueReference fieldName, int index, ParquetConverterContext context) {
+
+ if (type == MISSING) {
+ return MissingConverter.INSTANCE;
+ }
+
+ ATypeTag mappedType = AsterixTypeToParquetTypeVisitor.mapType(type, context, null);
+ switch (mappedType) {
+ case BOOLEAN:
+ case STRING:
+ return new GenericPrimitiveConverter(parent, fieldName, index, context);
+ case BIGINT:
+ return getIntConverter(type, parent, fieldName, index, context);
+ case DOUBLE:
+ return getDoubleConverter(type, parent, fieldName, index, context);
+ case BINARY:
+ return new BinaryConverter(parent, fieldName, index, context);
+ case UUID:
+ return new UUIDConverter(parent, fieldName, index, context);
+ case DATE:
+ return new DateConverter(parent, fieldName, index, context);
+ case TIME:
+ return getTimeConverter(type, parent, fieldName, index, context);
+ case DATETIME:
+ return getTimeStampConverter(type, parent, fieldName, index, context);
+ case ANY:
+ return new JsonStringConverter(parent, fieldName, index, context);
+ default:
+ return MissingConverter.INSTANCE;
+ }
+ }
+
+ private static PrimitiveConverter getIntConverter(PrimitiveType type, AbstractComplexConverter parent,
+ IValueReference fieldName, int index, ParquetConverterContext context) {
+ IntLogicalTypeAnnotation intType = (IntLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+ if (intType != null && !intType.isSigned()) {
+ return new UnsignedIntegerConverter(parent, fieldName, index, context);
+ }
+ return new GenericPrimitiveConverter(parent, fieldName, index, context);
+ }
+
+ private static PrimitiveConverter getDoubleConverter(PrimitiveType type, AbstractComplexConverter parent,
+ IValueReference fieldName, int index, ParquetConverterContext context) {
+ LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation();
+ if (logicalType instanceof DecimalLogicalTypeAnnotation) {
+ DecimalLogicalTypeAnnotation decimalLogicalType = (DecimalLogicalTypeAnnotation) logicalType;
+ return new DecimalConverter(parent, fieldName, index, context, decimalLogicalType.getPrecision(),
+ decimalLogicalType.getScale());
+        }
+ return new GenericPrimitiveConverter(parent, fieldName, index, context);
+ }
+
+ private static PrimitiveConverter getTimeConverter(PrimitiveType type, AbstractComplexConverter parent,
+ IValueReference fieldName, int index, ParquetConverterContext context) {
+ TimeLogicalTypeAnnotation timeLogicalType = (TimeLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+ return new TimeConverter(parent, fieldName, index, context, timeLogicalType.getUnit());
+ }
+
+ private static PrimitiveConverter getTimeStampConverter(PrimitiveType type, AbstractComplexConverter parent,
+ IValueReference fieldName, int index, ParquetConverterContext context) {
+ TimestampLogicalTypeAnnotation tsType = (TimestampLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+ if (tsType != null) {
+ int offset = tsType.isAdjustedToUTC() ? context.getTimeZoneOffset() : 0;
+ return new TimestampConverter(parent, fieldName, index, context, tsType.getUnit(), offset);
+ }
+ //INT96: the converter will convert the value to millis
+ return new TimestampConverter(parent, fieldName, index, context, TimeUnit.MILLIS, 0);
+ }
+}
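A sketch of how a nested converter obtains a primitive child through this provider (it mirrors the call sites in ArrayConverter and ObjectConverter; parentConverter and ctx are assumed to be in scope, and the STRING mapping assumes the visitor maps annotated UTF8 binaries to STRING):

    PrimitiveType utf8 = Types.required(PrimitiveType.PrimitiveTypeName.BINARY)
            .as(LogicalTypeAnnotation.stringType())
            .named("name");
    PrimitiveConverter child =
            PrimitiveConverterProvider.createPrimitiveConverter(utf8, parentConverter, 0, ctx);
    // STRING -> GenericPrimitiveConverter; a DECIMAL annotation would yield
    // DecimalConverter when decimal-to-double is enabled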
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
new file mode 100644
index 0000000..fa9f36c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+
+public class TimeConverter extends GenericPrimitiveConverter {
+ private final LogicalTypeAnnotation.TimeUnit timeUnit;
+
+ TimeConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
+ ParquetConverterContext context, LogicalTypeAnnotation.TimeUnit timeUnit) {
+ super(parent, fieldName, index, context);
+ this.timeUnit = timeUnit;
+ }
+
+ @Override
+ public void addInt(int value) {
+ addLong(value);
+ }
+
+ @Override
+ public void addLong(long value) {
+ int convertedTime = (int) getConvertedTime(timeUnit, value);
+ context.serializeTime(convertedTime, parent.getDataOutput());
+ parent.addValue(this);
+ }
+
+ public static long getConvertedTime(LogicalTypeAnnotation.TimeUnit timeUnit, long value) {
+ final long convertedTime;
+ switch (timeUnit) {
+ case MICROS:
+ convertedTime = TimeUnit.MICROSECONDS.toMillis(value);
+ break;
+ case NANOS:
+ convertedTime = TimeUnit.NANOSECONDS.toMillis(value);
+ break;
+ default:
+ //Millis
+ convertedTime = value;
+ }
+
+ return convertedTime;
+ }
+}
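A quick check of the unit conversion (values illustrative): 12:34:56.789 expressed as microseconds of the day converts to the millisecond value ATime stores.

    long micros = 45_296_789_000L; // 12:34:56.789 in microseconds of day
    long millis = TimeConverter.getConvertedTime(LogicalTypeAnnotation.TimeUnit.MICROS, micros);
    // millis == 45_296_789 == 12 * 3_600_000 + 34 * 60_000 + 56_000 + 789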
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
new file mode 100644
index 0000000..136febe
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+
+class TimestampConverter extends GenericPrimitiveConverter {
+ private static final long JULIAN_DAY_OF_EPOCH = 2440588;
+ private static final long MILLIS_PER_DAY = 86400000L;
+ private static final long NANOS_PER_MILLIS = 1000000L;
+
+ private final LogicalTypeAnnotation.TimeUnit timeUnit;
+ private final int timeZoneOffset;
+
+ TimestampConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
+ ParquetConverterContext context, LogicalTypeAnnotation.TimeUnit timeUnit, int timeZoneOffset) {
+ super(parent, fieldName, index, context);
+ this.timeUnit = timeUnit;
+ this.timeZoneOffset = timeZoneOffset;
+ }
+
+ /**
+ * Timestamp is an INT96 (Little Endian)
+     * INT96 timestamps are not adjusted to UTC and are always treated as local timestamps
+ *
+ * @param value binary representation of INT96
+ */
+ @Override
+ public void addBinary(Binary value) {
+ ByteBuffer buffer = value.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+ long timeOfDayNanos = buffer.getLong();
+ int julianDay = buffer.getInt();
+ long timestamp = fromJulian(julianDay, timeOfDayNanos);
+ addLong(timestamp);
+ }
+
+ /**
+ * Timestamp is an INT64
+ *
+ * @param value long value
+ */
+ @Override
+ public void addLong(long value) {
+ long convertedTime = TimeConverter.getConvertedTime(timeUnit, value);
+ context.serializeDateTime(convertedTime + timeZoneOffset, parent.getDataOutput());
+ parent.addValue(this);
+ }
+
+ private static long fromJulian(int days, long nanos) {
+ return (days - JULIAN_DAY_OF_EPOCH) * MILLIS_PER_DAY + nanos / NANOS_PER_MILLIS;
+ }
+}
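A sketch of the INT96 layout addBinary decodes (values illustrative): twelve little-endian bytes holding a long of nanos-of-day followed by an int Julian day; day 2440589 with zero nanos is 1970-01-02T00:00 UTC.

    ByteBuffer int96 = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    int96.putLong(0L);        // nanoseconds within the day
    int96.putInt(2_440_589);  // Julian day for 1970-01-02
    int96.flip();
    long nanos = int96.getLong();
    int julianDay = int96.getInt();
    long millis = (julianDay - 2_440_588L) * 86_400_000L + nanos / 1_000_000L;
    // millis == 86_400_000, one day after the Unix epoch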
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MissingConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
similarity index 60%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MissingConverter.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
index e38056b..ec07c60 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MissingConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
@@ -16,45 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.PrimitiveConverter;
-class MissingConverter extends PrimitiveConverter {
- protected static final MissingConverter INSTANCE = new MissingConverter();
-
- private MissingConverter() {
+public class UUIDConverter extends GenericPrimitiveConverter {
+ UUIDConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
+ ParquetConverterContext context) {
+ super(parent, fieldName, index, context);
}
@Override
public void addBinary(Binary value) {
- //NoOp
+ context.serializeUUID(value, parent.getDataOutput());
+ parent.addValue(this);
}
-
- @Override
- public void addBoolean(boolean value) {
- //NoOp
- }
-
- @Override
- public void addFloat(float value) {
- //NoOp
- }
-
- @Override
- public void addDouble(double value) {
- //NoOp
- }
-
- @Override
- public void addInt(int value) {
- //NoOp
- }
-
- @Override
- public void addLong(long value) {
- //NoOp
- }
-
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
new file mode 100644
index 0000000..763b8c8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class UnsignedIntegerConverter extends GenericPrimitiveConverter {
+ private boolean overflowed;
+
+ UnsignedIntegerConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
+ ParquetConverterContext context) {
+ super(parent, fieldName, index, context);
+ overflowed = false;
+ }
+
+ @Override
+ public void addInt(int value) {
+ addLong(value & 0x00000000ffffffffL);
+ }
+
+ @Override
+ public void addLong(long value) {
+ if (value < 0) {
+ if (!overflowed) {
+ Warning warning = Warning.of(null, ErrorCode.PARQUET_CONTAINS_OVERFLOWED_BIGINT, ATypeTag.BIGINT);
+ context.getWarnings().add(warning);
+                //Ensure this warning is issued only once
+ overflowed = true;
+ }
+ return;
+ }
+ super.addLong(value);
+ }
+}
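The widening at work (values illustrative): an unsigned 32-bit value whose sign bit is set arrives as a negative Java int and maps to its positive 64-bit equivalent; only unsigned 64-bit values above Long.MAX_VALUE take the overflow path.

    int raw = -1;                             // unsigned 32-bit 4294967295
    long widened = raw & 0x00000000ffffffffL; // 4_294_967_295L, serialized normally
    // An unsigned 64-bit value > Long.MAX_VALUE arrives as a negative long,
    // is skipped, and issues PARQUET_CONTAINS_OVERFLOWED_BIGINT exactly once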
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ParserContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ParserContext.java
index ef9ff08..1a4bbee 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ParserContext.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ParserContext.java
@@ -27,8 +27,6 @@
import org.apache.asterix.builders.IAsterixListBuilder;
import org.apache.asterix.builders.ListBuilderFactory;
import org.apache.asterix.builders.RecordBuilderFactory;
-import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
-import org.apache.asterix.external.input.stream.StandardUTF8ToModifiedUTF8DataOutput;
import org.apache.asterix.external.parser.AbstractNestedDataParser;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.AMutableString;
@@ -41,8 +39,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.util.string.UTF8StringReader;
-import org.apache.hyracks.util.string.UTF8StringWriter;
/**
* A state class that helps parsers of class {@link AbstractNestedDataParser} to maintain
@@ -67,15 +63,8 @@
private final ISerializerDeserializer<AString> stringSerDe;
private final AMutableString aString;
- //For parquet
- private final StandardUTF8ToModifiedUTF8DataOutput modifiedUTF8DataOutput;
-
- public ParserContext() {
- this(false);
- }
-
@SuppressWarnings("unchecked")
- public ParserContext(boolean allocateModfiedUTF8Writer) {
+ public ParserContext() {
objectBuilderPool = new SoftObjectPool<>(new RecordBuilderFactory());
arrayBuilderPool = new ObjectPool<>(new ListBuilderFactory(), ATypeTag.ARRAY);
tempBufferPool = new SoftObjectPool<>(new AbvsBuilderFactory());
@@ -83,11 +72,6 @@
serializedFieldNames = new LRUMap<>(SERIALIZED_FIELDNAME_MAP_MAX_SIZE);
stringSerDe = SerializerDeserializerProvider.INSTANCE.getAStringSerializerDeserializer();
aString = new AMutableString("");
- modifiedUTF8DataOutput =
- allocateModfiedUTF8Writer
- ? new StandardUTF8ToModifiedUTF8DataOutput(
- new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader()))
- : null;
}
public IMutableValueStorage enterObject() {
@@ -155,9 +139,4 @@
tempBufferPool.recycle(tempBuffer);
arrayBuilderPool.recycle(builder);
}
-
- public StandardUTF8ToModifiedUTF8DataOutput getModifiedUTF8DataOutput() {
- return modifiedUTF8DataOutput;
- }
-
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 89d1132..f0b9c90 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -21,9 +21,11 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.TimeZone;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.util.StorageUtil;
public class ExternalDataConstants {
@@ -296,6 +298,42 @@
public static final String DEFINITION_FIELD_NAME = "definition";
public static final String CONTAINER_NAME_FIELD_NAME = "container";
+ public static class ParquetOptions {
+ private ParquetOptions() {
+ }
+
+ //Prefix for hadoop configurations
+ private static final String ASTERIX_HADOOP_PREFIX = "org.apache.asterix.";
+
+ /**
+ * Parse Parquet's JSON string type into ADM
+ * Default: true (see the fallback in HDFSUtils#configureParquet)
+ */
+ public static final String PARSE_JSON_STRING = "parse-json-string";
+ public static final String HADOOP_PARSE_JSON_STRING = ASTERIX_HADOOP_PREFIX + PARSE_JSON_STRING;
+
+ /**
+ * Rebase Parquet's Decimal values and parse them as {@link ATypeTag#DOUBLE}
+ * Default: false
+ */
+ public static final String DECIMAL_TO_DOUBLE = "decimal-to-double";
+ public static final String HADOOP_DECIMAL_TO_DOUBLE = ASTERIX_HADOOP_PREFIX + DECIMAL_TO_DOUBLE;
+
+ /**
+ * Time zone ID used to convert UTC-adjusted {@link ATypeTag#TIME} and {@link ATypeTag#DATETIME} values
+ * Default: ""
+ * Note: If the Parquet file contains UTC-adjusted time and/or timestamp values and no time zone ID is
+ * provided, the values are returned in UTC and a warning is issued.
+ */
+ public static final String TIMEZONE = "timezone";
+ public static final String HADOOP_TIMEZONE = ASTERIX_HADOOP_PREFIX + TIMEZONE;
+
+ /**
+ * The set of time zone IDs supported by Java
+ */
+ public static final Set<String> VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs());
+ }
+
public static class AwsS3 {
private AwsS3() {
throw new AssertionError("do not instantiate");
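
The TIMEZONE option above implies shifting UTC-adjusted instants by the configured zone's offset. A small sketch of that arithmetic using only java.util.TimeZone (the millisecond value is made up, and the converter's actual code path may differ):

    import java.util.TimeZone;

    public class TimezoneAdjustSketch {
        public static void main(String[] args) {
            long utcMillis = 1_622_548_800_000L; // hypothetical UTC-adjusted timestamp
            TimeZone tz = TimeZone.getTimeZone("America/Los_Angeles");
            long adjusted = utcMillis + tz.getOffset(utcMillis); // DST-aware offset at that instant
            System.out.println(adjusted);
        }
    }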
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index b38f21d..22040e2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -104,6 +104,7 @@
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.library.JavaLibrary;
+import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
@@ -823,15 +824,21 @@
}
/**
- * Validate the dataset type declared with a given type
+ * Validate Parquet dataset's declared type and configuration
*
* @param properties external dataset configuration
* @param datasetRecordType dataset declared type
*/
- public static void validateType(Map<String, String> properties, ARecordType datasetRecordType)
- throws CompilationException {
- if (isParquetFormat(properties) && datasetRecordType.getFieldTypes().length != 0) {
- throw new CompilationException(ErrorCode.UNSUPPORTED_TYPE_FOR_PARQUET, datasetRecordType.getTypeName());
+ public static void validateParquetTypeAndConfiguration(Map<String, String> properties,
+ ARecordType datasetRecordType) throws CompilationException {
+ if (isParquetFormat(properties)) {
+ if (datasetRecordType.getFieldTypes().length != 0) {
+ throw new CompilationException(ErrorCode.UNSUPPORTED_TYPE_FOR_PARQUET, datasetRecordType.getTypeName());
+ } else if (properties.containsKey(ParquetOptions.TIMEZONE)
+ && !ParquetOptions.VALID_TIME_ZONES.contains(properties.get(ParquetOptions.TIMEZONE))) {
+ //Ensure the configured time zone id is correct
+ throw new CompilationException(ErrorCode.INVALID_TIMEZONE, properties.get(ParquetOptions.TIMEZONE));
+ }
}
}
@@ -1770,7 +1777,8 @@
* Builds the Azure Blob storage client using the provided configuration
*
* @param configuration properties
- * @see <a href="https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/azure-storage">Azure Blob storage</a>
+ * @see <a href="https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/azure-storage">Azure
+ * Blob storage</a>
*/
public static void configureAzureHdfsJobConf(JobConf conf, Map<String, String> configuration, String endPoint) {
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
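
Returning to validateParquetTypeAndConfiguration above, a hypothetical caller-side sketch: an open (empty) declared type passes, while an unrecognized "timezone" value now fails at compile time instead of surfacing at read time. Here openType stands in for an ARecordType with no declared fields, and the format key constants are assumed to be the usual ones:

    Map<String, String> properties = new HashMap<>();
    properties.put(ExternalDataConstants.KEY_FORMAT, ExternalDataConstants.FORMAT_PARQUET);
    properties.put(ParquetOptions.TIMEZONE, "Mars/Olympus_Mons"); // not in VALID_TIME_ZONES

    // throws CompilationException with ErrorCode.INVALID_TIMEZONE
    ExternalDataUtils.validateParquetTypeAndConfiguration(properties, openType);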
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 28a0766..9e49d86 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -40,6 +40,7 @@
import org.apache.asterix.external.input.record.reader.hdfs.parquet.MapredParquetInputFormat;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetReadSupport;
import org.apache.asterix.external.input.stream.HDFSInputStream;
+import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.projection.DataProjectionInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
@@ -224,24 +225,40 @@
}
if (ExternalDataConstants.CLASS_NAME_PARQUET_INPUT_FORMAT.equals(formatClassName)) {
- //Parquet configurations
- conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, ParquetReadSupport.class.getName());
- //Get requested values
- String requestedValues = configuration.get(ExternalDataConstants.KEY_REQUESTED_FIELDS);
- if (requestedValues == null) {
- //No value is requested, return the entire record
- requestedValues = DataProjectionInfo.ALL_FIELDS_TYPE.getTypeName();
- } else {
- //Subset of the values were requested, set the functionCallInformation
- conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
- configuration.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION));
- }
- conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS, requestedValues);
+ configureParquet(configuration, conf);
}
return conf;
}
+ private static void configureParquet(Map<String, String> configuration, JobConf conf) {
+ //Parquet configurations
+ conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, ParquetReadSupport.class.getName());
+
+ //Get requested values
+ String requestedValues = configuration.get(ExternalDataConstants.KEY_REQUESTED_FIELDS);
+ if (requestedValues == null) {
+ //No value is requested, return the entire record
+ requestedValues = DataProjectionInfo.ALL_FIELDS_TYPE.getTypeName();
+ } else {
+ //Subset of the values were requested, set the functionCallInformation
+ conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
+ configuration.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION));
+ }
+ conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS, requestedValues);
+
+ //Parse JSON string as ADM?
+ conf.set(ParquetOptions.HADOOP_PARSE_JSON_STRING,
+ configuration.getOrDefault(ParquetOptions.PARSE_JSON_STRING, ExternalDataConstants.TRUE));
+
+ //Rebase and parse decimal as double?
+ conf.set(ParquetOptions.HADOOP_DECIMAL_TO_DOUBLE,
+ configuration.getOrDefault(ParquetOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE));
+
+ //Re-adjust the time zone for UTC-adjusted values
+ conf.set(ParquetOptions.HADOOP_TIMEZONE, configuration.getOrDefault(ParquetOptions.TIMEZONE, ""));
+ }
+
public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx,
AlgebricksAbsolutePartitionConstraint clusterLocations) {
if (clusterLocations == null) {
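
The values written by configureParquet travel to the readers inside the Hadoop JobConf. A hedged sketch of how a consumer might read them back (the surrounding reader/converter wiring is omitted; the defaults mirror the ones set above):

    boolean parseJsonString = conf.getBoolean(ParquetOptions.HADOOP_PARSE_JSON_STRING, true);
    boolean decimalToDouble = conf.getBoolean(ParquetOptions.HADOOP_DECIMAL_TO_DOUBLE, false);
    String timeZoneId = conf.get(ParquetOptions.HADOOP_TIMEZONE, "");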
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 199c2e1..e242531 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -619,11 +619,11 @@
List<LogicalVariable> varCopy = deepCopyVariableList(op.getVariables());
List<Mutable<ILogicalExpression>> exprCopy =
exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions());
- List<ILogicalPlan> nestedPlansCopy = new ArrayList<>();
WindowOperator opCopy = new WindowOperator(partitionExprCopy, orderExprCopy, frameValueExprCopy,
frameStartExprCopy, frameStartValidationExprCopy, frameEndExprCopy, frameEndValidationExprCopy,
frameExcludeExprCopy, op.getFrameExcludeNegationStartIdx(), frameExcludeUnaryExprCopy,
- frameOffsetExprCopy, op.getFrameMaxObjects(), varCopy, exprCopy, nestedPlansCopy);
+ frameOffsetExprCopy, op.getFrameMaxObjects(), varCopy, exprCopy, null);
+ List<ILogicalPlan> nestedPlansCopy = opCopy.getNestedPlans();
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
deepCopyPlanList(op.getNestedPlans(), nestedPlansCopy, opCopy);
return opCopy;
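
Both this visitor and OperatorDeepCopyVisitor below now pass null for the nested plans and then populate the list returned by getNestedPlans(). That is robust regardless of whether the WindowOperator constructor keeps the supplied list or copies it: additions always land in the list the new operator actually owns. A self-contained illustration of the pitfall in plain Java (Owner is a stand-in, not an Algebricks type):

    import java.util.ArrayList;
    import java.util.List;

    public class OwnListSketch {
        static class Owner {
            private final List<String> plans = new ArrayList<>();
            Owner(List<String> initial) {
                if (initial != null) {
                    plans.addAll(initial); // defensive copy: the caller's list is detached
                }
            }
            List<String> getPlans() {
                return plans;
            }
        }

        public static void main(String[] args) {
            List<String> local = new ArrayList<>();
            Owner lossy = new Owner(local);
            local.add("nested-plan");           // lost: never reaches lossy.getPlans()
            Owner safe = new Owner(null);
            safe.getPlans().add("nested-plan"); // lands in the owner's own list
            System.out.println(lossy.getPlans() + " vs " + safe.getPlans()); // [] vs [nested-plan]
        }
    }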
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 7b67af1..c2ee661 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -448,11 +448,11 @@
deepCopyVars(newVariables, op.getVariables());
List<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
deepCopyExpressionRefs(newExpressions, op.getExpressions());
- List<ILogicalPlan> newNestedPlans = new ArrayList<>();
WindowOperator newWinOp = new WindowOperator(newPartitionExprs, newOrderExprs, newFrameValueExprs,
newFrameStartExprs, newFrameStartValidationExprs, newFrameEndExprs, newFrameEndValidationExprs,
newFrameExclusionExprs, op.getFrameExcludeNegationStartIdx(), newFrameExcludeUnaryExpr,
- newFrameOffsetExpr, op.getFrameMaxObjects(), newVariables, newExpressions, newNestedPlans);
+ newFrameOffsetExpr, op.getFrameMaxObjects(), newVariables, newExpressions, null);
+ List<ILogicalPlan> newNestedPlans = newWinOp.getNestedPlans();
for (ILogicalPlan nestedPlan : op.getNestedPlans()) {
newNestedPlans.add(OperatorManipulationUtil.deepCopy(nestedPlan, newWinOp));
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStructureVerifier.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStructureVerifier.java
index a072d11..2b40114 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStructureVerifier.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStructureVerifier.java
@@ -46,9 +46,11 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
@@ -76,6 +78,11 @@
private static final String ERROR_MESSAGE_TEMPLATE_6 = "undefined used variables %s in %s";
+ private static final String ERROR_MESSAGE_TEMPLATE_7 =
+ "unexpected source operator in NestedTupleSourceOperator: %s. Expected source operator %s";
+
+ private static final String ERROR_MESSAGE_TEMPLATE_8 = "unexpected leaf operator in nested plan: %s";
+
public static final Comparator<LogicalVariable> VARIABLE_CMP = Comparator.comparing(LogicalVariable::toString);
private final ExpressionReferenceVerifierVisitor exprVisitor = new ExpressionReferenceVerifierVisitor();
@@ -185,7 +192,10 @@
if (op instanceof AbstractOperatorWithNestedPlans) {
children = new ArrayList<>(children);
for (ILogicalPlan nestedPlan : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
- children.addAll(nestedPlan.getRoots());
+ for (Mutable<ILogicalOperator> nestedRootRef : nestedPlan.getRoots()) {
+ checkLeafOperatorsInNestedPlan(op, nestedRootRef);
+ children.add(nestedRootRef);
+ }
}
}
return children;
@@ -262,6 +272,29 @@
}
}
+ private void checkLeafOperatorsInNestedPlan(ILogicalOperator op, Mutable<ILogicalOperator> rootRef)
+ throws AlgebricksException {
+ for (Mutable<ILogicalOperator> leafRef : OperatorManipulationUtil.findLeafDescendantsOrSelf(rootRef)) {
+ ILogicalOperator leafOp = leafRef.getValue();
+ switch (leafOp.getOperatorTag()) {
+ case EMPTYTUPLESOURCE:
+ break;
+ case NESTEDTUPLESOURCE:
+ NestedTupleSourceOperator ntsOp = (NestedTupleSourceOperator) leafOp;
+ ILogicalOperator ntsSrcOp = ntsOp.getDataSourceReference().getValue();
+ if (ntsSrcOp != op) {
+ throw new AlgebricksException(String.format(ERROR_MESSAGE_TEMPLATE_7,
+ PlanStabilityVerifier.printOperator(ntsSrcOp, prettyPrinter),
+ PlanStabilityVerifier.printOperator(op, prettyPrinter)));
+ }
+ break;
+ default:
+ throw new AlgebricksException(String.format(ERROR_MESSAGE_TEMPLATE_8,
+ PlanStabilityVerifier.printOperator(leafOp, prettyPrinter)));
+ }
+ }
+ }
+
private void raiseException(String sharedReferenceKind, String sharedEntity, ILogicalOperator firstOp,
ILogicalOperator secondOp) throws AlgebricksException {
String errorMessage;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanVerifierTestBase.java b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanVerifierTestBase.java
index 7bfbaa9..65d6460 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanVerifierTestBase.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanVerifierTestBase.java
@@ -92,7 +92,9 @@
Mutable<ILogicalOperator> createSamplePlan1() {
AssignOperator op1 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
- SubplanOperator op2 = new SubplanOperator(newAssign(newVar(), newMutable(ConstantExpression.TRUE)));
+ AssignOperator r1 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+ r1.getInputs().add(new MutableObject<>(new EmptyTupleSourceOperator()));
+ SubplanOperator op2 = new SubplanOperator(r1);
op1.getInputs().add(newMutable(op2));
InnerJoinOperator op3 = new InnerJoinOperator(newMutable(ConstantExpression.TRUE));
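
The test fix above gives the subplan root an EMPTYTUPLESOURCE input, one of the two leaf shapes the new verifier accepts in nested plans. The other accepted shape is a NESTEDTUPLESOURCE whose data-source reference points back at the owning operator; a hedged sketch using the Algebricks constructors (helper methods as in the test base; exact signatures may vary):

    SubplanOperator subplan = new SubplanOperator();
    NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<>(subplan));
    AssignOperator root = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
    root.getInputs().add(new MutableObject<>(nts));
    subplan.getNestedPlans().add(new ALogicalPlanImpl(new MutableObject<>(root)));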
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java
index 2ec0654..317dabb 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateWindowOperatorsRule.java
@@ -33,10 +33,12 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismOperatorVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismUtilities;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -82,7 +84,9 @@
Set<LogicalVariable> used1 = new HashSet<>();
VariableUtilities.getUsedVariables(winOp1, used1);
- if (!OperatorPropertiesUtil.disjoint(winOp2.getVariables(), used1)) {
+ Set<LogicalVariable> produced2 = new HashSet<>();
+ VariableUtilities.getProducedVariables(winOp2, produced2);
+ if (!OperatorPropertiesUtil.disjoint(produced2, used1)) {
return false;
}
@@ -130,7 +134,6 @@
aggTo.getExpressions().addAll(aggFrom.getExpressions());
context.computeAndSetTypeEnvironmentForOperator(aggTo);
} else {
- setAll(winOpTo.getNestedPlans(), winOpFrom.getNestedPlans());
setAll(winOpTo.getFrameValueExpressions(), winOpFrom.getFrameValueExpressions());
setAll(winOpTo.getFrameStartExpressions(), winOpFrom.getFrameStartExpressions());
setAll(winOpTo.getFrameStartValidationExpressions(), winOpFrom.getFrameStartValidationExpressions());
@@ -141,6 +144,19 @@
winOpTo.getFrameExcludeUnaryExpression().setValue(winOpFrom.getFrameExcludeUnaryExpression().getValue());
winOpTo.getFrameOffsetExpression().setValue(winOpFrom.getFrameOffsetExpression().getValue());
winOpTo.setFrameMaxObjects(winOpFrom.getFrameMaxObjects());
+ // move nested plans
+ for (ILogicalPlan fromNestedPlan : winOpFrom.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> rootRef : fromNestedPlan.getRoots()) {
+ for (Mutable<ILogicalOperator> leafRef : OperatorManipulationUtil
+ .findLeafDescendantsOrSelf(rootRef)) {
+ ILogicalOperator leafOp = leafRef.getValue();
+ if (leafOp.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ ((NestedTupleSourceOperator) leafOp).getDataSourceReference().setValue(winOpTo);
+ }
+ }
+ }
+ winOpTo.getNestedPlans().add(fromNestedPlan);
+ }
}
return true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index f4c4183..beabb5d 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -56,6 +56,7 @@
public static final int EC_ACTIVE_RESUME_FAILURE = 18;
public static final int EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED = 19;
public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22;
+ public static final int EC_INCONSISTENT_STORAGE_REFERENCES = 23;
public static final int EC_IMMEDIATE_HALT = 33;
public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;
public static final int EC_IO_SCHEDULER_FAILED = 55;