[ASTERIXDB-3392] Improve logging on copy to parquet
- user model changes: no
- storage format changes: no
- interface changes: no
Ext-ref: MB-66587
Change-Id: I93c6e8d9d6eed4e42ea5419a444b7aaee348965b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19745
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: <preetham02@apache.org>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: <preetham02@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
index 255009e..ca87ced 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
@@ -29,9 +29,13 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
// Maintains a pool of Parquet writers holding a file, each with its own schema , and writes values to the appropriate writer based on schema.
public class ParquetSchemaInferPoolWriter {
+ private static final Logger LOGGER = LogManager.getLogger();
private final ParquetExternalWriterFactory writerFactory;
private List<ParquetSchemaTree.SchemaNode> schemaNodes;
private List<IExternalWriter> writerList;
@@ -65,6 +69,10 @@
}
if (schemaNodes.size() == maxSchemas) {
+ LOGGER.info("Schema limit exceeded, max schemas allowed: {}", maxSchemas);
+ schemaNodes.forEach(schemaNode -> {
+ LOGGER.info("Inferred schema: {}", LogRedactionUtil.userData(schemaNode.toString()));
+ });
throw new HyracksDataException(ErrorCode.SCHEMA_LIMIT_EXCEEDED, maxSchemas);
}
schemaNodes.add(schemaLazyVisitor.inferSchema(value));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
index 2a03bfd..cffeb2f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
@@ -35,13 +35,16 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
public class ParquetRecordLazyVisitor implements ILazyVisitablePointableVisitor<Void, Type> {
-
+ private static final Logger LOGGER = LogManager.getLogger();
private final MessageType schema;
private final RecordLazyVisitablePointable rec;
// The Record Consumer is responsible for traversing the record tree,
@@ -72,6 +75,7 @@
public Void visit(RecordLazyVisitablePointable pointable, Type type) throws HyracksDataException {
if (type.isPrimitive()) {
+ LOGGER.info("Expected primitive type: {} but got record type", LogRedactionUtil.userData(type.toString()));
throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA, GROUP_TYPE_ERROR_FIELD,
PRIMITIVE_TYPE_ERROR_FIELD, type.getName());
}
@@ -84,6 +88,8 @@
String columnName = fieldNamesDictionary.getOrCreateFieldNameIndex(pointable.getFieldName());
if (!groupType.containsField(columnName)) {
+ LOGGER.info("Group type: {} does not contain field in record type: {}",
+ LogRedactionUtil.userData(groupType.getName()), LogRedactionUtil.userData(columnName));
throw new HyracksDataException(ErrorCode.EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA, columnName,
groupType.getName());
}
@@ -99,17 +105,22 @@
public Void visit(AbstractListLazyVisitablePointable pointable, Type type) throws HyracksDataException {
if (type.isPrimitive()) {
+ LOGGER.info("Expected primitive type: {} but got list type", LogRedactionUtil.userData(type.toString()));
throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA, GROUP_TYPE_ERROR_FIELD,
PRIMITIVE_TYPE_ERROR_FIELD, type.getName());
}
GroupType groupType = type.asGroupType();
if (!groupType.containsField(LIST_FIELD)) {
+ LOGGER.info("Group type: {} does not contain field in list type: {}",
+ LogRedactionUtil.userData(groupType.getName()), LIST_FIELD);
throw new HyracksDataException(ErrorCode.EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA, LIST_FIELD,
groupType.getName());
}
if (groupType.getType(LIST_FIELD).isPrimitive()) {
+ LOGGER.info("Expected group type: {} but got primitive type",
+ LogRedactionUtil.userData(groupType.getType(LIST_FIELD).toString()));
throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA, GROUP_TYPE_ERROR_FIELD,
PRIMITIVE_TYPE_ERROR_FIELD, LIST_FIELD);
}
@@ -117,6 +128,8 @@
GroupType listType = groupType.getType(LIST_FIELD).asGroupType();
if (!listType.containsField(ELEMENT_FIELD)) {
+ LOGGER.info("Group type: {} does not contain field: {}", LogRedactionUtil.userData(listType.toString()),
+ ELEMENT_FIELD);
throw new HyracksDataException(ErrorCode.EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA, ELEMENT_FIELD,
listType.getName());
}
@@ -147,8 +160,9 @@
@Override
public Void visit(FlatLazyVisitablePointable pointable, Type type) throws HyracksDataException {
-
if (!type.isPrimitive()) {
+ LOGGER.info("Expected non primitive type: {} but got: {}", LogRedactionUtil.userData(type.toString()),
+ pointable.getTypeTag());
throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA, PRIMITIVE_TYPE_ERROR_FIELD,
GROUP_TYPE_ERROR_FIELD, type.getName());
}
@@ -168,6 +182,8 @@
AbstractLazyVisitablePointable child = rec.getChildVisitablePointable();
if (!schema.containsField(columnName)) {
+ LOGGER.info("Schema: {} does not contain field: {}", LogRedactionUtil.userData(schema.toString()),
+ LogRedactionUtil.userData(columnName));
throw new HyracksDataException(ErrorCode.EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA, columnName,
schema.getName());
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
index bd869e9..2278372 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
@@ -36,11 +36,15 @@
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
// This class is used to infer the schema of a record into SchemaNode, which is an internal tree representation of the schema.
public class ParquetSchemaLazyVisitor implements ILazyVisitablePointableVisitor<Void, ParquetSchemaTree.SchemaNode> {
+ private static final Logger LOGGER = LogManager.getLogger();
private final RecordLazyVisitablePointable rec;
private final FieldNamesDictionary fieldNamesDictionary;
private final static String SCHEMA_NAME = "asterix_schema";
@@ -63,6 +67,8 @@
schemaNode.setType(new ParquetSchemaTree.RecordType());
}
if (!(schemaNode.getType() instanceof ParquetSchemaTree.RecordType)) {
+ LOGGER.info("Incompatible type found in record: {} and {}",
+ LogRedactionUtil.userData(schemaNode.toString()), pointable.getTypeTag());
throw RuntimeDataException.create(PARQUET_UNSUPPORTED_MIXED_TYPE_ARRAY);
}
ParquetSchemaTree.RecordType recordType = (ParquetSchemaTree.RecordType) schemaNode.getType();
@@ -88,11 +94,12 @@
if (schemaNode.getType() == null) {
schemaNode.setType(new ParquetSchemaTree.ListType());
}
- if (!(schemaNode.getType() instanceof ParquetSchemaTree.ListType)) {
+ if (!(schemaNode.getType() instanceof ParquetSchemaTree.ListType listType)) {
+ LOGGER.info("Incompatible type found in list: {} and {}" ,LogRedactionUtil.userData(schemaNode.toString()) ,pointable.getTypeTag());
throw RuntimeDataException.create(PARQUET_UNSUPPORTED_MIXED_TYPE_ARRAY);
}
- ParquetSchemaTree.ListType listType = (ParquetSchemaTree.ListType) schemaNode.getType();
- for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+ int numChildren = pointable.getNumberOfChildren();
+ for (int i = 0; i < numChildren; i++) {
pointable.nextChild();
AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
if (listType.isEmpty()) {
@@ -114,11 +121,14 @@
return null;
}
if (!(schemaNode.getType() instanceof ParquetSchemaTree.FlatType)) {
+ LOGGER.info("Incompatible type found: {} and {}", LogRedactionUtil.userData(schemaNode.toString()),
+ pointable.getTypeTag());
throw RuntimeDataException.create(PARQUET_UNSUPPORTED_MIXED_TYPE_ARRAY);
}
ParquetSchemaTree.FlatType flatType = (ParquetSchemaTree.FlatType) schemaNode.getType();
if (!flatType.isCompatibleWith(pointable.getTypeTag())) {
+ LOGGER.info("Incompatible type found: {} and {}", flatType, pointable.getTypeTag());
throw RuntimeDataException.create(PARQUET_UNSUPPORTED_MIXED_TYPE_ARRAY);
}
@@ -136,6 +146,7 @@
public static MessageType generateSchema(ParquetSchemaTree.SchemaNode schemaRoot) throws HyracksDataException {
Types.MessageTypeBuilder builder = Types.buildMessage();
+ LOGGER.info("Building parquet schema: {}", LogRedactionUtil.userData(schemaRoot.toString()));
if (schemaRoot.getType() == null)
return builder.named(SCHEMA_NAME);
for (Map.Entry<String, ParquetSchemaTree.SchemaNode> entry : ((ParquetSchemaTree.RecordType) schemaRoot
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java
index 2ca086f..dae5295 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java
@@ -28,11 +28,15 @@
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
public class ParquetSchemaTree {
+ private static final Logger LOGGER = LogManager.getLogger();
public static class SchemaNode {
private AbstractType type;
@@ -48,6 +52,11 @@
public AbstractType getType() {
return type;
}
+
+ @Override
+ public String toString() {
+ return type == null ? "NULL" : type.toString();
+ }
}
static class RecordType extends AbstractType {
@@ -68,6 +77,17 @@
Map<String, SchemaNode> getChildren() {
return children;
}
+
+ // Renders the record as "{ name : type, name : type }". The separator is emitted before
+ // every entry except the first (sb still holds only "{ "), so no dangling ", " is logged.
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("{ ");
+ for (Map.Entry<String, SchemaNode> entry : children.entrySet()) {
+ sb.append(sb.length() > 2 ? ", " : "").append(entry.getKey()).append(" : ").append(entry.getValue());
+ }
+ return sb.append(" }").toString();
+ }
}
abstract static class AbstractType {
@@ -82,6 +102,11 @@
isHierarchical = AsterixParquetTypeMap.HIERARCHIAL_TYPES.containsKey(typeTag);
}
+ @Override
+ public String toString() {
+ return typeTag.toString();
+ }
+
public boolean isCompatibleWith(ATypeTag typeTag) {
if (isHierarchical) {
return AsterixParquetTypeMap.HIERARCHIAL_TYPES.containsKey(typeTag);
@@ -113,6 +138,11 @@
child = null;
}
+ @Override
+ public String toString() {
+ return "[ " + child + " ]";
+ }
+
void setChild(SchemaNode child) {
this.child = child;
}
@@ -129,6 +159,8 @@
public static void buildParquetSchema(Types.Builder builder, SchemaNode schemaNode, String columnName)
throws HyracksDataException {
if (schemaNode.getType() == null) {
+ LOGGER.info(
+ "Child type not set for record value with column name: " + LogRedactionUtil.userData(columnName));
throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
}
AbstractType typeClass = schemaNode.getType();
@@ -154,6 +186,7 @@
Types.BaseListBuilder<?, ?> childBuilder = getListChild(builder);
SchemaNode child = type.child;
if (child == null) {
+ LOGGER.info("Child type not set for list with column name: " + LogRedactionUtil.userData(columnName));
throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
}
buildParquetSchema(childBuilder, child, columnName);
@@ -161,6 +194,8 @@
private static void buildFlat(Types.Builder builder, FlatType type, String columnName) throws HyracksDataException {
if (type.getPrimitiveTypeName() == null) {
+ LOGGER.info("Child primitive type not set for flat value with column name: "
+ + LogRedactionUtil.userData(columnName));
// Not sure if this is the right thing to do here
throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
}