[ASTERIXDB-3310][COMP] Enforce supported types in columnar collections

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Ensure supported types when create a dataset and when
inserting/upserting records.

Change-Id: Ie2b5e2f0ad702be8f4cbcd1e81821144a5fe3c44
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17950
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 0a0aa5c..7a06f4c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -42,6 +42,7 @@
 import org.apache.asterix.optimizer.rules.ConstantFoldingRule;
 import org.apache.asterix.optimizer.rules.CountVarToCountOneRule;
 import org.apache.asterix.optimizer.rules.DisjunctivePredicateToJoinRule;
+import org.apache.asterix.optimizer.rules.EnsureColumnarSupportedTypesRule;
 import org.apache.asterix.optimizer.rules.ExtractBatchableExternalFunctionCallsRule;
 import org.apache.asterix.optimizer.rules.ExtractDistinctByExpressionsRule;
 import org.apache.asterix.optimizer.rules.ExtractOrderExpressionsRule;
@@ -206,6 +207,8 @@
         normalization.add(new ExtractDistinctByExpressionsRule());
         normalization.add(new ExtractOrderExpressionsRule());
         normalization.add(new ExtractWindowExpressionsRule());
+        // EnsureColumnarSupportedTypesRule should go before cast rules
+        normalization.add(new EnsureColumnarSupportedTypesRule());
 
         // IntroduceStaticTypeCastRule should go before
         // IntroduceDynamicTypeCastRule to
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/EnsureColumnarSupportedTypesRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/EnsureColumnarSupportedTypesRule.java
new file mode 100644
index 0000000..8840c8f
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/EnsureColumnarSupportedTypesRule.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import org.apache.asterix.column.metadata.schema.visitor.ColumnSupportedTypesValidator;
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.types.IAType;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+/**
+ * This rule enforces that inserted or upserted records do not contain unsupported
+ * types for {@link DatasetConfig.DatasetFormat#COLUMN} datasets.
+ */
+public class EnsureColumnarSupportedTypesRule implements IAlgebraicRewriteRule {
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT || context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+
+        InsertDeleteUpsertOperator modOp = (InsertDeleteUpsertOperator) op;
+        // Do not apply the rule again to this operator
+        context.addToDontApplySet(this, op);
+
+        InsertDeleteUpsertOperator.Kind operation = modOp.getOperation();
+        if (operation != InsertDeleteUpsertOperator.Kind.INSERT
+                && operation != InsertDeleteUpsertOperator.Kind.UPSERT) {
+            return false;
+        }
+
+        DatasetConfig.DatasetFormat format = getFormat(modOp, context);
+        if (format != DatasetConfig.DatasetFormat.COLUMN) {
+            return false;
+        }
+
+        IVariableTypeEnvironment typeEnv = context.getOutputTypeEnvironment(modOp);
+        IAType type = (IAType) typeEnv.getType(modOp.getPayloadExpression().getValue());
+        SourceLocation srcLoc = modOp.getPayloadExpression().getValue().getSourceLocation();
+        ColumnSupportedTypesValidator.validate(format, type, srcLoc);
+
+        return false;
+    }
+
+    private DatasetConfig.DatasetFormat getFormat(InsertDeleteUpsertOperator modOp, IOptimizationContext context)
+            throws AlgebricksException {
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        DataSource dataSource = (DataSource) modOp.getDataSource();
+        if (dataSource == null) {
+            return null;
+        }
+        DataverseName dataverse = dataSource.getId().getDataverseName();
+        String datasetName = dataSource.getId().getDatasourceName();
+        String database = dataSource.getId().getDatabaseName();
+        Dataset dataset = metadataProvider.findDataset(database, dataverse, datasetName);
+        if (dataset != null) {
+            return dataset.getDatasetFormatInfo().getFormat();
+        }
+
+        return null;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 74f5305..2f8b391 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -64,6 +64,7 @@
 import org.apache.asterix.app.result.fields.ResultHandlePrinter;
 import org.apache.asterix.app.result.fields.ResultsPrinter;
 import org.apache.asterix.app.result.fields.StatusPrinter;
+import org.apache.asterix.column.metadata.schema.visitor.ColumnSupportedTypesValidator;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IMetadataLockManager;
@@ -72,6 +73,7 @@
 import org.apache.asterix.common.api.IResponsePrinter;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalTxManager;
+import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.DatasetConfig.TransactionState;
@@ -915,9 +917,14 @@
                     throw new CompilationException(ErrorCode.DATASET_EXISTS, sourceLoc, datasetName, dataverseName);
                 }
             }
+            List<TypeExpression> partitioningExprTypes = null;
+            if (dsType == DatasetType.INTERNAL) {
+                partitioningExprTypes = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprTypes();
+            }
 
-            Pair<Datatype, Boolean> itemTypePair = fetchDatasetItemType(mdTxnCtx, dsType, itemTypeDatabaseName,
-                    itemTypeDataverseName, itemTypeName, itemTypeExpr, false, metadataProvider, sourceLoc);
+            Pair<Datatype, Boolean> itemTypePair = fetchDatasetItemType(mdTxnCtx, dsType, datasetFormatInfo.getFormat(),
+                    partitioningExprTypes, itemTypeDatabaseName, itemTypeDataverseName, itemTypeName, itemTypeExpr,
+                    false, metadataProvider, sourceLoc);
             itemTypeEntity = itemTypePair.first;
             IAType itemType = itemTypeEntity.getDatatype();
             boolean itemTypeIsInline = itemTypePair.second;
@@ -938,9 +945,10 @@
             switch (dsType) {
                 case INTERNAL:
                     if (metaItemTypeExpr != null) {
-                        Pair<Datatype, Boolean> metaItemTypePair = fetchDatasetItemType(mdTxnCtx, dsType,
-                                metaItemTypeDatabaseName, metaItemTypeDataverseName, metaItemTypeName, metaItemTypeExpr,
-                                true, metadataProvider, sourceLoc);
+                        Pair<Datatype, Boolean> metaItemTypePair =
+                                fetchDatasetItemType(mdTxnCtx, dsType, datasetFormatInfo.getFormat(),
+                                        partitioningExprTypes, metaItemTypeDatabaseName, metaItemTypeDataverseName,
+                                        metaItemTypeName, metaItemTypeExpr, true, metadataProvider, sourceLoc);
                         metaItemTypeEntity = metaItemTypePair.first;
                         metaItemType = metaItemTypeEntity.getDatatype();
                         metaItemTypeIsInline = metaItemTypePair.second;
@@ -949,8 +957,7 @@
 
                     List<List<String>> partitioningExprs =
                             ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
-                    List<TypeExpression> partitioningExprTypes =
-                            ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprTypes();
+
                     List<Integer> keySourceIndicators =
                             ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getKeySourceIndicators();
                     boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated();
@@ -1130,9 +1137,10 @@
     }
 
     protected Pair<Datatype, Boolean> fetchDatasetItemType(MetadataTransactionContext mdTxnCtx, DatasetType datasetType,
-            String itemTypeDatabaseName, DataverseName itemTypeDataverseName, String itemTypeName,
-            TypeExpression itemTypeExpr, boolean isMetaItemType, MetadataProvider metadataProvider,
-            SourceLocation sourceLoc) throws AlgebricksException {
+            DatasetConfig.DatasetFormat format, List<TypeExpression> partitioningExprTypes, String itemTypeDatabaseName,
+            DataverseName itemTypeDataverseName, String itemTypeName, TypeExpression itemTypeExpr,
+            boolean isMetaItemType, MetadataProvider metadataProvider, SourceLocation sourceLoc)
+            throws AlgebricksException {
         switch (itemTypeExpr.getTypeKind()) {
             case TYPEREFERENCE:
                 Datatype itemTypeEntity =
@@ -1143,12 +1151,14 @@
                             DatasetUtil.getFullyQualifiedDisplayName(itemTypeDataverseName, itemTypeName));
                 }
                 IAType itemType = itemTypeEntity.getDatatype();
-                validateDatasetItemType(datasetType, itemType, isMetaItemType, sourceLoc);
+                validateDatasetItemType(datasetType, format, partitioningExprTypes, itemType, isMetaItemType,
+                        sourceLoc);
                 return new Pair<>(itemTypeEntity, false);
             case RECORD:
                 itemType = translateType(itemTypeDatabaseName, itemTypeDataverseName, itemTypeName, itemTypeExpr,
                         mdTxnCtx);
-                validateDatasetItemType(datasetType, itemType, isMetaItemType, sourceLoc);
+                validateDatasetItemType(datasetType, format, partitioningExprTypes, itemType, isMetaItemType,
+                        sourceLoc);
                 itemTypeEntity =
                         new Datatype(itemTypeDatabaseName, itemTypeDataverseName, itemTypeName, itemType, true);
                 return new Pair<>(itemTypeEntity, true);
@@ -1158,7 +1168,8 @@
         }
     }
 
-    protected void validateDatasetItemType(DatasetType datasetType, IAType itemType, boolean isMetaItemType,
+    protected void validateDatasetItemType(DatasetType datasetType, DatasetConfig.DatasetFormat format,
+            List<TypeExpression> partitioningExprTypes, IAType itemType, boolean isMetaItemType,
             SourceLocation sourceLoc) throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.OBJECT) {
             throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -1169,6 +1180,21 @@
         if (datasetType == DatasetType.VIEW) {
             ViewUtil.validateViewItemType((ARecordType) itemType, sourceLoc);
         }
+
+        // Validate columnar type
+        if (datasetType == DatasetType.INTERNAL) {
+            ColumnSupportedTypesValidator.validate(format, itemType, sourceLoc);
+            if (partitioningExprTypes != null) {
+                for (TypeExpression typeExpr : partitioningExprTypes) {
+                    String typeName = ((TypeReferenceExpression) typeExpr).getIdent().second.getValue();
+                    IAType type = BuiltinTypeMap.getBuiltinType(typeName);
+                    if (type != null) {
+                        // type will be validated next
+                        ColumnSupportedTypesValidator.validate(format, type, sourceLoc);
+                    }
+                }
+            }
+        }
     }
 
     protected Map<String, String> createExternalDatasetProperties(String databaseName, DataverseName dataverseName,
@@ -2780,9 +2806,9 @@
             List<ViewDetails.ForeignKey> foreignKeys = null;
             String datetimeFormat = null, dateFormat = null, timeFormat = null;
             if (cvs.hasItemType()) {
-                Pair<Datatype, Boolean> itemTypePair =
-                        fetchDatasetItemType(mdTxnCtx, DatasetType.VIEW, itemTypeDatabaseName, itemTypeDataverseName,
-                                itemTypeName, cvs.getItemType(), false, metadataProvider, sourceLoc);
+                Pair<Datatype, Boolean> itemTypePair = fetchDatasetItemType(mdTxnCtx, DatasetType.VIEW,
+                        DatasetConfig.DatasetFormat.ROW, null, itemTypeDatabaseName, itemTypeDataverseName,
+                        itemTypeName, cvs.getItemType(), false, metadataProvider, sourceLoc);
                 itemTypeEntity = itemTypePair.first;
                 itemTypeIsInline = itemTypePair.second;
                 ARecordType itemType = (ARecordType) itemTypeEntity.getDatatype();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.001.ddl.sqlpp
new file mode 100644
index 0000000..a938e7e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.001.ddl.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE InvalidType AS {
+    uid: uuid,
+    birthdate: datetime,
+    created_time: time,
+    created_date: date,
+    online_since: duration
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.002.ddl.sqlpp
new file mode 100644
index 0000000..6544da6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.002.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+CREATE DATASET ColumnUnsupportedType
+PRIMARY KEY (birthdate: datetime) WITH {
+    "storage-format": {"format": "column"}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.003.ddl.sqlpp
new file mode 100644
index 0000000..93df9b4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.003.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+CREATE DATASET ColumnUnsupportedType(InvalidType)
+PRIMARY KEY uid WITH {
+    "storage-format": {"format": "column"}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.004.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.004.ddl.sqlpp
new file mode 100644
index 0000000..c3a6c7e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.004.ddl.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+CREATE DATASET ColumnUnsupportedType
+PRIMARY KEY (uid: uuid) AUTOGENERATED WITH {
+    "storage-format": {"format": "column"}
+};
+
+CREATE DATASET RowDataset
+PRIMARY KEY (birthdate: datetime) WITH {
+    "storage-format": {"format": "row"}
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.005.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.005.update.sqlpp
new file mode 100644
index 0000000..9a9428c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.005.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+INSERT INTO ColumnUnsupportedType (
+    {"birthdate": datetime("2022-01-01T00:00:00")}
+)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.006.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.006.update.sqlpp
new file mode 100644
index 0000000..2595748
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/supported-types/supported-types.006.update.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+INSERT INTO ColumnUnsupportedType (
+    SELECT VALUE r
+    FROM RowDataset r
+)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index aa72489..0b65d93 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -16438,6 +16438,16 @@
         <output-dir compare="Text">metadata</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="column">
+      <compilation-unit name="supported-types">
+        <output-dir compare="Text">supported-types</output-dir>
+        <expected-error>ASX0067: Type(s) '[datetime]' are not supported in columnar storage format. Supported types are [bigint, double, string, boolean, uuid]</expected-error>
+        <expected-error>ASX0067: Type(s) '[datetime, date, time, duration]' are not supported in columnar storage format. Supported types are [bigint, double, string, boolean, uuid]</expected-error>
+        <expected-error>ASX0067: Type(s) '[datetime]' are not supported in columnar storage format. Supported types are [bigint, double, string, boolean, uuid]</expected-error>
+        <expected-error>ASX0067: Type(s) '[datetime]' are not supported in columnar storage format. Supported types are [bigint, double, string, boolean, uuid]</expected-error>
+        <source-location>false</source-location>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="copy-from">
     <test-case FilePath="copy-from">
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
index eba5ac0..3cacb8a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.metadata.schema;
 
+import static org.apache.asterix.column.util.ColumnValuesUtil.getNormalizedTypeTag;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -69,7 +71,7 @@
 
     public AbstractSchemaNode getOrCreateChild(ATypeTag childTypeTag, FlushColumnMetadata columnMetadata)
             throws HyracksDataException {
-        ATypeTag normalizedTypeTag = FlushColumnMetadata.getNormalizedTypeTag(childTypeTag);
+        ATypeTag normalizedTypeTag = getNormalizedTypeTag(childTypeTag);
         AbstractSchemaNode currentChild = children.get(normalizedTypeTag);
         //The parent of a union child should be the actual parent
         AbstractSchemaNode newChild = columnMetadata.getOrCreateChild(currentChild, normalizedTypeTag);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/ColumnSupportedTypesValidator.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/ColumnSupportedTypesValidator.java
new file mode 100644
index 0000000..5b27a74
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/ColumnSupportedTypesValidator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.visitor;
+
+import static org.apache.asterix.column.util.ColumnValuesUtil.getNormalizedTypeTag;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.column.values.reader.ColumnValueReaderFactory;
+import org.apache.asterix.column.values.writer.ColumnValuesWriterFactory;
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.IATypeVisitor;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+/**
+ * Validates supported types for datasets with {@link DatasetConfig.DatasetFormat#COLUMN} format
+ *
+ * @see ColumnValuesWriterFactory
+ * @see ColumnValueReaderFactory
+ */
+public class ColumnSupportedTypesValidator implements IATypeVisitor<Void, Set<ATypeTag>> {
+    private static final Set<ATypeTag> SUPPORTED_PRIMITIVE_TYPES =
+            Set.of(ATypeTag.BOOLEAN, ATypeTag.BIGINT, ATypeTag.DOUBLE, ATypeTag.STRING, ATypeTag.UUID);
+    private static final String SUPPORTED_TYPES_STRING =
+            SUPPORTED_PRIMITIVE_TYPES.stream().sorted().collect(Collectors.toList()).toString();
+    private static final ColumnSupportedTypesValidator VALIDATOR = new ColumnSupportedTypesValidator();
+
+    private ColumnSupportedTypesValidator() {
+    }
+
+    /**
+     * Ensure dataset format nested type includes only supported types
+     *
+     * @param format dataset format
+     * @param type   to check
+     * @throws CompilationException if an unsupported type is encountered
+     */
+    public static void validate(DatasetConfig.DatasetFormat format, IAType type) throws CompilationException {
+        validate(format, type, null);
+    }
+
+    /**
+     * Ensure dataset format nested type includes only supported types
+     *
+     * @param format         dataset format
+     * @param type           to check
+     * @param sourceLocation source location (if any)
+     * @throws CompilationException if an unsupported type is encountered
+     */
+    public static void validate(DatasetConfig.DatasetFormat format, IAType type, SourceLocation sourceLocation)
+            throws CompilationException {
+        if (format != DatasetConfig.DatasetFormat.COLUMN) {
+            return;
+        }
+
+        Set<ATypeTag> unsupportedTypes = new HashSet<>();
+        type.accept(VALIDATOR, unsupportedTypes);
+        if (!unsupportedTypes.isEmpty()) {
+            String unsupportedList = unsupportedTypes.stream().sorted().collect(Collectors.toList()).toString();
+            throw CompilationException.create(ErrorCode.UNSUPPORTED_COLUMN_TYPE, sourceLocation, unsupportedList,
+                    SUPPORTED_TYPES_STRING);
+        }
+    }
+
+    @Override
+    public Void visit(ARecordType recordType, Set<ATypeTag> arg) {
+        for (IAType fieldType : recordType.getFieldTypes()) {
+            fieldType.accept(this, arg);
+        }
+
+        return null;
+    }
+
+    @Override
+    public Void visit(AbstractCollectionType collectionType, Set<ATypeTag> arg) {
+        return collectionType.getItemType().accept(this, arg);
+    }
+
+    @Override
+    public Void visit(AUnionType unionType, Set<ATypeTag> arg) {
+        for (IAType fieldType : unionType.getUnionList()) {
+            fieldType.accept(this, arg);
+        }
+
+        return null;
+    }
+
+    @Override
+    public Void visitFlat(IAType flatType, Set<ATypeTag> arg) {
+        ATypeTag typeTag = getNormalizedTypeTag(flatType.getTypeTag());
+        // Allow ANY
+        if (typeTag != ATypeTag.ANY && typeTag != ATypeTag.NULL && typeTag != ATypeTag.MISSING
+                && !SUPPORTED_PRIMITIVE_TYPES.contains(typeTag)) {
+            arg.add(typeTag);
+        }
+
+        return null;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index 0d7404d..e89a120 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.lsm.flush;
 
+import static org.apache.asterix.column.util.ColumnValuesUtil.getNormalizedTypeTag;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -563,19 +565,6 @@
         }
     }
 
-    public static ATypeTag getNormalizedTypeTag(ATypeTag typeTag) {
-        switch (typeTag) {
-            case TINYINT:
-            case SMALLINT:
-            case INTEGER:
-                return ATypeTag.BIGINT;
-            case FLOAT:
-                return ATypeTag.DOUBLE;
-            default:
-                return typeTag;
-        }
-    }
-
     public void close() {
         //Dereference multiPageOp
         multiPageOpRef.setValue(null);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/ColumnValuesUtil.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/ColumnValuesUtil.java
index 0ecdeef..3094abe 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/ColumnValuesUtil.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/ColumnValuesUtil.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.util;
 
+import org.apache.asterix.om.types.ATypeTag;
+
 public class ColumnValuesUtil {
     private ColumnValuesUtil() {
     }
@@ -45,4 +47,17 @@
     public static int clearNullBit(int nullBitMask, int level) {
         return (nullBitMask - 1) & level;
     }
+
+    public static ATypeTag getNormalizedTypeTag(ATypeTag typeTag) {
+        switch (typeTag) {
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+                return ATypeTag.BIGINT;
+            case FLOAT:
+                return ATypeTag.DOUBLE;
+            default:
+                return typeTag;
+        }
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 97b8060..a49ca55 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -94,6 +94,7 @@
     NON_STRING_WRITE_PATH(64),
     WRITE_PATH_LENGTH_EXCEEDS_MAX_LENGTH(65),
     TYPE_MISMATCH_EXTRA_FIELD(66),
+    UNSUPPORTED_COLUMN_TYPE(67),
 
     UNSUPPORTED_JRE(100),
 
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 0e80720..f97b7b6 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -101,6 +101,7 @@
 64 = Path expression produced a value of type '%1$s'. Path must be of type string
 65 = Length of the file path '%1$s' exceeds the maximum length of '%2$s bytes' allowed in %3$s
 66 = Type mismatch: including an extra field %1$s
+67 = Type(s) '%1$s' are not supported in columnar storage format. Supported types are %2$s
 
 100 = Unsupported JRE: %1$s