[ASTERIXDB-3392] Improve logging on copy to parquet

- user mode changes: no
- storage format changes: no
- interface changes: no
Ext-ref: MB-66587

Change-Id: I93c6e8d9d6eed4e42ea5419a444b7aaee348965b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19745
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: <preetham02@apache.org>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: <preetham02@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
index 255009e..ca87ced 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
@@ -29,9 +29,13 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 // Maintains a pool of Parquet writers holding a file, each with its own schema , and writes values to the appropriate writer based on schema.
 public class ParquetSchemaInferPoolWriter {
+    private static final Logger LOGGER = LogManager.getLogger();
     private final ParquetExternalWriterFactory writerFactory;
     private List<ParquetSchemaTree.SchemaNode> schemaNodes;
     private List<IExternalWriter> writerList;
@@ -65,6 +69,10 @@
         }
 
         if (schemaNodes.size() == maxSchemas) {
+            LOGGER.info("Schema limit exceeded, max schemas allowed: {}", maxSchemas);
+            schemaNodes.forEach(schemaNode -> {
+                LOGGER.info("Inferred schema: {}", LogRedactionUtil.userData(schemaNode.toString()));
+            });
             throw new HyracksDataException(ErrorCode.SCHEMA_LIMIT_EXCEEDED, maxSchemas);
         }
         schemaNodes.add(schemaLazyVisitor.inferSchema(value));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
index 2a03bfd..cffeb2f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
@@ -35,13 +35,16 @@
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
 public class ParquetRecordLazyVisitor implements ILazyVisitablePointableVisitor<Void, Type> {
-
+    private static final Logger LOGGER = LogManager.getLogger();
     private final MessageType schema;
     private final RecordLazyVisitablePointable rec;
     //     The Record Consumer is responsible for traversing the record tree,
@@ -72,6 +75,7 @@
     public Void visit(RecordLazyVisitablePointable pointable, Type type) throws HyracksDataException {
 
         if (type.isPrimitive()) {
+            LOGGER.info("Expected primitive type: {} but got record type", LogRedactionUtil.userData(type.toString()));
             throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA, GROUP_TYPE_ERROR_FIELD,
                     PRIMITIVE_TYPE_ERROR_FIELD, type.getName());
         }
@@ -84,6 +88,8 @@
             String columnName = fieldNamesDictionary.getOrCreateFieldNameIndex(pointable.getFieldName());
 
             if (!groupType.containsField(columnName)) {
+                LOGGER.info("Group type: {} does not contain field in record type: {}",
+                        LogRedactionUtil.userData(groupType.getName()), LogRedactionUtil.userData(columnName));
                 throw new HyracksDataException(ErrorCode.EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA, columnName,
                         groupType.getName());
             }
@@ -99,17 +105,22 @@
     public Void visit(AbstractListLazyVisitablePointable pointable, Type type) throws HyracksDataException {
 
         if (type.isPrimitive()) {
+            LOGGER.info("Expected primitive type: {} but got list type", LogRedactionUtil.userData(type.toString()));
             throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA, GROUP_TYPE_ERROR_FIELD,
                     PRIMITIVE_TYPE_ERROR_FIELD, type.getName());
         }
         GroupType groupType = type.asGroupType();
 
         if (!groupType.containsField(LIST_FIELD)) {
+            LOGGER.info("Group type: {} does not contain field in list type: {}",
+                    LogRedactionUtil.userData(groupType.getName()), LIST_FIELD);
             throw new HyracksDataException(ErrorCode.EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA, LIST_FIELD,
                     groupType.getName());
         }
 
         if (groupType.getType(LIST_FIELD).isPrimitive()) {
+            LOGGER.info("Expected group type: {} but got primitive type",
+                    LogRedactionUtil.userData(groupType.getType(LIST_FIELD).toString()));
             throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA, GROUP_TYPE_ERROR_FIELD,
                     PRIMITIVE_TYPE_ERROR_FIELD, LIST_FIELD);
         }
@@ -117,6 +128,8 @@
         GroupType listType = groupType.getType(LIST_FIELD).asGroupType();
 
         if (!listType.containsField(ELEMENT_FIELD)) {
+            LOGGER.info("Group type: {} does not contain field: {}", LogRedactionUtil.userData(listType.toString()),
+                    ELEMENT_FIELD);
             throw new HyracksDataException(ErrorCode.EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA, ELEMENT_FIELD,
                     listType.getName());
         }
@@ -147,8 +160,9 @@
 
     @Override
     public Void visit(FlatLazyVisitablePointable pointable, Type type) throws HyracksDataException {
-
         if (!type.isPrimitive()) {
+            LOGGER.info("Expected non primitive type: {} but got: {}", LogRedactionUtil.userData(type.toString()),
+                    pointable.getTypeTag());
             throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA, PRIMITIVE_TYPE_ERROR_FIELD,
                     GROUP_TYPE_ERROR_FIELD, type.getName());
         }
@@ -168,6 +182,8 @@
             AbstractLazyVisitablePointable child = rec.getChildVisitablePointable();
 
             if (!schema.containsField(columnName)) {
+                LOGGER.info("Schema: {} does not contain field: {}", LogRedactionUtil.userData(schema.toString()),
+                        LogRedactionUtil.userData(columnName));
                 throw new HyracksDataException(ErrorCode.EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA, columnName,
                         schema.getName());
             }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
index bd869e9..2278372 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
@@ -36,11 +36,15 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
 
 // This class is used to infer the schema of a record into SchemaNode, which is an internal tree representation of the schema.
 public class ParquetSchemaLazyVisitor implements ILazyVisitablePointableVisitor<Void, ParquetSchemaTree.SchemaNode> {
+    private static final Logger LOGGER = LogManager.getLogger();
     private final RecordLazyVisitablePointable rec;
     private final FieldNamesDictionary fieldNamesDictionary;
     private final static String SCHEMA_NAME = "asterix_schema";
@@ -63,6 +67,8 @@
             schemaNode.setType(new ParquetSchemaTree.RecordType());
         }
         if (!(schemaNode.getType() instanceof ParquetSchemaTree.RecordType)) {
+            LOGGER.info("Incompatible type found in record: {} and {}",
+                    LogRedactionUtil.userData(schemaNode.toString()), pointable.getTypeTag());
             throw RuntimeDataException.create(PARQUET_UNSUPPORTED_MIXED_TYPE_ARRAY);
         }
         ParquetSchemaTree.RecordType recordType = (ParquetSchemaTree.RecordType) schemaNode.getType();
@@ -88,11 +94,12 @@
         if (schemaNode.getType() == null) {
             schemaNode.setType(new ParquetSchemaTree.ListType());
         }
-        if (!(schemaNode.getType() instanceof ParquetSchemaTree.ListType)) {
+        if (!(schemaNode.getType() instanceof ParquetSchemaTree.ListType listType)) {
+            LOGGER.info("Incompatible type found in list: {} and {}" ,LogRedactionUtil.userData(schemaNode.toString()) ,pointable.getTypeTag());
             throw RuntimeDataException.create(PARQUET_UNSUPPORTED_MIXED_TYPE_ARRAY);
         }
-        ParquetSchemaTree.ListType listType = (ParquetSchemaTree.ListType) schemaNode.getType();
-        for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+        int numChildren = pointable.getNumberOfChildren();
+        for (int i = 0; i < numChildren; i++) {
             pointable.nextChild();
             AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
             if (listType.isEmpty()) {
@@ -114,11 +121,14 @@
             return null;
         }
         if (!(schemaNode.getType() instanceof ParquetSchemaTree.FlatType)) {
+            LOGGER.info("Incompatible type found: {} and {}", LogRedactionUtil.userData(schemaNode.toString()),
+                    pointable.getTypeTag());
             throw RuntimeDataException.create(PARQUET_UNSUPPORTED_MIXED_TYPE_ARRAY);
         }
         ParquetSchemaTree.FlatType flatType = (ParquetSchemaTree.FlatType) schemaNode.getType();
 
         if (!flatType.isCompatibleWith(pointable.getTypeTag())) {
+            LOGGER.info("Incompatible type found: {} and {}", flatType, pointable.getTypeTag());
             throw RuntimeDataException.create(PARQUET_UNSUPPORTED_MIXED_TYPE_ARRAY);
         }
 
@@ -136,6 +146,7 @@
 
     public static MessageType generateSchema(ParquetSchemaTree.SchemaNode schemaRoot) throws HyracksDataException {
         Types.MessageTypeBuilder builder = Types.buildMessage();
+        LOGGER.info("Building parquet schema: {}", LogRedactionUtil.userData(schemaRoot.toString()));
         if (schemaRoot.getType() == null)
             return builder.named(SCHEMA_NAME);
         for (Map.Entry<String, ParquetSchemaTree.SchemaNode> entry : ((ParquetSchemaTree.RecordType) schemaRoot
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java
index 2ca086f..dae5295 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java
@@ -28,11 +28,15 @@
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Types;
 
 public class ParquetSchemaTree {
+    private static final Logger LOGGER = LogManager.getLogger();
 
     public static class SchemaNode {
         private AbstractType type;
@@ -48,6 +52,11 @@
         public AbstractType getType() {
             return type;
         }
+
+        @Override
+        public String toString() {
+            return type == null ? "NULL" : type.toString();
+        }
     }
 
     static class RecordType extends AbstractType {
@@ -68,6 +77,17 @@
         Map<String, SchemaNode> getChildren() {
             return children;
         }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("{ ");
+            for (Map.Entry<String, SchemaNode> entry : children.entrySet()) {
+                sb.append(entry.getKey()).append(" : ").append(entry.getValue()).append(", ");
+            }
+            sb.append(" }");
+            return sb.toString();
+        }
     }
 
     abstract static class AbstractType {
@@ -82,6 +102,11 @@
             isHierarchical = AsterixParquetTypeMap.HIERARCHIAL_TYPES.containsKey(typeTag);
         }
 
+        @Override
+        public String toString() {
+            return typeTag.toString();
+        }
+
         public boolean isCompatibleWith(ATypeTag typeTag) {
             if (isHierarchical) {
                 return AsterixParquetTypeMap.HIERARCHIAL_TYPES.containsKey(typeTag);
@@ -113,6 +138,11 @@
             child = null;
         }
 
+        @Override
+        public String toString() {
+            return "[ " + child + " ]";
+        }
+
         void setChild(SchemaNode child) {
             this.child = child;
         }
@@ -129,6 +159,8 @@
     public static void buildParquetSchema(Types.Builder builder, SchemaNode schemaNode, String columnName)
             throws HyracksDataException {
         if (schemaNode.getType() == null) {
+            LOGGER.info(
+                    "Child type not set for record value with column name: " + LogRedactionUtil.userData(columnName));
             throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
         }
         AbstractType typeClass = schemaNode.getType();
@@ -154,6 +186,7 @@
         Types.BaseListBuilder<?, ?> childBuilder = getListChild(builder);
         SchemaNode child = type.child;
         if (child == null) {
+            LOGGER.info("Child type not set for list with column name: " + LogRedactionUtil.userData(columnName));
             throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
         }
         buildParquetSchema(childBuilder, child, columnName);
@@ -161,6 +194,8 @@
 
     private static void buildFlat(Types.Builder builder, FlatType type, String columnName) throws HyracksDataException {
         if (type.getPrimitiveTypeName() == null) {
+            LOGGER.info("Child primitive type not set for flat value with column name: "
+                    + LogRedactionUtil.userData(columnName));
             // Not sure if this is the right thing to do here
             throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
         }