[NO ISSUE][TX] Maintain Txn Accessed Datasets

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- For every transaction, maintain a list of datasets
  the transaction is attempting to access.

Change-Id: I54cb04ac313d73981dd43d575875fa003fcd3bb9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3180
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
index e4e300a..e929ef0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
@@ -55,6 +55,8 @@
 import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
 import org.apache.asterix.testframework.xml.TestGroup;
 import org.junit.Assert;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import junit.extensions.PA;
 
@@ -131,7 +133,16 @@
             when(metadataProvider.getDefaultDataverseName()).thenReturn(dvName);
             when(metadataProvider.getConfig()).thenReturn(config);
             when(config.get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS)).thenReturn("true");
-            when(metadataProvider.findDataset(anyString(), anyString())).thenReturn(mock(Dataset.class));
+            when(metadataProvider.findDataset(anyString(), anyString())).thenAnswer(new Answer<Dataset>() {
+                @Override
+                public Dataset answer(InvocationOnMock invocation) {
+                    Object[] args = invocation.getArguments();
+                    final Dataset mockDataset = mock(Dataset.class);
+                    String fullyQualifiedName = args[0] != null ? args[0] + "." + args[1] : (String) args[1];
+                    when(mockDataset.getFullyQualifiedName()).thenReturn(fullyQualifiedName);
+                    return mockDataset;
+                }
+            });
 
             for (Statement st : statements) {
                 if (st.getKind() == Statement.Kind.QUERY) {
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java
index 50b13e2..45a18e8 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/VariableCheckAndRewriteVisitor.java
@@ -44,6 +44,7 @@
 import org.apache.asterix.lang.sqlpp.visitor.CheckDatasetOnlyResolutionVisitor;
 import org.apache.asterix.lang.sqlpp.visitor.base.AbstractSqlppExpressionScopingVisitor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -146,12 +147,13 @@
 
     private Expression resolveAsDataset(String dataverseName, String datasetName, SourceLocation sourceLoc)
             throws CompilationException {
-        if (!datasetExists(dataverseName, datasetName, sourceLoc)) {
+        Dataset dataset = findDataset(dataverseName, datasetName, sourceLoc);
+        if (dataset == null) {
             throwUnresolvableError(dataverseName, datasetName, sourceLoc);
         }
-        String fullyQualifiedName = dataverseName == null ? datasetName : dataverseName + "." + datasetName;
+        metadataProvider.addAccessedDataset(dataset);
         List<Expression> argList = new ArrayList<>(1);
-        argList.add(new LiteralExpr(new StringLiteral(fullyQualifiedName)));
+        argList.add(new LiteralExpr(new StringLiteral(dataset.getFullyQualifiedName())));
         CallExpr callExpr = new CallExpr(new FunctionSignature(BuiltinFunctions.DATASET), argList);
         callExpr.setSourceLocation(sourceLoc);
         return callExpr;
@@ -186,22 +188,28 @@
         return parent.accept(visitor, originalExpressionWithUndefinedIdentifier);
     }
 
-    private boolean datasetExists(String dataverseName, String datasetName, SourceLocation sourceLoc)
+    private Dataset findDataset(String dataverseName, String datasetName, SourceLocation sourceLoc)
             throws CompilationException {
         try {
-            return metadataProvider.findDataset(dataverseName, datasetName) != null
-                    || fullyQualifiedDatasetNameExists(datasetName);
+            Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+            if (dataset != null) {
+                return dataset;
+            }
+            return findDatasetByFullyQualifiedName(datasetName);
         } catch (AlgebricksException e) {
             throw new CompilationException(ErrorCode.COMPILATION_ERROR, e, sourceLoc, e.getMessage());
         }
     }
 
-    private boolean fullyQualifiedDatasetNameExists(String name) throws AlgebricksException {
+    private Dataset findDatasetByFullyQualifiedName(String name) throws AlgebricksException {
         if (name.indexOf('.') < 0) {
-            return false;
+            return null;
         }
         String[] path = StringUtils.split(name, '.');
-        return path.length == 2 && metadataProvider.findDataset(path[0], path[1]) != null;
+        if (path.length != 2) {
+            return null;
+        }
+        return metadataProvider.findDataset(path[0], path[1]);
     }
 
     @Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 85beb95..adfaa89 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -21,10 +21,13 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -124,9 +127,9 @@
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.data.std.primitive.ShortPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -151,6 +154,7 @@
     private final IFunctionManager functionManager;
     private final LockList locks;
     private final Map<String, Object> config;
+    private final Set<Dataset> txnAccessedDatasets;
 
     private Dataverse defaultDataverse;
     private MetadataTransactionContext mdTxnCtx;
@@ -173,6 +177,7 @@
         functionManager = ((IFunctionExtensionManager) appCtx.getExtensionManager()).getFunctionManager();
         locks = new LockList();
         config = new HashMap<>();
+        txnAccessedDatasets = new HashSet<>();
     }
 
     @SuppressWarnings("unchecked")
@@ -233,6 +238,7 @@
 
     public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
         this.mdTxnCtx = mdTxnCtx;
+        txnAccessedDatasets.clear();
     }
 
     public MetadataTransactionContext getMetadataTxnContext() {
@@ -1621,4 +1627,12 @@
     public ICompressionManager getCompressionManager() {
         return appCtx.getCompressionManager();
     }
+
+    public void addAccessedDataset(Dataset dataset) {
+        txnAccessedDatasets.add(dataset);
+    }
+
+    public Set<Dataset> getAccssedDatasets() {
+        return Collections.unmodifiableSet(txnAccessedDatasets);
+    }
 }