[ASTERIXDB-3483]: Owner changes for Dataset and View

- user model changes: no
- storage format changes: no
- interface changes: yes

details:
- Add 'Creator' nested open fields in the dataset entity.
- Extend methods to pass the creator.
- This change is backward compatible.

Ext-ref: MB-62971

Change-Id: I4b7a586b8d839ea812c36b464d562d80fddc25f4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18686
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Janhavi Tripurwar <janhavi.tripurwar@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index bf07018..02f1085 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -394,7 +394,8 @@
                                 Creator.DEFAULT_CREATOR);
                         break;
                     case DATASET_DECL:
-                        handleCreateDatasetStatement(metadataProvider, stmt, hcc, requestParameters);
+                        handleCreateDatasetStatement(metadataProvider, stmt, hcc, requestParameters,
+                                Creator.DEFAULT_CREATOR);
                         break;
                     case CREATE_INDEX:
                         handleCreateIndexStatement(metadataProvider, stmt, hcc, requestParameters);
@@ -460,7 +461,8 @@
                         handleDropSynonymStatement(metadataProvider, stmt, requestParameters);
                         break;
                     case CREATE_VIEW:
-                        handleCreateViewStatement(metadataProvider, stmt, stmtRewriter, requestParameters);
+                        handleCreateViewStatement(metadataProvider, stmt, stmtRewriter, requestParameters,
+                                Creator.DEFAULT_CREATOR);
                         break;
                     case VIEW_DROP:
                         handleViewDropStatement(metadataProvider, stmt, requestParameters);
@@ -693,7 +695,7 @@
                 }
             }
 
-            beforeTxnCommit(mdProvider, creator, databaseName, null, null, EntityDetails.EntityType.DATABASE);
+            beforeTxnCommit(mdProvider, creator, EntityDetails.newDatabase(databaseName));
             MetadataManager.INSTANCE.addDatabase(mdTxnCtx,
                     new Database(databaseName, false, MetadataUtil.PENDING_NO_OP, creator));
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -748,7 +750,7 @@
                             dvName);
                 }
             }
-            beforeTxnCommit(metadataProvider, creator, dbName, dvName, null, EntityDetails.EntityType.DATAVERSE);
+            beforeTxnCommit(metadataProvider, creator, EntityDetails.newDataverse(dbName, dvName));
             MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dbName,
                     dvName, stmtCreateDataverse.getFormat(), MetadataUtil.PENDING_NO_OP, creator));
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -797,7 +799,7 @@
     }
 
     public void handleCreateDatasetStatement(MetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
+            IHyracksClientConnection hcc, IRequestParameters requestParameters, Creator creator) throws Exception {
         DatasetDecl dd = (DatasetDecl) stmt;
         String datasetName = dd.getName().getValue();
         metadataProvider.validateDatabaseObjectName(dd.getNamespace(), datasetName, stmt.getSourceLocation());
@@ -847,7 +849,7 @@
         try {
             doCreateDatasetStatement(metadataProvider, dd, stmtActiveNamespace, datasetName, itemTypeNamespace,
                     itemTypeExpr, itemTypeName, metaItemTypeExpr, metaItemTypeNamespace, metaItemTypeName, hcc,
-                    requestParameters);
+                    requestParameters, creator);
             if (dd.getQuery() != null) {
                 final IResultSet resultSet = requestParameters.getResultSet();
                 final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
@@ -869,8 +871,8 @@
     protected Optional<? extends Dataset> doCreateDatasetStatement(MetadataProvider metadataProvider, DatasetDecl dd,
             Namespace namespace, String datasetName, Namespace itemTypeNamespace, TypeExpression itemTypeExpr,
             String itemTypeName, TypeExpression metaItemTypeExpr, Namespace metaItemTypeNamespace,
-            String metaItemTypeName, IHyracksClientConnection hcc, IRequestParameters requestParameters)
-            throws Exception {
+            String metaItemTypeName, IHyracksClientConnection hcc, IRequestParameters requestParameters,
+            Creator creator) throws Exception {
         DataverseName dataverseName = namespace.getDataverseName();
         String databaseName = namespace.getDatabaseName();
 
@@ -1043,7 +1045,7 @@
             dataset = (Dataset) createDataset(dd, databaseName, dataverseName, datasetName, itemTypeDatabaseName,
                     itemTypeDataverseName, itemTypeName, metaItemTypeDatabaseName, metaItemTypeDataverseName,
                     metaItemTypeName, dsType, compactionPolicy, compactionPolicyProperties, compressionScheme,
-                    datasetFormatInfo, datasetDetails, ngName);
+                    datasetFormatInfo, datasetDetails, ngName, creator);
             MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
 
             if (itemTypeIsInline) {
@@ -1077,6 +1079,8 @@
                     datasetName, requestParameters.isForceDropDataset());
             dataset.setPendingOp(MetadataUtil.PENDING_NO_OP);
             MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
+            beforeTxnCommit(metadataProvider, creator,
+                    EntityDetails.newDataset(databaseName, dataverseName, datasetName));
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -1144,11 +1148,11 @@
             String metaItemTypeDatabase, DataverseName metaItemTypeDataverseName, String metaItemTypeName,
             DatasetType dsType, String compactionPolicy, Map<String, String> compactionPolicyProperties,
             String compressionScheme, DatasetFormatInfo datasetFormatInfo, IDatasetDetails datasetDetails,
-            String ngName) throws AlgebricksException {
+            String ngName, Creator creator) throws AlgebricksException {
         return new Dataset(database, dataverseName, datasetName, itemTypeDatabase, itemTypeDataverseName, itemTypeName,
                 metaItemTypeDatabase, metaItemTypeDataverseName, metaItemTypeName, ngName, compactionPolicy,
                 compactionPolicyProperties, datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(),
-                MetadataUtil.PENDING_ADD_OP, compressionScheme, datasetFormatInfo);
+                MetadataUtil.PENDING_ADD_OP, compressionScheme, datasetFormatInfo, creator);
     }
 
     protected Triple<Namespace, String, Boolean> extractDatasetItemTypeName(Namespace datasetNamespace,
@@ -2758,7 +2762,7 @@
     }
 
     public void handleCreateViewStatement(MetadataProvider metadataProvider, Statement stmt,
-            IStatementRewriter stmtRewriter, IRequestParameters requestParameters) throws Exception {
+            IStatementRewriter stmtRewriter, IRequestParameters requestParameters, Creator creator) throws Exception {
         CreateViewStatement cvs = (CreateViewStatement) stmt;
         String viewName = cvs.getViewName();
         metadataProvider.validateDatabaseObjectName(cvs.getNamespace(), viewName, stmt.getSourceLocation());
@@ -2792,7 +2796,7 @@
                 null, false, null, null, true, DatasetType.VIEW, null, metadataProvider);
         try {
             doCreateView(metadataProvider, cvs, databaseName, dataverseName, viewName, itemTypeDatabaseName,
-                    viewItemTypeDataverseName, viewItemTypeName, stmtRewriter, requestParameters);
+                    viewItemTypeDataverseName, viewItemTypeName, stmtRewriter, requestParameters, creator);
         } finally {
             metadataProvider.getLocks().unlock();
             metadataProvider.setDefaultNamespace(activeNamespace);
@@ -2802,7 +2806,7 @@
     protected CreateResult doCreateView(MetadataProvider metadataProvider, CreateViewStatement cvs, String databaseName,
             DataverseName dataverseName, String viewName, String itemTypeDatabaseName,
             DataverseName itemTypeDataverseName, String itemTypeName, IStatementRewriter stmtRewriter,
-            IRequestParameters requestParameters) throws Exception {
+            IRequestParameters requestParameters, Creator creator) throws Exception {
         SourceLocation sourceLoc = cvs.getSourceLocation();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2968,10 +2972,10 @@
             ViewDetails viewDetails = new ViewDetails(cvs.getViewBody(), dependencies, cvs.getDefaultNull(),
                     primaryKeyFields, foreignKeys, datetimeFormat, dateFormat, timeFormat);
 
-            Dataset view =
-                    new Dataset(databaseName, dataverseName, viewName, itemTypeDatabaseName, itemTypeDataverseName,
-                            itemTypeName, MetadataConstants.METADATA_NODEGROUP_NAME, "", Collections.emptyMap(),
-                            viewDetails, Collections.emptyMap(), DatasetType.VIEW, 0, MetadataUtil.PENDING_NO_OP);
+            Dataset view = new Dataset(databaseName, dataverseName, viewName, itemTypeDatabaseName,
+                    itemTypeDataverseName, itemTypeName, MetadataConstants.METADATA_NODEGROUP_NAME, "",
+                    Collections.emptyMap(), viewDetails, Collections.emptyMap(), DatasetType.VIEW, 0,
+                    MetadataUtil.PENDING_NO_OP, creator);
             if (existingDataset == null) {
                 if (itemTypeIsInline) {
                     MetadataManager.INSTANCE.addDatatype(mdTxnCtx, itemTypeEntity);
@@ -2983,6 +2987,7 @@
                 }
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, view);
             }
+            beforeTxnCommit(metadataProvider, creator, EntityDetails.newView(databaseName, dataverseName, viewName));
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             return existingDataset != null ? CreateResult.REPLACED : CreateResult.CREATED;
         } catch (Exception e) {
@@ -5480,6 +5485,7 @@
                             participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
                 }
             }
+
             jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest);
             if (jId != null) {
                 jId.setValue(jobId);
@@ -5824,8 +5830,8 @@
         }
     }
 
-    protected void beforeTxnCommit(MetadataProvider metadataProvider, Creator creator, String databaseName,
-            DataverseName dataverseName, String objectName, Object objectType) throws AlgebricksException {
+    protected void beforeTxnCommit(MetadataProvider metadataProvider, Creator creator, EntityDetails entityDetails)
+            throws AlgebricksException {
         //no op
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 841e81b..460565c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -114,9 +114,9 @@
         handler = new ActiveNotificationHandler();
         allDatasets = new ArrayList<>();
         firstDataset = new Dataset(database, dataverseName, "firstDataset", recordTypeDatabaseName, null, null, null,
-                null, null, null, null, null, 0, 0);
+                null, null, null, null, null, 0, 0, null);
         secondDataset = new Dataset(database, dataverseName, "secondDataset", recordTypeDatabaseName, null, null, null,
-                null, null, null, null, null, 0, 0);
+                null, null, null, null, null, 0, 0, null);
         allDatasets.add(firstDataset);
         allDatasets.add(secondDataset);
         AtomicInteger threadCounter = new AtomicInteger(0);
@@ -992,7 +992,7 @@
                 new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING));
         recoveringSubscriber.sync();
         Dataset newDataset = new Dataset(database, dataverseName, "newDataset", recordTypeDatabaseName, null, null,
-                null, null, null, null, null, null, 0, 0);
+                null, null, null, null, null, null, 0, 0, null);
         Action add = users[1].addDataset(newDataset, listener);
         listener.allowStep();
         runningSubscriber.sync();
@@ -1019,7 +1019,7 @@
         recoveringSubscriber.sync();
         tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
         Dataset newDataset = new Dataset(database, dataverseName, "newDataset", recordTypeDatabaseName, null, null,
-                null, null, null, null, null, null, 0, 0);
+                null, null, null, null, null, null, 0, 0, null);
         Action add = users[1].addDataset(newDataset, listener);
         listener.allowStep();
         tempFailSubscriber.sync();
@@ -1046,7 +1046,7 @@
         recoveringSubscriber.sync();
         tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
         Dataset newDataset = new Dataset(database, dataverseName, "newDataset", recordTypeDatabaseName, null, null,
-                null, null, null, null, null, null, 0, 0);
+                null, null, null, null, null, null, 0, 0, null);
         Action add = users[1].addDataset(newDataset, listener);
         listener.allowStep();
         tempFailSubscriber.sync();
@@ -1065,7 +1065,7 @@
                 new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.STARTING));
         subscriber.sync();
         Dataset newDataset = new Dataset(database, dataverseName, "newDataset", recordTypeDatabaseName, null, null,
-                null, null, null, null, null, null, 0, 0);
+                null, null, null, null, null, null, 0, 0, null);
         Action createDatasetAction = users[1].addDataset(newDataset, listener);
         listener.allowStep();
         startAction.sync();
@@ -1081,7 +1081,7 @@
     public void testCreateNewDatasetWhileRunning() throws Exception {
         testStartWhenStartSucceed();
         Dataset newDataset = new Dataset(database, dataverseName, "newDataset", recordTypeDatabaseName, null, null,
-                null, null, null, null, null, null, 0, 0);
+                null, null, null, null, null, null, 0, 0, null);
         Action createDatasetAction = users[1].addDataset(newDataset, listener);
         createDatasetAction.sync();
         assertFailure(createDatasetAction, ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY);
@@ -1101,7 +1101,7 @@
                 new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.SUSPENDING, ActivityState.SUSPENDED));
         subscriber.sync();
         Dataset newDataset = new Dataset(database, dataverseName, "newDataset", recordTypeDatabaseName, null, null,
-                null, null, null, null, null, null, 0, 0);
+                null, null, null, null, null, null, 0, 0, null);
         Action createDatasetAction = users[0].addDataset(newDataset, listener);
         listener.allowStep();
         listener.allowStep();
@@ -1120,7 +1120,7 @@
         testRecoveryFailureAfterOneAttemptCompilationFailure();
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
         Dataset newDataset = new Dataset(database, dataverseName, "newDataset", recordTypeDatabaseName, null, null,
-                null, null, null, null, null, null, 0, 0);
+                null, null, null, null, null, null, 0, 0, null);
         Action createDatasetAction = users[0].addDataset(newDataset, listener);
         createDatasetAction.sync();
         assertSuccess(createDatasetAction);
@@ -1553,7 +1553,7 @@
         query.sync();
         assertSuccess(query);
         Dataset newDataset = new Dataset(database, dataverseName, "newDataset", recordTypeDatabaseName, null, null,
-                null, null, null, null, null, null, 0, 0);
+                null, null, null, null, null, null, 0, 0, null);
         Action addDataset = users[1].addDataset(newDataset, listener);
         // blocked by suspension
         Assert.assertFalse(addDataset.isDone());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index 210440d..b268b19 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -28,6 +28,7 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.Creator;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
@@ -54,7 +55,7 @@
         super(MetadataUtil.databaseFor(dataverseName), dataverseName, datasetName,
                 MetadataUtil.databaseFor(recordTypeDataverseName), recordTypeDataverseName, recordTypeName,
                 nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType,
-                datasetId, pendingOp);
+                datasetId, pendingOp, Creator.DEFAULT_CREATOR);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index 0b66676..0d3102a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -325,7 +325,7 @@
         Dataset dataset = new Dataset(source.getDatabaseName(), source.getDataverseName(), "ds_" + datasetPostfix,
                 source.getDatabaseName(), source.getDataverseName(), source.getDatasetType().name(),
                 source.getNodeGroupName(), NoMergePolicyFactory.NAME, null, source.getDatasetDetails(),
-                source.getHints(), DatasetConfig.DatasetType.INTERNAL, datasetPostfix, 0);
+                source.getHints(), DatasetConfig.DatasetType.INTERNAL, datasetPostfix, 0, source.getCreator());
         MetadataProvider metadataProvider = MetadataProvider.createWithDefaultNamespace(appCtx);
         final MetadataTransactionContext writeTxn = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(writeTxn);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
index 8d9a5f5..297f565 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -41,6 +41,7 @@
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.utils.Creator;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
@@ -114,7 +115,7 @@
                     NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null,
                     new InternalDatasetDetails(null, InternalDatasetDetails.PartitioningStrategy.HASH, partitioningKeys,
                             null, null, null, false, null, null),
-                    null, DatasetConfig.DatasetType.INTERNAL, DATASET_ID, 0);
+                    null, DatasetConfig.DatasetType.INTERNAL, DATASET_ID, 0, Creator.DEFAULT_CREATOR);
             // create dataset
             TestNodeController.PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE,
                     META_TYPE, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java
index c9f8640..eaf9cce 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java
@@ -263,12 +263,17 @@
                             Triple<DatasetFullyQualifiedName, Boolean, DatasetFullyQualifiedName> dsArgs =
                                     FunctionUtil.parseDatasetFunctionArguments(functionCall);
                             DatasetFullyQualifiedName datasetFullyQualifiedName = dsArgs.first;
-                            EntityDetails.EntityType entityType =
-                                    dsArgs.second ? EntityDetails.EntityType.VIEW : EntityDetails.EntityType.DATASET;
-                            metadataProvider
-                                    .addAccessedEntity(new EntityDetails(datasetFullyQualifiedName.getDatabaseName(),
-                                            datasetFullyQualifiedName.getDataverseName(),
-                                            datasetFullyQualifiedName.getDatasetName(), entityType));
+                            if (dsArgs.second) {
+                                metadataProvider.addAccessedEntity(
+                                        EntityDetails.newView(datasetFullyQualifiedName.getDatabaseName(),
+                                                datasetFullyQualifiedName.getDataverseName(),
+                                                datasetFullyQualifiedName.getDatasetName()));
+                            } else {
+                                metadataProvider.addAccessedEntity(
+                                        EntityDetails.newDataset(datasetFullyQualifiedName.getDatabaseName(),
+                                                datasetFullyQualifiedName.getDataverseName(),
+                                                datasetFullyQualifiedName.getDatasetName()));
+                            }
                             DatasetFullyQualifiedName synonymReference = dsArgs.third;
                             if (synonymReference != null) {
                                 // resolved via synonym -> store synonym name as a dependency
@@ -290,8 +295,8 @@
                     } else {
                         if (seenFunctions.add(signature)) {
                             String functionName = signature.getName() + "(" + signature.getArity() + ")";
-                            metadataProvider.addAccessedEntity(new EntityDetails(signature.getDatabaseName(),
-                                    signature.getDataverseName(), functionName, EntityDetails.EntityType.FUNCTION));
+                            metadataProvider.addAccessedEntity(EntityDetails.newFunction(signature.getDatabaseName(),
+                                    signature.getDataverseName(), functionName));
                             outFunctionDependencies.add(new DependencyFullyQualifiedName(signature.getDatabaseName(),
                                     signature.getDataverseName(), signature.getName(),
                                     Integer.toString(signature.getArity())));
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppLoadAccessedDataset.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppLoadAccessedDataset.java
index 2f3323f..60a7f28 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppLoadAccessedDataset.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppLoadAccessedDataset.java
@@ -84,8 +84,13 @@
                 }
             }
 
-            context.getMetadataProvider()
-                    .addAccessedEntity(new EntityDetails(databaseName, dataverseName, datasetName, entityType));
+            if (entityType == EntityDetails.EntityType.VIEW) {
+                context.getMetadataProvider()
+                        .addAccessedEntity(EntityDetails.newView(databaseName, dataverseName, datasetName));
+            } else {
+                context.getMetadataProvider()
+                        .addAccessedEntity(EntityDetails.newDataset(databaseName, dataverseName, datasetName));
+            }
 
         } else {
             FunctionSignature signature = expression.getFunctionSignature();
@@ -94,8 +99,8 @@
                 return;
             }
             String functionName = signature.getName() + "(" + signature.getArity() + ")";
-            context.getMetadataProvider().addAccessedEntity(new EntityDetails(signature.getDatabaseName(),
-                    signature.getDataverseName(), functionName, EntityDetails.EntityType.FUNCTION));
+            context.getMetadataProvider().addAccessedEntity(
+                    EntityDetails.newFunction(signature.getDatabaseName(), signature.getDataverseName(), functionName));
         }
     }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index 1901798..7ec1b51 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -165,7 +165,7 @@
 
     public void dropDataset(String database, DataverseName dataverseName, String datasetName) {
         Dataset dataset = new Dataset(database, dataverseName, datasetName, null, null, null, null, null, null, null,
-                null, null, -1, MetadataUtil.PENDING_NO_OP);
+                null, null, -1, MetadataUtil.PENDING_NO_OP, null);
         droppedCache.addDatasetIfNotExists(dataset);
         logAndApply(new MetadataLogicalOperation(dataset, false));
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 71458f9..901ffdf 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -71,6 +71,7 @@
 import org.apache.asterix.metadata.entities.Node;
 import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.metadata.utils.Creator;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.BuiltinTypeMap;
 import org.apache.asterix.om.types.IAType;
@@ -221,13 +222,12 @@
             IDatasetDetails id = new InternalDatasetDetails(FileStructure.BTREE, PartitioningStrategy.HASH,
                     indexes[i].getPartitioningExpr(), indexes[i].getPartitioningExpr(), null,
                     indexes[i].getPartitioningExprType(), false, null, null);
-            MetadataManager.INSTANCE.addDataset(mdTxnCtx,
-                    new Dataset(indexes[i].getDatabaseName(), indexes[i].getDataverseName(),
-                            indexes[i].getIndexedDatasetName(), indexes[i].getDatabaseName(),
-                            indexes[i].getDataverseName(), indexes[i].getPayloadRecordType().getTypeName(),
-                            indexes[i].getNodeGroupName(), StorageConstants.DEFAULT_COMPACTION_POLICY_NAME,
-                            StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES, id, new HashMap<>(),
-                            DatasetType.INTERNAL, indexes[i].getDatasetId().getId(), MetadataUtil.PENDING_NO_OP));
+            MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(indexes[i].getDatabaseName(),
+                    indexes[i].getDataverseName(), indexes[i].getIndexedDatasetName(), indexes[i].getDatabaseName(),
+                    indexes[i].getDataverseName(), indexes[i].getPayloadRecordType().getTypeName(),
+                    indexes[i].getNodeGroupName(), StorageConstants.DEFAULT_COMPACTION_POLICY_NAME,
+                    StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES, id, new HashMap<>(), DatasetType.INTERNAL,
+                    indexes[i].getDatasetId().getId(), MetadataUtil.PENDING_NO_OP, Creator.DEFAULT_CREATOR));
         }
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Finished inserting initial datasets.");
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 132efbc..cae1385 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -63,6 +63,7 @@
 import org.apache.asterix.metadata.declared.ArrayBTreeResourceFactoryProvider;
 import org.apache.asterix.metadata.declared.BTreeResourceFactoryProvider;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.utils.Creator;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.IndexUtil;
 import org.apache.asterix.metadata.utils.InvertedIndexResourceFactoryProvider;
@@ -166,15 +167,16 @@
     private final String compressionScheme;
     private final DatasetFullyQualifiedName datasetFullyQualifiedName;
     private final DatasetFormatInfo datasetFormatInfo;
+    private final Creator creator;
 
     public Dataset(String databaseName, DataverseName dataverseName, String datasetName, String recordTypeDatabaseName,
             DataverseName recordTypeDataverseName, String recordTypeName, String nodeGroupName, String compactionPolicy,
             Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints,
-            DatasetType datasetType, int datasetId, int pendingOp) {
+            DatasetType datasetType, int datasetId, int pendingOp, Creator creator) {
         this(databaseName, dataverseName, datasetName, recordTypeDatabaseName, recordTypeDataverseName, recordTypeName,
                 null, /*metaTypeDataverseName*/null, /*metaTypeName*/null, nodeGroupName, compactionPolicy,
                 compactionPolicyProperties, datasetDetails, hints, datasetType, datasetId, pendingOp,
-                CompressionManager.NONE, DatasetFormatInfo.SYSTEM_DEFAULT);
+                CompressionManager.NONE, DatasetFormatInfo.SYSTEM_DEFAULT, creator);
     }
 
     public Dataset(String databaseName, DataverseName dataverseName, String datasetName, String itemTypeDatabaseName,
@@ -182,11 +184,11 @@
             DataverseName metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName,
             String compactionPolicy, Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails,
             Map<String, String> hints, DatasetType datasetType, int datasetId, int pendingOp, String compressionScheme,
-            DatasetFormatInfo datasetFormatInfo) {
+            DatasetFormatInfo datasetFormatInfo, Creator creator) {
         this(databaseName, dataverseName, datasetName, itemTypeDatabaseName, itemTypeDataverseName, itemTypeName,
                 metaItemTypeDatabaseName, metaItemTypeDataverseName, metaItemTypeName, nodeGroupName, compactionPolicy,
                 compactionPolicyProperties, datasetDetails, hints, datasetType, datasetId, pendingOp, 0L,
-                compressionScheme, datasetFormatInfo);
+                compressionScheme, datasetFormatInfo, creator);
     }
 
     public Dataset(Dataset dataset) {
@@ -195,7 +197,7 @@
                 dataset.metaTypeDataverseName, dataset.metaTypeName, dataset.nodeGroupName,
                 dataset.compactionPolicyFactory, dataset.compactionPolicyProperties, dataset.datasetDetails,
                 dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp, dataset.rebalanceCount,
-                dataset.compressionScheme, dataset.datasetFormatInfo);
+                dataset.compressionScheme, dataset.datasetFormatInfo, dataset.creator);
     }
 
     public Dataset(String databaseName, DataverseName dataverseName, String datasetName, String itemTypeDatabaseName,
@@ -203,7 +205,7 @@
             DataverseName metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName,
             String compactionPolicy, Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails,
             Map<String, String> hints, DatasetType datasetType, int datasetId, int pendingOp, long rebalanceCount,
-            String compressionScheme, DatasetFormatInfo datasetFormatInfo) {
+            String compressionScheme, DatasetFormatInfo datasetFormatInfo, Creator creator) {
         this.databaseName = Objects.requireNonNull(databaseName);
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
@@ -225,6 +227,7 @@
         this.compressionScheme = compressionScheme;
         this.datasetFullyQualifiedName = new DatasetFullyQualifiedName(databaseName, dataverseName, datasetName);
         this.datasetFormatInfo = datasetFormatInfo;
+        this.creator = creator;
     }
 
     public String getDatabaseName() {
@@ -301,6 +304,10 @@
         return rebalanceCount;
     }
 
+    public Creator getCreator() {
+        return creator;
+    }
+
     public boolean hasMetaPart() {
         return metaTypeDataverseName != null && metaTypeName != null;
     }
@@ -395,7 +402,7 @@
                             getMetaItemTypeDataverseName(), getMetaItemTypeName(), getNodeGroupName(),
                             getCompactionPolicy(), getCompactionPolicyProperties(), getDatasetDetails(), getHints(),
                             getDatasetType(), getDatasetId(), MetadataUtil.PENDING_DROP_OP, getCompressionScheme(),
-                            getDatasetFormatInfo()));
+                            getDatasetFormatInfo(), creator));
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
             bActiveTxn.setValue(false);
@@ -856,7 +863,7 @@
                 this.metaTypeDataverseName, this.metaTypeName, targetNodeGroupName, this.compactionPolicyFactory,
                 this.compactionPolicyProperties, this.datasetDetails, this.hints, this.datasetType,
                 DatasetIdFactory.generateAlternatingDatasetId(this.datasetId), this.pendingOp, this.rebalanceCount + 1,
-                this.compressionScheme, this.datasetFormatInfo);
+                this.compressionScheme, this.datasetFormatInfo, creator);
     }
 
     // Gets an array of partition numbers for this dataset.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/EntityDetails.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/EntityDetails.java
index 9493349..532d107 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/EntityDetails.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/EntityDetails.java
@@ -20,6 +20,10 @@
 
 import org.apache.asterix.common.metadata.DataverseName;
 
+/**
+ * This class provides static factory methods for creating entity details.
+ */
+
 public class EntityDetails {
 
     public enum EntityType {
@@ -33,15 +37,35 @@
     private final String databaseName;
     private final DataverseName dataverseName;
     private final String entityName;
-    private final EntityType entityType;
+    private EntityType entityType;
 
-    public EntityDetails(String databaseName, DataverseName dataverseName, String entityName, EntityType entityType) {
+    private EntityDetails(String databaseName, DataverseName dataverseName, String entityName, EntityType entityType) {
         this.databaseName = databaseName;
         this.dataverseName = dataverseName;
         this.entityName = entityName;
         this.entityType = entityType;
     }
 
+    public static EntityDetails newDatabase(String databaseName) {
+        return new EntityDetails(databaseName, null, null, EntityType.DATABASE);
+    }
+
+    public static EntityDetails newDataverse(String databaseName, DataverseName dataverseName) {
+        return new EntityDetails(databaseName, dataverseName, null, EntityType.DATAVERSE);
+    }
+
+    public static EntityDetails newDataset(String databaseName, DataverseName dataverseName, String datasetName) {
+        return new EntityDetails(databaseName, dataverseName, datasetName, EntityType.DATASET);
+    }
+
+    public static EntityDetails newView(String databaseName, DataverseName dataverseName, String viewName) {
+        return new EntityDetails(databaseName, dataverseName, viewName, EntityType.VIEW);
+    }
+
+    public static EntityDetails newFunction(String databaseName, DataverseName dataverseName, String functionName) {
+        return new EntityDetails(databaseName, dataverseName, functionName, EntityType.FUNCTION);
+    }
+
     public String getDatabaseName() {
         return databaseName;
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 7613dd3..191da69 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -52,6 +52,7 @@
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
 import org.apache.asterix.metadata.entities.ViewDetails;
+import org.apache.asterix.metadata.utils.Creator;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ADateTime;
@@ -412,11 +413,12 @@
         long rebalanceCount = getRebalanceCount(datasetRecord);
         String compressionScheme = getCompressionScheme(datasetRecord);
         DatasetFormatInfo datasetFormatInfo = getDatasetFormatInfo(datasetRecord);
+        Creator creator = Creator.createOrDefault(datasetRecord);
 
         return new Dataset(databaseName, dataverseName, datasetName, itemTypeDatabaseName, typeDataverseName, typeName,
                 metaItemTypeDatabaseName, metaTypeDataverseName, metaTypeName, nodeGroupName, compactionPolicy.first,
                 compactionPolicy.second, datasetDetails, hints, datasetType, datasetId, pendingOp, rebalanceCount,
-                compressionScheme, datasetFormatInfo);
+                compressionScheme, datasetFormatInfo, creator);
     }
 
     protected Pair<String, Map<String, String>> readCompactionPolicy(DatasetType datasetType, ARecord datasetRecord) {
@@ -680,6 +682,7 @@
         writeBlockLevelStorageCompression(dataset);
         writeOpenDetails(dataset);
         writeDatasetFormatInfo(dataset);
+        writeDatasetCreator(dataset);
     }
 
     private void writeOpenDetails(Dataset dataset) throws HyracksDataException {
@@ -863,4 +866,35 @@
 
         propertyRecordBuilder.write(out, true);
     }
+
+    private void writeDatasetCreator(Dataset dataset) throws HyracksDataException {
+        if (datasetEntity.databaseNameIndex() >= 0) {
+            Creator creatorInfo = dataset.getCreator();
+            RecordBuilder creatorObject = new RecordBuilder();
+            creatorObject.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+
+            fieldName.reset();
+            aString.setValue(MetadataRecordTypes.FIELD_NAME_CREATOR_NAME);
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            aString.setValue(creatorInfo.getName());
+            stringSerde.serialize(aString, fieldValue.getDataOutput());
+            creatorObject.addField(fieldName, fieldValue);
+
+            fieldName.reset();
+            aString.setValue(MetadataRecordTypes.FIELD_NAME_CREATOR_UUID);
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            aString.setValue(creatorInfo.getUuid());
+            stringSerde.serialize(aString, fieldValue.getDataOutput());
+            creatorObject.addField(fieldName, fieldValue);
+
+            fieldName.reset();
+            aString.setValue(MetadataRecordTypes.CREATOR_ARECORD_FIELD_NAME);
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            creatorObject.write(fieldValue.getDataOutput(), true);
+            recordBuilder.addField(fieldName, fieldValue);
+        }
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
index 2832470..788231c 100644
--- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.metadata.utils.Creator;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.runtime.compression.CompressionManager;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -64,7 +65,7 @@
             Dataset dataset = new Dataset(db, dv, "log", itemTypeDb, itemTypeDv, "LogType", metaTypeDb, metaTypeDv,
                     "MetaType", "DEFAULT_NG_ALL_NODES", "prefix", compactionPolicyProperties, details,
                     Collections.emptyMap(), DatasetType.INTERNAL, 115, 0, CompressionManager.NONE,
-                    DatasetFormatInfo.SYSTEM_DEFAULT);
+                    DatasetFormatInfo.SYSTEM_DEFAULT, Creator.DEFAULT_CREATOR);
             DatasetTupleTranslator dtTranslator = new DatasetTupleTranslator(true, DatasetEntity.of(false));
             ITupleReference tuple = dtTranslator.getTupleFromMetadataEntity(dataset);
             Dataset deserializedDataset = dtTranslator.getMetadataEntityFromTuple(tuple);
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
index 00f188d..507b3bd 100644
--- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
@@ -41,6 +41,7 @@
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.metadata.utils.Creator;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
@@ -76,7 +77,7 @@
             Dataset dataset = new Dataset(dvTestDatabase, dvTest, "d1", itemTypeDatabase, dvFoo, "LogType",
                     metaTypeDatabase, dvCB, "MetaType", "DEFAULT_NG_ALL_NODES", "prefix", compactionPolicyProperties,
                     details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0, CompressionManager.NONE,
-                    DatasetFormatInfo.SYSTEM_DEFAULT);
+                    DatasetFormatInfo.SYSTEM_DEFAULT, Creator.DEFAULT_CREATOR);
 
             Index index = new Index(dvTestDatabase, dvTest, "d1", "i1", IndexType.BTREE,
                     Collections.singletonList(Collections.singletonList("row_id")),