Remove dataverse folder when dataverse is dropped
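
Dropping a dataverse removed its metadata entries and the datasets' files,
but the dataverse's directory itself was left behind in every node store.
Add DataverseOperations.createDropDataverseJobSpec(), which builds a Hyracks
job that runs a FileRemoveOperatorDescriptor over one file split per store
directory per node, and schedule that job from the drop-dataverse path in
AqlTranslator alongside the per-dataset drop jobs. AqlMetadataProvider gains
splitProviderAndPartitionConstraintsForDataverse() to compute those splits;
the split-provider/constraint construction shared with the dataset path is
factored into a private helper.

For example, dropping a (hypothetical) dataverse named demo now also removes
its directory from each store:

    drop dataverse demo;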
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 428781f..66a3980 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -64,6 +64,7 @@
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.file.DatasetOperations;
+import edu.uci.ics.asterix.file.DataverseOperations;
 import edu.uci.ics.asterix.file.FeedOperations;
 import edu.uci.ics.asterix.file.IndexOperations;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
@@ -765,6 +766,8 @@
                     jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
                 }
             }
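+            //#. create the job that deletes the dataverse directory itself.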
+            jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
 
             //#. mark PendingDropOp on the dataverse record by 
             //   first, deleting the dataverse record from the DATAVERSE_DATASET
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DataverseOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DataverseOperations.java
new file mode 100644
index 0000000..0654ff7
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DataverseOperations.java
@@ -0,0 +1,28 @@
+package edu.uci.ics.asterix.file;
+
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class DataverseOperations {
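+    /**
+     * Builds a job that removes the dataverse's directory from every node
+     * store in which it resides.
+     */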
+    public static JobSpecification createDropDataverseJobSpec(Dataverse dataverse, AqlMetadataProvider metadata) {
+        JobSpecification jobSpec = new JobSpecification();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForDataverse(dataverse.getDataverseName());
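+        // FileRemoveOperatorDescriptor deletes the directory at each split;
+        // the partition constraint pins every delete to the owning node.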
+        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second);
+        jobSpec.addRoot(frod);
+        return jobSpec;
+    }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index fa42aed..4773ed0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -1299,6 +1299,18 @@
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
             String dataverseName, String datasetName, String targetIdxName) throws AlgebricksException {
         FileSplit[] splits = splitsForInternalOrFeedDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName);
+        return splitProviderAndPartitionConstraints(splits);
+    }
+
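+    // Splits and constraints covering the dataverse's directory on every node store.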
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
+            String dataverseName) {
+        FileSplit[] splits = splitsForDataverse(mdTxnCtx, dataverseName);
+        return splitProviderAndPartitionConstraints(splits);
+    }
+
+    private Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
+            FileSplit[] splits) {
         IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
         String[] loc = new String[splits.length];
         for (int p = 0; p < splits.length; p++) {
@@ -1308,6 +1320,25 @@
         return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
     }
 
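+    // Produces one split per store directory on each node, pointing at the
+    // directory named after the dataverse, i.e. <store>/<dataverseName>.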
+    private FileSplit[] splitsForDataverse(MetadataTransactionContext mdTxnCtx, String dataverseName) {
+        File relPathFile = new File(dataverseName);
+        List<FileSplit> splits = new ArrayList<FileSplit>();
+        for (Map.Entry<String, String[]> entry : stores.entrySet()) {
+            String node = entry.getKey();
+            String[] nodeStores = entry.getValue();
+            if (nodeStores == null) {
+                continue;
+            }
+            for (int i = 0; i < nodeStores.length; i++) {
+                File f = new File(nodeStores[i] + File.separator + relPathFile);
+                splits.add(new FileSplit(node, new FileReference(f)));
+            }
+        }
+        return splits.toArray(new FileSplit[splits.size()]);
+    }
+
     private FileSplit[] splitsForInternalOrFeedDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
             String datasetName, String targetIdxName) throws AlgebricksException {