First phase of metadata/compiler cleanup.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_issue_96@441 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index 6b305e2..10a093b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -4,11 +4,11 @@
 import java.util.List;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.optimizer.rules.am.BTreeJobGenParams;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -61,18 +61,18 @@
         }
         BTreeJobGenParams jobGenParams = new BTreeJobGenParams();
         jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
-        if (jobGenParams.getIndexKind() != IndexKind.BTREE) {
-            throw new NotImplementedException(jobGenParams.getIndexKind() + " indexes are not implemented.");
+        if (jobGenParams.getIndexType() != IndexType.BTREE) {
+            throw new NotImplementedException(jobGenParams.getIndexType() + " indexes are not implemented.");
         }
         int[] lowKeyIndexes = getKeyIndexes(jobGenParams.getLowKeyVarList(), inputSchemas);
         int[] highKeyIndexes = getKeyIndexes(jobGenParams.getHighKeyVarList(), inputSchemas);
         AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
         AqlCompiledMetadataDeclarations metadata = metadataProvider.getMetadataDeclarations();
-        AqlCompiledDatasetDecl datasetDecl = metadata.findDataset(jobGenParams.getDatasetName());
-        if (datasetDecl == null) {
+        Dataset dataset = metadata.findDataset(jobGenParams.getDatasetName());
+        if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + jobGenParams.getDatasetName());
         }
-        if (datasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+        if (dataset.getType() == DatasetType.EXTERNAL) {
             throw new AlgebricksException("Trying to run btree search over external dataset ("
                     + jobGenParams.getDatasetName() + ").");
         }
@@ -84,7 +84,7 @@
         }
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = AqlMetadataProvider.buildBtreeRuntime(
                 builder.getJobSpec(), outputVars, opSchema, typeEnv, metadata, context, jobGenParams.getRetainInput(),
-                jobGenParams.getDatasetName(), datasetDecl, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes,
+                jobGenParams.getDatasetName(), dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes,
                 jobGenParams.isLowKeyInclusive(), jobGenParams.isHighKeyInclusive());
         builder.contributeHyracksOperator(unnestMap, btreeSearch.first);
         builder.contributeAlgebricksPartitionConstraint(btreeSearch.first, btreeSearch.second);
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index f6c49ab..2d21db8 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -5,12 +5,12 @@
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
-import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.base.IAObject;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
@@ -84,11 +84,11 @@
         
         AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
         AqlCompiledMetadataDeclarations metadata = metadataProvider.getMetadataDeclarations();
-        AqlCompiledDatasetDecl datasetDecl = metadata.findDataset(jobGenParams.getDatasetName());
-        if (datasetDecl == null) {
+        Dataset dataset = metadata.findDataset(jobGenParams.getDatasetName());
+        if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + jobGenParams.getDatasetName());
         }
-        if (datasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+        if (dataset.getType() == DatasetType.EXTERNAL) {
             throw new AlgebricksException("Trying to run inverted index search over external dataset (" + jobGenParams.getDatasetName() + ").");
         }
         int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), inputSchemas);
@@ -96,7 +96,7 @@
         // Build runtime.
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> invIndexSearch = buildInvertedIndexRuntime(metadata,
                 context, builder.getJobSpec(), unnestMapOp, opSchema, jobGenParams.getRetainInput(),
-                jobGenParams.getDatasetName(), datasetDecl, jobGenParams.getIndexName(),
+                jobGenParams.getDatasetName(), dataset, jobGenParams.getIndexName(),
                 jobGenParams.getSearchKeyType(), keyIndexes, jobGenParams.getSearchModifierType(),
                 jobGenParams.getSimilarityThreshold());
         // Contribute operator in hyracks job.
@@ -110,19 +110,19 @@
             AqlCompiledMetadataDeclarations metadata, JobGenContext context,
             JobSpecification jobSpec, UnnestMapOperator unnestMap,
             IOperatorSchema opSchema, boolean retainInput, String datasetName,
-            AqlCompiledDatasetDecl datasetDecl, String indexName,
+            Dataset dataset, String indexName,
             ATypeTag searchKeyType, int[] keyFields, SearchModifierType searchModifierType,
             IAlgebricksConstantValue similarityThreshold) throws AlgebricksException {
         IAObject simThresh = ((AsterixConstantValue)similarityThreshold).getObject();
-        String itemTypeName = datasetDecl.getItemTypeName();
+        String itemTypeName = dataset.getDatatypeName();
         IAType itemType = metadata.findType(itemTypeName);
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
-        AqlCompiledIndexDecl index = DatasetUtils.findSecondaryIndexByName(datasetDecl, indexName);
-        if (index == null) {
+        int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+        Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+        if (secondaryIndex == null) {
             throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
                     + datasetName);
         }
-        List<String> secondaryKeyFields = index.getFieldExprs();
+        List<String> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
         int numSecondaryKeys = secondaryKeyFields.size();
         if (numSecondaryKeys != 1) {
             throw new AlgebricksException(
@@ -162,23 +162,18 @@
         ITypeTraits[] invListsTypeTraits = JobGenHelper.variablesToTypeTraits(outputVars, start, numPrimaryKeys, typeEnv, context);
         
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();        
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint;
-        try {
-            secondarySplitsAndConstraint = metadata
-                    .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-                            datasetName, indexName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-        
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
         Pair<IFileSplitProvider, IFileSplitProvider> fileSplitProviders = metadata
                 .getInvertedIndexFileSplitProviders(secondarySplitsAndConstraint.first);
         
         // TODO: Here we assume there is only one search key field.
         int queryField = keyFields[0];
         // Get tokenizer and search modifier factories.
-        IInvertedIndexSearchModifierFactory searchModifierFactory = InvertedIndexAccessMethod.getSearchModifierFactory(searchModifierType, simThresh, index);
-        IBinaryTokenizerFactory queryTokenizerFactory = InvertedIndexAccessMethod.getBinaryTokenizerFactory(searchModifierType, searchKeyType, index);
+        IInvertedIndexSearchModifierFactory searchModifierFactory = InvertedIndexAccessMethod.getSearchModifierFactory(
+                searchModifierType, simThresh, secondaryIndex);
+        IBinaryTokenizerFactory queryTokenizerFactory = InvertedIndexAccessMethod.getBinaryTokenizerFactory(
+                searchModifierType, searchKeyType, secondaryIndex);
 		InvertedIndexSearchOperatorDescriptor invIndexSearchOp = new InvertedIndexSearchOperatorDescriptor(
 				jobSpec, queryField, appContext.getStorageManagerInterface(),
 				fileSplitProviders.first, fileSplitProviders.second,
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java
index 1832343..587b845 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -1,11 +1,11 @@
 package edu.uci.ics.asterix.algebra.operators.physical;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.optimizer.rules.am.RTreeJobGenParams;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -56,23 +56,23 @@
 
         RTreeJobGenParams jobGenParams = new RTreeJobGenParams();
         jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
-        if (jobGenParams.getIndexKind() != IndexKind.RTREE) {
-            throw new NotImplementedException(jobGenParams.getIndexKind() + " indexes are not implemented.");
+        if (jobGenParams.getIndexType() != IndexType.RTREE) {
+            throw new NotImplementedException(jobGenParams.getIndexType() + " indexes are not implemented.");
         }
 
         int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), inputSchemas);
         AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
         AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();
-        AqlCompiledDatasetDecl adecl = metadata.findDataset(jobGenParams.getDatasetName());
-        if (adecl == null) {
+        Dataset dataset = metadata.findDataset(jobGenParams.getDatasetName());
+        if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + jobGenParams.getDatasetName());
         }
-        if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
+        if (dataset.getType() == DatasetType.EXTERNAL) {
             throw new AlgebricksException("Trying to run rtree search over external dataset ("
                     + jobGenParams.getDatasetName() + ").");
         }
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch = AqlMetadataProvider.buildRtreeRuntime(
-                metadata, context, builder.getJobSpec(), jobGenParams.getDatasetName(), adecl,
+                metadata, context, builder.getJobSpec(), jobGenParams.getDatasetName(), dataset,
                 jobGenParams.getIndexName(), keyIndexes);
         builder.contributeHyracksOperator(unnestMap, rtreeSearch.first);
         builder.contributeAlgebricksPartitionConstraint(rtreeSearch.first, rtreeSearch.second);
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index dc5d659..43e8d00 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -8,14 +8,14 @@
 
 import edu.uci.ics.asterix.aql.util.FunctionUtils;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
 import edu.uci.ics.asterix.metadata.declared.AqlIndex;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
@@ -92,30 +92,32 @@
         AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
         AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();
         String datasetName = datasetSource.getId().getDatasetName();
-        AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
-        if (adecl == null) {
+        Dataset dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + datasetName);
         }
-        if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
+        if (dataset.getType() == DatasetType.EXTERNAL) {
             return false;
         }
 
         List<LogicalVariable> projectVars = new ArrayList<LogicalVariable>();
         VariableUtilities.getUsedVariables(op1, projectVars);
         // Create operators for secondary index insert/delete.
-        String itemTypeName = adecl.getItemTypeName();
+        String itemTypeName = dataset.getDatatypeName();
         IAType itemType = metadata.findType(itemTypeName);
         if (itemType.getTypeTag() != ATypeTag.RECORD) {
             throw new AlgebricksException("Only record types can be indexed.");
         }
         ARecordType recType = (ARecordType) itemType;
-        List<AqlCompiledIndexDecl> secondaryIndexes = DatasetUtils.getSecondaryIndexes(adecl);
-        if (secondaryIndexes.isEmpty()) {
-            return false;
-        }
+        List<Index> indexes = metadata.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
         ILogicalOperator currentTop = op1;
-        for (AqlCompiledIndexDecl index : secondaryIndexes) {
-            List<String> secondaryKeyFields = index.getFieldExprs();
+        boolean hasSecondaryIndex = false;
+        for (Index index : indexes) {
+            if (!index.isSecondaryIndex()) {
+                continue;
+            }
+            hasSecondaryIndex = true;
+            List<String> secondaryKeyFields = index.getKeyFieldNames();
             List<LogicalVariable> secondaryKeyVars = new ArrayList<LogicalVariable>();
             List<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
             List<Mutable<ILogicalExpression>> secondaryExpressions = new ArrayList<Mutable<ILogicalExpression>>();
@@ -146,7 +148,7 @@
             project.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
             context.computeAndSetTypeEnvironmentForOperator(project);
             context.computeAndSetTypeEnvironmentForOperator(assign);
-            if (index.getKind() == IndexKind.BTREE) {
+            if (index.getIndexType() == IndexType.BTREE) {
                 for (LogicalVariable secondaryKeyVar : secondaryKeyVars) {
                     secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
                             secondaryKeyVar)));
@@ -160,7 +162,7 @@
                 indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assign));
                 currentTop = indexUpdate;
                 context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
-            } else if (index.getKind() == IndexKind.RTREE) {
+            } else if (index.getIndexType() == IndexType.RTREE) {
                 Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
                         secondaryKeyFields.get(0), recType);
                 IAType spatialType = keyPairType.first;
@@ -203,7 +205,9 @@
                 currentTop = indexUpdate;                
                 context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
             }
-
+        }
+        if (!hasSecondaryIndex) {
+            return false;
         }
         op0.getInputs().clear();
         op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java
index 5a3d3b8..87b233a 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java
@@ -11,11 +11,11 @@
 import edu.uci.ics.asterix.algebra.base.AsterixOperatorAnnotations;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.exceptions.AsterixRuntimeException;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AString;
@@ -116,11 +116,11 @@
         AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
         AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();
         AqlSourceId asid = ((IDataSource<AqlSourceId>) scan.getDataSource()).getId();
-        AqlCompiledDatasetDecl adecl = metadata.findDataset(asid.getDatasetName());
-        if (adecl == null) {
+        Dataset dataset = metadata.findDataset(asid.getDatasetName());
+        if (dataset == null) {
             throw new AlgebricksException("Dataset " + asid.getDatasetName() + " not found.");
         }
-        if (adecl.getDatasetType() != DatasetType.INTERNAL && adecl.getDatasetType() != DatasetType.FEED) {
+        if (dataset.getType() != DatasetType.INTERNAL && dataset.getType() != DatasetType.FEED) {
             return false;
         }
         ILogicalExpression e1 = accessFun.getArguments().get(1).getValue();
@@ -134,7 +134,7 @@
             fldName = ((AString) obj).getStringValue();
         } else {
             int pos = ((AInt32) obj).getIntegerValue();
-            String tName = adecl.getItemTypeName();
+            String tName = dataset.getDatatypeName();
             IAType t = metadata.findType(tName);
             if (t.getTypeTag() != ATypeTag.RECORD) {
                 return false;
@@ -146,9 +146,16 @@
             fldName = rt.getFieldNames()[pos];
         }
 
-        List<AqlCompiledIndexDecl> idxList = DatasetUtils.findSecondaryIndexesByOneOfTheKeys(adecl, fldName);
-
-        return idxList != null && !idxList.isEmpty();
+        List<Index> datasetIndexes = metadata.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
+        boolean hasSecondaryIndex = false;
+        for (Index index : datasetIndexes) {
+            if (index.isSecondaryIndex()) {
+                hasSecondaryIndex = true;
+                break;
+            }
+        }
+        
+        return hasSecondaryIndex;
     }
 
     private boolean tryingToPushThroughSelectionWithSameDataSource(AssignOperator access, AbstractLogicalOperator op2) {
@@ -285,12 +292,12 @@
                             AqlSourceId asid = dataSource.getId();
                             AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
                             AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();
-                            AqlCompiledDatasetDecl adecl = metadata.findDataset(asid.getDatasetName());
-                            if (adecl == null) {
+                            Dataset dataset = metadata.findDataset(asid.getDatasetName());
+                            if (dataset == null) {
                                 throw new AlgebricksException("Dataset " + asid.getDatasetName() + " not found.");
                             }
-                            if (adecl.getDatasetType() != DatasetType.INTERNAL
-                                    && adecl.getDatasetType() != DatasetType.FEED) {
+                            if (dataset.getType() != DatasetType.INTERNAL
+                                    && dataset.getType() != DatasetType.FEED) {
                                 setAsFinal(access, context, finalAnnot);
                                 return false;
                             }
@@ -301,7 +308,7 @@
                                 fldName = ((AString) obj).getStringValue();
                             } else {
                                 int pos = ((AInt32) obj).getIntegerValue();
-                                String tName = adecl.getItemTypeName();
+                                String tName = dataset.getDatatypeName();
                                 IAType t = metadata.findType(tName);
                                 if (t.getTypeTag() != ATypeTag.RECORD) {
                                     return false;
@@ -313,7 +320,7 @@
                                 }
                                 fldName = rt.getFieldNames()[pos];
                             }
-                            int p = DatasetUtils.getPositionOfPartitioningKeyField(adecl, fldName);
+                            int p = DatasetUtils.getPositionOfPartitioningKeyField(dataset, fldName);
                             if (p < 0) { // not one of the partitioning fields
                                 setAsFinal(access, context, finalAnnot);
                                 return false;
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index d7ebdc4..74f790f 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -9,7 +9,7 @@
 import edu.uci.ics.asterix.algebra.operators.physical.BTreeSearchPOperator;
 import edu.uci.ics.asterix.algebra.operators.physical.InvertedIndexPOperator;
 import edu.uci.ics.asterix.algebra.operators.physical.RTreeSearchPOperator;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
@@ -157,9 +157,9 @@
                             throw new AlgebricksException("Could not find index " + jobGenParams.getIndexName()
                                     + " for dataset " + dataSourceId);
                         }
-                        IndexKind indexKind = jobGenParams.getIndexKind();
+                        IndexType indexType = jobGenParams.getIndexType();
                         boolean requiresBroadcast = jobGenParams.getRequiresBroadcast();
-                        switch (indexKind) {
+                        switch (indexType) {
                             case BTREE: {
                                 op.setPhysicalOperator(new BTreeSearchPOperator(dsi, requiresBroadcast));
                                 break;
@@ -177,7 +177,7 @@
                                 break;
                             }
                             default: {
-                                throw new NotImplementedException(indexKind + " indexes are not implemented.");
+                                throw new NotImplementedException(indexType + " indexes are not implemented.");
                             }
                         }
                     }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
index fb0c656..c2b35fa 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -6,13 +6,12 @@
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledFeedDatasetDetails;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
 import edu.uci.ics.asterix.metadata.declared.ExternalFeedDataSource;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
@@ -78,8 +77,8 @@
 
                 AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
                 AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();
-                AqlCompiledDatasetDecl acdd = metadata.findDataset(datasetName);
-                if (acdd == null) {
+                Dataset dataset = metadata.findDataset(datasetName);
+                if (dataset == null) {
                     throw new AlgebricksException("Could not find dataset " + datasetName);
                 }
 
@@ -87,9 +86,8 @@
 
                 ArrayList<LogicalVariable> v = new ArrayList<LogicalVariable>();
 
-                if (acdd.getDatasetType() == DatasetType.INTERNAL || acdd.getDatasetType() == DatasetType.FEED) {
-
-                    int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(acdd).size();
+                if (dataset.getType() == DatasetType.INTERNAL || dataset.getType() == DatasetType.FEED) {
+                    int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
                     for (int i = 0; i < numPrimaryKeys; i++) {
                         v.add(context.newVar());
                     }
@@ -127,14 +125,14 @@
 
                 AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
                 AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();
-                AqlCompiledDatasetDecl acdd = metadata.findDataset(datasetName);
+                Dataset dataset = metadata.findDataset(datasetName);
 
-                if (acdd == null) {
+                if (dataset == null) {
                     throw new AlgebricksException("Could not find dataset " + datasetName);
                 }
 
-                if (acdd.getDatasetType() != DatasetType.FEED) {
-                    throw new IllegalArgumentException("invalid dataset type:" + acdd.getDatasetType());
+                if (dataset.getType() != DatasetType.FEED) {
+                    throw new IllegalArgumentException("invalid dataset type:" + dataset.getType());
                 }
 
                 AqlSourceId asid = new AqlSourceId(metadata.getDataverseName(), datasetName);
@@ -149,7 +147,7 @@
 
                 v.add(unnest.getVariable());
 
-                DataSourceScanOperator scan = new DataSourceScanOperator(v, createDummyFeedDataSource(asid, acdd,
+                DataSourceScanOperator scan = new DataSourceScanOperator(v, createDummyFeedDataSource(asid, dataset,
                         metadata));
 
                 List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
@@ -166,27 +164,14 @@
         return false;
     }
 
-    private AqlDataSource createDummyFeedDataSource(AqlSourceId aqlId, AqlCompiledDatasetDecl acdd,
+    private AqlDataSource createDummyFeedDataSource(AqlSourceId aqlId, Dataset dataset,
             AqlCompiledMetadataDeclarations metadata) throws AlgebricksException {
-
-        AqlCompiledFeedDatasetDetails feedDetails = (AqlCompiledFeedDatasetDetails) acdd.getAqlCompiledDatasetDetails();
-
         if (!aqlId.getDataverseName().equals(metadata.getDataverseName())) {
             return null;
         }
-
-        String tName = acdd.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = metadata.findType(tName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-
-        AqlCompiledDatasetDecl dummySourceDecl = new AqlCompiledDatasetDecl(acdd.getName(), tName,
-                DatasetType.EXTERNAL, feedDetails);
-
-        ExternalFeedDataSource extDataSource = new ExternalFeedDataSource(aqlId, dummySourceDecl, itemType,
+        String tName = dataset.getDatatypeName();
+        IAType itemType = metadata.findType(tName);
+        ExternalFeedDataSource extDataSource = new ExternalFeedDataSource(aqlId, dataset, itemType,
                 AqlDataSource.AqlDataSourceType.EXTERNAL_FEED);
         return extDataSource;
     }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index e4f795b..131e7b5a 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -7,14 +7,17 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -34,6 +37,8 @@
  */
 public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRewriteRule {
     
+    private AqlCompiledMetadataDeclarations metadata;
+    
     public abstract Map<FunctionIdentifier, List<IAccessMethod>> getAccessMethods();
     
     protected static void registerAccessMethod(IAccessMethod accessMethod, Map<FunctionIdentifier, List<IAccessMethod>> accessMethods) {
@@ -53,7 +58,12 @@
         return false;
     }
     
-    protected void fillSubTreeIndexExprs(OptimizableOperatorSubTree subTree, Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) {
+    protected void setMetadataDeclarations(IOptimizationContext context) {
+        AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
+        metadata = metadataProvider.getMetadataDeclarations();
+    }
+    
+    protected void fillSubTreeIndexExprs(OptimizableOperatorSubTree subTree, Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) throws AlgebricksException {
         // The assign may be null if there is only a filter on the primary index key.
         // Match variables from lowest assign which comes directly after the dataset scan.
         List<LogicalVariable> varList = (!subTree.assigns.isEmpty()) ? subTree.assigns.get(subTree.assigns.size() - 1).getVariables() : subTree.dataSourceScan.getVariables();
@@ -85,16 +95,16 @@
      * Simply picks the first index that it finds.
      * TODO: Improve this decision process by making it more systematic.
      */
-    protected Pair<IAccessMethod, AqlCompiledIndexDecl> chooseIndex(
+    protected Pair<IAccessMethod, Index> chooseIndex(
             Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) {
         Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator();
         while (amIt.hasNext()) {
             Map.Entry<IAccessMethod, AccessMethodAnalysisContext> amEntry = amIt.next();
             AccessMethodAnalysisContext analysisCtx = amEntry.getValue();
-            Iterator<Map.Entry<AqlCompiledIndexDecl, List<Integer>>> indexIt = analysisCtx.indexExprs.entrySet().iterator();
+            Iterator<Map.Entry<Index, List<Integer>>> indexIt = analysisCtx.indexExprs.entrySet().iterator();
             if (indexIt.hasNext()) {
-                Map.Entry<AqlCompiledIndexDecl, List<Integer>> indexEntry = indexIt.next();
-                return new Pair<IAccessMethod, AqlCompiledIndexDecl>(amEntry.getKey(), indexEntry.getKey());
+                Map.Entry<Index, List<Integer>> indexEntry = indexIt.next();
+                return new Pair<IAccessMethod, Index>(amEntry.getKey(), indexEntry.getKey());
             }
         }
         return null;
@@ -110,15 +120,15 @@
      * 
      */
     public void pruneIndexCandidates(IAccessMethod accessMethod, AccessMethodAnalysisContext analysisCtx) {
-        Iterator<Map.Entry<AqlCompiledIndexDecl, List<Integer>>> it = analysisCtx.indexExprs.entrySet().iterator();
+        Iterator<Map.Entry<Index, List<Integer>>> it = analysisCtx.indexExprs.entrySet().iterator();
         while (it.hasNext()) {
-            Map.Entry<AqlCompiledIndexDecl, List<Integer>> entry = it.next();
-            AqlCompiledIndexDecl index = entry.getKey();
+            Map.Entry<Index, List<Integer>> entry = it.next();
+            Index index = entry.getKey();
             Iterator<Integer> exprsIter = entry.getValue().iterator();
             boolean allUsed = true;
             int lastFieldMatched = -1;
-            for (int i = 0; i < index.getFieldExprs().size(); i++) {
-                String keyField = index.getFieldExprs().get(i);
+            for (int i = 0; i < index.getKeyFieldNames().size(); i++) {
+                String keyField = index.getKeyFieldNames().get(i);
                 boolean foundKeyField = false;
                 while (exprsIter.hasNext()) {
                     Integer ix = exprsIter.next();
@@ -223,31 +233,30 @@
      * 
      * @return true if a candidate index was added to foundIndexExprs, false
      *         otherwise
+     * @throws AlgebricksException 
      */
     protected boolean fillIndexExprs(String fieldName, int matchedFuncExprIndex,
-            AqlCompiledDatasetDecl datasetDecl, AccessMethodAnalysisContext analysisCtx) {
-        AqlCompiledIndexDecl primaryIndexDecl = DatasetUtils.getPrimaryIndex(datasetDecl);
-        List<String> primaryIndexFields = primaryIndexDecl.getFieldExprs();     
-        List<AqlCompiledIndexDecl> indexCandidates = DatasetUtils.findSecondaryIndexesByOneOfTheKeys(datasetDecl, fieldName);
-        // Check whether the primary index is a candidate. If so, add it to the list.
-        if (primaryIndexFields.contains(fieldName)) {
-            if (indexCandidates == null) {
-                indexCandidates = new ArrayList<AqlCompiledIndexDecl>(1);
+            Dataset dataset, AccessMethodAnalysisContext analysisCtx) throws AlgebricksException {
+        List<Index> datasetIndexes = metadata.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
+        List<Index> indexCandidates = new ArrayList<Index>();
+        // Add an index to the candidates if one of the indexed fields is fieldName.
+        for (Index index : datasetIndexes) {
+            if (index.getKeyFieldNames().contains(fieldName)) {
+                indexCandidates.add(index);
             }
-            indexCandidates.add(primaryIndexDecl);
         }
         // No index candidates for fieldName.
-        if (indexCandidates == null) {
+        if (indexCandidates.isEmpty()) {
             return false;
         }
         // Go through the candidates and fill indexExprs.
-        for (AqlCompiledIndexDecl index : indexCandidates) {
-            analysisCtx.addIndexExpr(datasetDecl, index, matchedFuncExprIndex);
+        for (Index index : indexCandidates) {
+            analysisCtx.addIndexExpr(dataset, index, matchedFuncExprIndex);
         }
         return true;
     }
 
-    protected void fillAllIndexExprs(List<LogicalVariable> varList, OptimizableOperatorSubTree subTree, AccessMethodAnalysisContext analysisCtx) {
+    protected void fillAllIndexExprs(List<LogicalVariable> varList, OptimizableOperatorSubTree subTree, AccessMethodAnalysisContext analysisCtx) throws AlgebricksException {
         for (int optFuncExprIndex = 0; optFuncExprIndex < analysisCtx.matchedFuncExprs.size(); optFuncExprIndex++) {
             for (int varIndex = 0; varIndex < varList.size(); varIndex++) {
                 LogicalVariable var = varList.get(varIndex);
@@ -274,12 +283,12 @@
                         break;
                     }
                     // The variable value is one of the partitioning fields.
-                    fieldName = DatasetUtils.getPartitioningExpressions(subTree.datasetDecl).get(varIndex);
+                    fieldName = DatasetUtils.getPartitioningKeys(subTree.dataset).get(varIndex);
                 }
                 // Set the fieldName in the corresponding matched function expression, and remember matching subtree.
                 optFuncExpr.setFieldName(funcVarIndex, fieldName);
                 optFuncExpr.setOptimizableSubTree(funcVarIndex, subTree);
-                fillIndexExprs(fieldName, optFuncExprIndex, subTree.datasetDecl, analysisCtx);
+                fillIndexExprs(fieldName, optFuncExprIndex, subTree.dataset, analysisCtx);
             }
         }
     }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodAnalysisContext.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodAnalysisContext.java
index b977e29..ec6bf3b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodAnalysisContext.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodAnalysisContext.java
@@ -4,8 +4,8 @@
 import java.util.HashMap;
 import java.util.List;
 
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
 
 /**
  * Context for analyzing the applicability of a single access method.
@@ -17,12 +17,12 @@
     // Contains candidate indexes and a list of integers that index into matchedFuncExprs.
     // We are mapping from candidate indexes to a list of function expressions 
     // that match one of the index's expressions.
-    public HashMap<AqlCompiledIndexDecl, List<Integer>> indexExprs = new HashMap<AqlCompiledIndexDecl, List<Integer>>();
+    public HashMap<Index, List<Integer>> indexExprs = new HashMap<Index, List<Integer>>();
     
     // Maps from index to the dataset it is indexing.
-    public HashMap<AqlCompiledIndexDecl, AqlCompiledDatasetDecl> indexDatasetMap = new HashMap<AqlCompiledIndexDecl, AqlCompiledDatasetDecl>();
+    public HashMap<Index, Dataset> indexDatasetMap = new HashMap<Index, Dataset>();
     
-    public void addIndexExpr(AqlCompiledDatasetDecl dataset,  AqlCompiledIndexDecl index, Integer exprIndex) {
+    public void addIndexExpr(Dataset dataset, Index index, Integer exprIndex) {
     	List<Integer> exprs = indexExprs.get(index);
     	if (exprs == null) {
     	    exprs = new ArrayList<Integer>();
@@ -32,7 +32,7 @@
     	indexDatasetMap.put(index, dataset);
     }
     
-    public List<Integer> getIndexExprs(AqlCompiledIndexDecl index) {
+    public List<Integer> getIndexExprs(Index index) {
         return indexExprs.get(index);
     }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
index 22ed47a..3f3ac02 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
@@ -5,7 +5,7 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -19,7 +19,7 @@
  */
 public class AccessMethodJobGenParams {
     protected String indexName;
-    protected IndexKind indexKind;
+    protected IndexType indexType;
     protected String datasetName;
     protected boolean retainInput;
     protected boolean requiresBroadcast;
@@ -29,9 +29,9 @@
     public AccessMethodJobGenParams() {
     }
     
-    public AccessMethodJobGenParams(String indexName, IndexKind indexKind, String datasetName, boolean retainInput, boolean requiresBroadcast) {
+    public AccessMethodJobGenParams(String indexName, IndexType indexType, String datasetName, boolean retainInput, boolean requiresBroadcast) {
         this.indexName = indexName;
-        this.indexKind = indexKind;
+        this.indexType = indexType;
         this.datasetName = datasetName;
         this.retainInput = retainInput;
         this.requiresBroadcast = requiresBroadcast;
@@ -39,7 +39,7 @@
     
     public void writeToFuncArgs(List<Mutable<ILogicalExpression>> funcArgs) {
         funcArgs.add(new MutableObject<ILogicalExpression>(AccessMethodUtils.createStringConstant(indexName)));
-        funcArgs.add(new MutableObject<ILogicalExpression>(AccessMethodUtils.createInt32Constant(indexKind.ordinal())));
+        funcArgs.add(new MutableObject<ILogicalExpression>(AccessMethodUtils.createInt32Constant(indexType.ordinal())));
         funcArgs.add(new MutableObject<ILogicalExpression>(AccessMethodUtils.createStringConstant(datasetName)));
         funcArgs.add(new MutableObject<ILogicalExpression>(AccessMethodUtils.createBooleanConstant(retainInput)));
         funcArgs.add(new MutableObject<ILogicalExpression>(AccessMethodUtils.createBooleanConstant(requiresBroadcast)));
@@ -47,7 +47,7 @@
 
     public void readFromFuncArgs(List<Mutable<ILogicalExpression>> funcArgs) {
         indexName = AccessMethodUtils.getStringConstant(funcArgs.get(0));
-        indexKind = IndexKind.values()[AccessMethodUtils.getInt32Constant(funcArgs.get(1))];
+        indexType = IndexType.values()[AccessMethodUtils.getInt32Constant(funcArgs.get(1))];
         datasetName = AccessMethodUtils.getStringConstant(funcArgs.get(2));
         retainInput = AccessMethodUtils.getBooleanConstant(funcArgs.get(3));
         requiresBroadcast = AccessMethodUtils.getBooleanConstant(funcArgs.get(4));
@@ -57,8 +57,8 @@
         return indexName;
     }
 
-    public IndexKind getIndexKind() {
-        return indexKind;
+    public IndexType getIndexType() {
+        return indexType;
     }
 
     public String getDatasetName() {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
index 305b858..85d1eff 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -7,9 +7,10 @@
 import org.apache.commons.lang3.mutable.MutableObject;
 
 import edu.uci.ics.asterix.aql.util.FunctionUtils;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.base.ABoolean;
 import edu.uci.ics.asterix.om.base.AInt32;
@@ -22,7 +23,6 @@
 import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -40,17 +40,16 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 
 /**
  * Static helper functions for rewriting plans using indexes. 
  */
 public class AccessMethodUtils {
-    public static void appendPrimaryIndexTypes(AqlCompiledDatasetDecl datasetDecl, IAType itemType, List<Object> target) {
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                .getPartitioningFunctions(datasetDecl);
-        for (Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> t : partitioningFunctions) {
-            target.add(t.third);
+    public static void appendPrimaryIndexTypes(Dataset dataset, IAType itemType, List<Object> target) {
+        ARecordType recordType = (ARecordType) itemType;
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        for (String partitioningKey : partitioningKeys) {
+            target.add(recordType.getFieldType(partitioningKey));
         }
         target.add(itemType);
     }
@@ -115,22 +114,23 @@
         return true;
     }
     
-    public static int getNumSecondaryKeys(AqlCompiledDatasetDecl datasetDecl, AqlCompiledIndexDecl indexDecl,
-            ARecordType recordType) throws AlgebricksException {
-        switch (indexDecl.getKind()) {
+    public static int getNumSecondaryKeys(Index index, ARecordType recordType)
+            throws AlgebricksException {
+        switch (index.getIndexType()) {
             case BTREE:
             case WORD_INVIX:
             case NGRAM_INVIX: {
-                return indexDecl.getFieldExprs().size();
+                return index.getKeyFieldNames().size();
             }
             case RTREE: {
-            	Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(indexDecl.getFieldExprs().get(0), recordType);
+                Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(index
+                        .getKeyFieldNames().get(0), recordType);
                 IAType keyType = keyPairType.first;
                 int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
                 return numDimensions * 2;
             }
             default: {
-                throw new AlgebricksException("Unknown index kind: " + indexDecl.getKind());
+                throw new AlgebricksException("Unknown index kind: " + index.getIndexType());
             }
         }
     }
@@ -138,24 +138,24 @@
     /**
      * Appends the types of the fields produced by the given secondary index to dest.
      */
-    public static void appendSecondaryIndexTypes(AqlCompiledDatasetDecl datasetDecl, ARecordType recordType,
-            AqlCompiledIndexDecl indexDecl, boolean primaryKeysOnly, List<Object> dest) throws AlgebricksException {
+    public static void appendSecondaryIndexTypes(Dataset dataset, ARecordType recordType,
+            Index index, boolean primaryKeysOnly, List<Object> dest) throws AlgebricksException {
         if (!primaryKeysOnly) {
-            switch (indexDecl.getKind()) {
+            switch (index.getIndexType()) {
                 case BTREE:
                 case WORD_INVIX:
                 case NGRAM_INVIX: {
-                    for (String sk : indexDecl.getFieldExprs()) {
+                    for (String sk : index.getKeyFieldNames()) {
                     	Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(sk, recordType);
                         dest.add(keyPairType.first);
                     }
                     break;
                 }
                 case RTREE: {
-                	Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(indexDecl.getFieldExprs().get(0), recordType);
+                	Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(index.getKeyFieldNames().get(0), recordType);
                     IAType keyType = keyPairType.first;
                     IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
-                    int numKeys = getNumSecondaryKeys(datasetDecl, indexDecl, recordType);
+                    int numKeys = getNumSecondaryKeys(index, recordType);
                     for (int i = 0; i < numKeys; i++) {
                         dest.add(nestedKeyType);
                     }
@@ -164,26 +164,25 @@
             }
         }
         // Primary keys.
-        for (Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> t : DatasetUtils
-                .getPartitioningFunctions(datasetDecl)) {
-            dest.add(t.third);
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        for (String partitioningKey : partitioningKeys) {
+            dest.add(recordType.getFieldType(partitioningKey));
         }
     }
     
-    public static void appendSecondaryIndexOutputVars(AqlCompiledDatasetDecl datasetDecl, ARecordType recordType,
-            AqlCompiledIndexDecl indexDecl, boolean primaryKeysOnly, IOptimizationContext context,
+    public static void appendSecondaryIndexOutputVars(Dataset dataset, ARecordType recordType,
+            Index index, boolean primaryKeysOnly, IOptimizationContext context,
             List<LogicalVariable> dest) throws AlgebricksException {
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
-        int numSecondaryKeys = getNumSecondaryKeys(datasetDecl, indexDecl, recordType);
+        int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+        int numSecondaryKeys = getNumSecondaryKeys(index, recordType);
         int numVars = (primaryKeysOnly) ? numPrimaryKeys : numPrimaryKeys + numSecondaryKeys;
         for (int i = 0; i < numVars; i++) {
             dest.add(context.newVar());
         }
     }
     
-    public static List<LogicalVariable> getPrimaryKeyVarsFromUnnestMap(AqlCompiledDatasetDecl datasetDecl,
-            ILogicalOperator unnestMapOp) {
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
+    public static List<LogicalVariable> getPrimaryKeyVarsFromUnnestMap(Dataset dataset, ILogicalOperator unnestMapOp) {
+        int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
         List<LogicalVariable> primaryKeyVars = new ArrayList<LogicalVariable>();
         List<LogicalVariable> sourceVars = ((UnnestMapOperator) unnestMapOp).getVariables();
         // Assumes the primary keys are located at the end.
@@ -195,8 +194,8 @@
         return primaryKeyVars;
     }
     
-    public static UnnestMapOperator createSecondaryIndexUnnestMap(AqlCompiledDatasetDecl datasetDecl,
-            ARecordType recordType, AqlCompiledIndexDecl indexDecl, ILogicalOperator inputOp,
+    public static UnnestMapOperator createSecondaryIndexUnnestMap(Dataset dataset,
+            ARecordType recordType, Index index, ILogicalOperator inputOp,
             AccessMethodJobGenParams jobGenParams, IOptimizationContext context, boolean outputPrimaryKeysOnly,
             boolean retainInput) throws AlgebricksException {
         // The job gen parameters are transferred to the actual job gen via the UnnestMapOperator's function arguments.
@@ -206,8 +205,8 @@
         List<LogicalVariable> secondaryIndexUnnestVars = new ArrayList<LogicalVariable>();
         List<Object> secondaryIndexOutputTypes = new ArrayList<Object>();
         // Append output variables/types generated by the secondary-index search (not forwarded from input).
-        appendSecondaryIndexOutputVars(datasetDecl, recordType, indexDecl, outputPrimaryKeysOnly, context, secondaryIndexUnnestVars);
-        appendSecondaryIndexTypes(datasetDecl, recordType, indexDecl, outputPrimaryKeysOnly, secondaryIndexOutputTypes);        
+        appendSecondaryIndexOutputVars(dataset, recordType, index, outputPrimaryKeysOnly, context, secondaryIndexUnnestVars);
+        appendSecondaryIndexTypes(dataset, recordType, index, outputPrimaryKeysOnly, secondaryIndexOutputTypes);        
         // An index search is expressed as an unnest over an index-search function.
         IFunctionInfo secondaryIndexSearch = FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.INDEX_SEARCH);
         UnnestingFunctionCallExpression secondaryIndexSearchFunc = new UnnestingFunctionCallExpression(secondaryIndexSearch, secondaryIndexFuncArgs);
@@ -222,10 +221,10 @@
         return secondaryIndexUnnestOp;
     }
     
-    public static UnnestMapOperator createPrimaryIndexUnnestMap(DataSourceScanOperator dataSourceScan, AqlCompiledDatasetDecl datasetDecl, 
+    public static UnnestMapOperator createPrimaryIndexUnnestMap(DataSourceScanOperator dataSourceScan, Dataset dataset, 
             ARecordType recordType, ILogicalOperator inputOp,
             IOptimizationContext context, boolean sortPrimaryKeys, boolean retainInput, boolean requiresBroadcast) throws AlgebricksException {        
-        List<LogicalVariable> primaryKeyVars = AccessMethodUtils.getPrimaryKeyVarsFromUnnestMap(datasetDecl, inputOp);
+        List<LogicalVariable> primaryKeyVars = AccessMethodUtils.getPrimaryKeyVarsFromUnnestMap(dataset, inputOp);
         // Optionally add a sort on the primary-index keys before searching the primary index.
         OrderOperator order = null;
         if (sortPrimaryKeys) {
@@ -242,7 +241,8 @@
         }
         // The job gen parameters are transferred to the actual job gen via the UnnestMapOperator's function arguments. 
         List<Mutable<ILogicalExpression>> primaryIndexFuncArgs = new ArrayList<Mutable<ILogicalExpression>>();
-        BTreeJobGenParams jobGenParams = new BTreeJobGenParams(datasetDecl.getName(), IndexKind.BTREE, datasetDecl.getName(), retainInput, requiresBroadcast);
+        BTreeJobGenParams jobGenParams = new BTreeJobGenParams(dataset.getDatasetName(), IndexType.BTREE,
+                dataset.getDatasetName(), retainInput, requiresBroadcast);
         // Set low/high inclusive to true for a point lookup.
         jobGenParams.setLowKeyInclusive(true);
         jobGenParams.setHighKeyInclusive(true);
@@ -254,7 +254,7 @@
         List<Object> primaryIndexOutputTypes = new ArrayList<Object>();
         // Append output variables/types generated by the primary-index search (not forwarded from input).
         primaryIndexUnnestVars.addAll(dataSourceScan.getVariables());
-        appendPrimaryIndexTypes(datasetDecl, recordType, primaryIndexOutputTypes);
+        appendPrimaryIndexTypes(dataset, recordType, primaryIndexOutputTypes);
         // An index search is expressed as an unnest over an index-search function.
         IFunctionInfo primaryIndexSearch = FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.INDEX_SEARCH);
         AbstractFunctionCallExpression primaryIndexSearchFunc = new ScalarFunctionCallExpression(primaryIndexSearch, primaryIndexFuncArgs);
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
index 25902bc..5a11524 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
@@ -9,10 +9,9 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -82,9 +81,9 @@
 
     @Override
     public boolean applySelectPlanTransformation(Mutable<ILogicalOperator> selectRef, OptimizableOperatorSubTree subTree,
-            AqlCompiledIndexDecl chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context) 
+            Index chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context) 
                     throws AlgebricksException {
-        AqlCompiledDatasetDecl datasetDecl = subTree.datasetDecl;
+        Dataset dataset = subTree.dataset;
         ARecordType recordType = subTree.recordType;
         SelectOperator select = (SelectOperator) selectRef.getValue();
         DataSourceScanOperator dataSourceScan = subTree.dataSourceScan;
@@ -93,7 +92,7 @@
         if (assignRef != null) {
             assign = (AssignOperator) assignRef.getValue();
         }        
-        int numSecondaryKeys = chosenIndex.getFieldExprs().size();        
+        int numSecondaryKeys = chosenIndex.getKeyFieldNames().size();        
         
         // Info on high and low keys for the BTree search predicate.
         IAlgebricksConstantValue[] lowKeyConstants = new IAlgebricksConstantValue[numSecondaryKeys];
@@ -117,7 +116,7 @@
         for (Integer exprIndex : exprList) {
             // Position of the field of matchedFuncExprs.get(exprIndex) in the chosen index's indexed exprs.
             IOptimizableFuncExpr optFuncExpr = matchedFuncExprs.get(exprIndex);
-            int keyPos = indexOf(optFuncExpr.getFieldName(0), chosenIndex.getFieldExprs());
+            int keyPos = indexOf(optFuncExpr.getFieldName(0), chosenIndex.getKeyFieldNames());
             if (keyPos < 0) {
                 throw new InternalError();
             }
@@ -223,7 +222,8 @@
         int numLowKeys = createKeyVarsAndExprs(lowKeyLimits, lowKeyConstants, keyExprList, keyVarList, context);
         int numHighKeys = createKeyVarsAndExprs(highKeyLimits, highKeyConstants, keyExprList, keyVarList, context);
         
-        BTreeJobGenParams jobGenParams = new BTreeJobGenParams(chosenIndex.getIndexName(), IndexKind.BTREE, datasetDecl.getName(), false, false);
+        BTreeJobGenParams jobGenParams = new BTreeJobGenParams(chosenIndex.getIndexName(), IndexType.BTREE,
+                dataset.getDatasetName(), false, false);
         jobGenParams.setLowKeyInclusive(lowKeyInclusive[0]);
         jobGenParams.setHighKeyInclusive(highKeyInclusive[0]);
         jobGenParams.setLowKeyVarList(keyVarList, 0, numLowKeys);
@@ -235,17 +235,17 @@
         assignSearchKeys.getInputs().add(dataSourceScan.getInputs().get(0));
         assignSearchKeys.setExecutionMode(dataSourceScan.getExecutionMode());
         
-        UnnestMapOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(datasetDecl,
+        UnnestMapOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(dataset,
                 recordType, chosenIndex, assignSearchKeys, jobGenParams, context, false, false);
         
         // Generate the rest of the upstream plan which feeds the search results into the primary index.        
         UnnestMapOperator primaryIndexUnnestOp;
-        boolean isPrimaryIndex = chosenIndex == DatasetUtils.getPrimaryIndex(datasetDecl);
+        boolean isPrimaryIndex = chosenIndex.getIndexName().equals(dataset.getDatasetName());
         if (!isPrimaryIndex) {
-            primaryIndexUnnestOp = AccessMethodUtils.createPrimaryIndexUnnestMap(dataSourceScan, datasetDecl, recordType, secondaryIndexUnnestOp, context, true, false, false);
+            primaryIndexUnnestOp = AccessMethodUtils.createPrimaryIndexUnnestMap(dataSourceScan, dataset, recordType, secondaryIndexUnnestOp, context, true, false, false);
         } else {
             List<Object> primaryIndexOutputTypes = new ArrayList<Object>();
-            AccessMethodUtils.appendPrimaryIndexTypes(datasetDecl, recordType, primaryIndexOutputTypes);
+            AccessMethodUtils.appendPrimaryIndexTypes(dataset, recordType, primaryIndexOutputTypes);
             primaryIndexUnnestOp = new UnnestMapOperator(dataSourceScan.getVariables(),
                     secondaryIndexUnnestOp.getExpressionRef(), primaryIndexOutputTypes, false);
             primaryIndexUnnestOp.getInputs().add(new MutableObject<ILogicalOperator>(assignSearchKeys));
@@ -280,7 +280,7 @@
     @Override
     public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree,
-            AqlCompiledIndexDecl chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
+            Index chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
             throws AlgebricksException {
         // TODO: Implement this.
         return false;
@@ -395,7 +395,7 @@
     }
 
     @Override
-    public boolean exprIsOptimizable(AqlCompiledIndexDecl index, IOptimizableFuncExpr optFuncExpr) {
+    public boolean exprIsOptimizable(Index index, IOptimizableFuncExpr optFuncExpr) {
         // No additional analysis required for BTrees.
         return true;
     }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeJobGenParams.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeJobGenParams.java
index 5dea91f..72da5a6 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeJobGenParams.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeJobGenParams.java
@@ -6,7 +6,7 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
@@ -27,9 +27,9 @@
         super();
     }
     
-    public BTreeJobGenParams(String indexName, IndexKind indexKind, String datasetName, boolean retainInput,
+    public BTreeJobGenParams(String indexName, IndexType indexType, String datasetName, boolean retainInput,
             boolean requiresBroadcast) {
-        super(indexName, indexKind, datasetName, retainInput, requiresBroadcast);
+        super(indexName, indexType, datasetName, retainInput, requiresBroadcast);
     }
 
     public void setLowKeyVarList(List<LogicalVariable> keyVarList, int startIndex, int numKeys) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IAccessMethod.java
index 8dc8b8e..0f3dd29 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IAccessMethod.java
@@ -4,7 +4,7 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -59,18 +59,18 @@
      * Applies the plan transformation to use chosenIndex to optimize a selection query.
      */
     public boolean applySelectPlanTransformation(Mutable<ILogicalOperator> selectRef, OptimizableOperatorSubTree subTree,            
-            AqlCompiledIndexDecl chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
+            Index chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
             throws AlgebricksException;
     
     /**
      * Applies the plan transformation to use chosenIndex to optimize a join query.
      */
     public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef, OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree,
-            AqlCompiledIndexDecl chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
+            Index chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
             throws AlgebricksException;
     
     /**
      * Analyzes expr to see whether it is optimizable by the given concrete index.
      */
-    public boolean exprIsOptimizable(AqlCompiledIndexDecl index, IOptimizableFuncExpr optFuncExpr);
+    public boolean exprIsOptimizable(Index index, IOptimizableFuncExpr optFuncExpr);
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
index 72acc1d..71a35e7 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
@@ -6,8 +6,8 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -62,11 +62,12 @@
 	
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        setMetadataDeclarations(context);
+        
     	// Match operator pattern and initialize optimizable sub trees.
         if (!matchesOperatorPattern(opRef, context)) {
             return false;
         }
-        
         // Analyze condition on those optimizable subtrees that have a datasource scan.
         Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs = new HashMap<IAccessMethod, AccessMethodAnalysisContext>();
         boolean matchInLeftSubTree = false;
@@ -103,7 +104,7 @@
         pruneIndexCandidates(analyzedAMs);
         
         // Choose index to be applied.
-        Pair<IAccessMethod, AqlCompiledIndexDecl> chosenIndex = chooseIndex(analyzedAMs);
+        Pair<IAccessMethod, Index> chosenIndex = chooseIndex(analyzedAMs);
         if (chosenIndex == null) {
             context.addToDontApplySet(this, join);
             return false;
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
index 1d2cc0e..66acab0 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
@@ -6,8 +6,8 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -69,6 +69,8 @@
 	
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        setMetadataDeclarations(context);
+        
     	// Match operator pattern and initialize operator members.
         if (!matchesOperatorPattern(opRef, context)) {
             return false;
@@ -89,7 +91,7 @@
         pruneIndexCandidates(analyzedAMs);
         
         // Choose index to be applied.
-        Pair<IAccessMethod, AqlCompiledIndexDecl> chosenIndex = chooseIndex(analyzedAMs);
+        Pair<IAccessMethod, Index> chosenIndex = chooseIndex(analyzedAMs);
         if (chosenIndex == null) {
             context.addToDontApplySet(this, select);
             return false;
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index fe3a539..307d9d6 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -14,9 +14,8 @@
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryTokenizerFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.om.base.AFloat;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.ANull;
@@ -309,12 +308,13 @@
     }
 
     private ILogicalOperator createSecondaryToPrimaryPlan(OptimizableOperatorSubTree indexSubTree, OptimizableOperatorSubTree probeSubTree, 
-            AqlCompiledIndexDecl chosenIndex, IOptimizableFuncExpr optFuncExpr, boolean retainInput, boolean requiresBroadcast, IOptimizationContext context) throws AlgebricksException {
-        AqlCompiledDatasetDecl datasetDecl = indexSubTree.datasetDecl;
+            Index chosenIndex, IOptimizableFuncExpr optFuncExpr, boolean retainInput, boolean requiresBroadcast, IOptimizationContext context) throws AlgebricksException {
+        Dataset dataset = indexSubTree.dataset;
         ARecordType recordType = indexSubTree.recordType;
         DataSourceScanOperator dataSourceScan = indexSubTree.dataSourceScan;
         
-        InvertedIndexJobGenParams jobGenParams = new InvertedIndexJobGenParams(chosenIndex.getIndexName(), chosenIndex.getKind(), datasetDecl.getName(), retainInput, requiresBroadcast);
+        InvertedIndexJobGenParams jobGenParams = new InvertedIndexJobGenParams(chosenIndex.getIndexName(),
+                chosenIndex.getIndexType(), dataset.getDatasetName(), retainInput, requiresBroadcast);
         // Add function-specific args such as search modifier, and possibly a similarity threshold.
         addFunctionSpecificArgs(optFuncExpr, jobGenParams);
         // Add the type of search key from the optFuncExpr.
@@ -343,11 +343,11 @@
             inputOp = (AbstractLogicalOperator) probeSubTree.root;
         }
         jobGenParams.setKeyVarList(keyVarList);
-        UnnestMapOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(datasetDecl,
+        UnnestMapOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(dataset,
                 recordType, chosenIndex, inputOp, jobGenParams, context, true, retainInput);
         // Generate the rest of the upstream plan which feeds the search results into the primary index.
         UnnestMapOperator primaryIndexUnnestOp = AccessMethodUtils.createPrimaryIndexUnnestMap(dataSourceScan,
-                datasetDecl, recordType, secondaryIndexUnnestOp, context, true, retainInput, false);
+                dataset, recordType, secondaryIndexUnnestOp, context, true, retainInput, false);
         return primaryIndexUnnestOp;
     }
     
@@ -368,7 +368,7 @@
     
     @Override
     public boolean applySelectPlanTransformation(Mutable<ILogicalOperator> selectRef, OptimizableOperatorSubTree subTree,
-            AqlCompiledIndexDecl chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
+            Index chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
             throws AlgebricksException {
         IOptimizableFuncExpr optFuncExpr = chooseOptFuncExpr(chosenIndex, analysisCtx);
         ILogicalOperator indexPlanRootOp = createSecondaryToPrimaryPlan(subTree, null, chosenIndex, optFuncExpr, false, false, context);
@@ -380,17 +380,17 @@
     @Override
     public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree,
-            AqlCompiledIndexDecl chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
+            Index chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
             throws AlgebricksException {
         // Figure out if the index is applicable on the left or right side (if both, we arbitrarily prefer the left side).
-        AqlCompiledDatasetDecl dataset = analysisCtx.indexDatasetMap.get(chosenIndex);
+        Dataset dataset = analysisCtx.indexDatasetMap.get(chosenIndex);
         // Determine probe and index subtrees based on chosen index.
         OptimizableOperatorSubTree indexSubTree = null;
         OptimizableOperatorSubTree probeSubTree = null;
-        if (dataset.getName().equals(leftSubTree.datasetDecl.getName())) {
+        if (dataset.getDatasetName().equals(leftSubTree.dataset.getDatasetName())) {
             indexSubTree = leftSubTree;
             probeSubTree = rightSubTree;
-        } else if (dataset.getName().equals(rightSubTree.datasetDecl.getName())) {
+        } else if (dataset.getDatasetName().equals(rightSubTree.dataset.getDatasetName())) {
             indexSubTree = rightSubTree;
             probeSubTree = leftSubTree;
         }
@@ -454,7 +454,7 @@
         return true;
     }
     
-    private IOptimizableFuncExpr chooseOptFuncExpr(AqlCompiledIndexDecl chosenIndex, AccessMethodAnalysisContext analysisCtx) {
+    private IOptimizableFuncExpr chooseOptFuncExpr(Index chosenIndex, AccessMethodAnalysisContext analysisCtx) {
         // TODO: We can probably do something smarter here.
         // Pick the first expr optimizable by this index.
         List<Integer> indexExprs = analysisCtx.getIndexExprs(chosenIndex);
@@ -462,7 +462,7 @@
         return analysisCtx.matchedFuncExprs.get(firstExprIndex);
     }
 
-    private Mutable<ILogicalOperator> createPanicNestedLoopJoinPlan(Mutable<ILogicalOperator> joinRef, OptimizableOperatorSubTree indexSubTree, OptimizableOperatorSubTree probeSubTree, IOptimizableFuncExpr optFuncExpr, AqlCompiledIndexDecl chosenIndex, IOptimizationContext context) throws AlgebricksException {
+    private Mutable<ILogicalOperator> createPanicNestedLoopJoinPlan(Mutable<ILogicalOperator> joinRef, OptimizableOperatorSubTree indexSubTree, OptimizableOperatorSubTree probeSubTree, IOptimizableFuncExpr optFuncExpr, Index chosenIndex, IOptimizationContext context) throws AlgebricksException {
         LogicalVariable inputSearchVar = getInputSearchVar(optFuncExpr, indexSubTree);
         
         // We split the plan into two "branches", and add selections on each side.
@@ -487,7 +487,6 @@
         LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter);
         ILogicalOperator scanSubTree = deepCopyVisitor.deepCopy(indexSubTree.root, null);        
         context.setVarCounter(counter.get());
-        //context.computeAndSetTypeEnvironmentForOperator(scanSubTree);
         
         List<LogicalVariable> copyLiveVars = new ArrayList<LogicalVariable>();
         VariableUtilities.getLiveVariables(scanSubTree, copyLiveVars);
@@ -516,7 +515,7 @@
         return isFilterableSelectOpRef;
     }
     
-    private void createIsFilterableSelectOps(ILogicalOperator inputOp, LogicalVariable inputSearchVar, IAType inputSearchVarType, IOptimizableFuncExpr optFuncExpr, AqlCompiledIndexDecl chosenIndex, IOptimizationContext context, Mutable<ILogicalOperator> isFilterableSelectOpRef, Mutable<ILogicalOperator> isNotFilterableSelectOpRef) throws AlgebricksException {        
+    private void createIsFilterableSelectOps(ILogicalOperator inputOp, LogicalVariable inputSearchVar, IAType inputSearchVarType, IOptimizableFuncExpr optFuncExpr, Index chosenIndex, IOptimizationContext context, Mutable<ILogicalOperator> isFilterableSelectOpRef, Mutable<ILogicalOperator> isNotFilterableSelectOpRef) throws AlgebricksException {        
         // Create select operator for removing tuples that are not filterable.
         // First determine the proper filter function and args based on the type of the input search var.
         ILogicalExpression isFilterableExpr = null;
@@ -616,7 +615,7 @@
     }
 
     @Override
-    public boolean exprIsOptimizable(AqlCompiledIndexDecl index, IOptimizableFuncExpr optFuncExpr) {
+    public boolean exprIsOptimizable(Index index, IOptimizableFuncExpr optFuncExpr) {
         if (optFuncExpr.getFuncExpr().getFunctionIdentifier() == AsterixBuiltinFunctions.EDIT_DISTANCE_CHECK) {
             // Must be for a join query.
             if (optFuncExpr.getNumConstantVals() == 1) {
@@ -631,7 +630,7 @@
             AInt32 edThresh = (AInt32) intObj;
             int mergeThreshold = 0;
             // We can only optimize edit distance on strings using an ngram index.
-            if (listOrStrObj.getType().getTypeTag() == ATypeTag.STRING && index.getKind() == IndexKind.NGRAM_INVIX) {
+            if (listOrStrObj.getType().getTypeTag() == ATypeTag.STRING && index.getIndexType() == IndexType.NGRAM_INVIX) {
                 AString astr = (AString) listOrStrObj;                    
                 // Compute merge threshold.
                 mergeThreshold = (astr.getStringValue().length() + index.getGramLength() - 1)
@@ -639,7 +638,7 @@
             }
             // We can only optimize edit distance on lists using a word index.
             if ((listOrStrObj.getType().getTypeTag() == ATypeTag.ORDEREDLIST || listOrStrObj.getType().getTypeTag() == ATypeTag.UNORDEREDLIST)
-                    && index.getKind() == IndexKind.WORD_INVIX) {
+                    && index.getIndexType() == IndexType.WORD_INVIX) {
                 IACollection alist = (IACollection) listOrStrObj;        
                 // Compute merge threshold.
                 mergeThreshold = alist.size() - edThresh.getIntegerValue();
@@ -665,11 +664,11 @@
                 AbstractFunctionCallExpression nonConstfuncExpr = (AbstractFunctionCallExpression) nonConstArg;
                 // We can use this index if the tokenization function matches the index type.
                 if (nonConstfuncExpr.getFunctionIdentifier() == AsterixBuiltinFunctions.WORD_TOKENS &&
-                        index.getKind() == IndexKind.WORD_INVIX) {
+                        index.getIndexType() == IndexType.WORD_INVIX) {
                     return true;
                 }
                 if (nonConstfuncExpr.getFunctionIdentifier() == AsterixBuiltinFunctions.GRAM_TOKENS &&
-                        index.getKind() == IndexKind.NGRAM_INVIX) {
+                        index.getIndexType() == IndexType.NGRAM_INVIX) {
                     return true;
                 }
             }
@@ -682,7 +681,7 @@
         }
         // We can only optimize contains with ngram indexes.
         if (optFuncExpr.getFuncExpr().getFunctionIdentifier() == AsterixBuiltinFunctions.CONTAINS
-                && index.getKind() == IndexKind.NGRAM_INVIX) {
+                && index.getIndexType() == IndexType.NGRAM_INVIX) {
             // Check that the constant search string has at least gramLength characters.
             AsterixConstantValue strConstVal = (AsterixConstantValue) optFuncExpr.getConstantVal(0);
             IAObject strObj = strConstVal.getObject();
@@ -733,9 +732,9 @@
         return AqlTypeTraitProvider.INSTANCE.getTypeTrait(type);
     }
     
-    public static IBinaryTokenizerFactory getBinaryTokenizerFactory(SearchModifierType searchModifierType, ATypeTag searchKeyType, AqlCompiledIndexDecl index)
-            throws AlgebricksException {
-        switch (index.getKind()) {
+    public static IBinaryTokenizerFactory getBinaryTokenizerFactory(SearchModifierType searchModifierType,
+            ATypeTag searchKeyType, Index index) throws AlgebricksException {
+        switch (index.getIndexType()) {
             case WORD_INVIX: {
                 return AqlBinaryTokenizerFactoryProvider.INSTANCE.getWordTokenizerFactory(searchKeyType, false);
             }
@@ -746,20 +745,20 @@
                         index.getGramLength(), prePost, false);
             }
             default: {
-                throw new AlgebricksException("Tokenizer not applicable to index kind '" + index.getKind() + "'.");
+                throw new AlgebricksException("Tokenizer not applicable to index kind '" + index.getIndexType() + "'.");
             }
         }
     }
     
-    public static IBinaryTokenizerFactory getBinaryTokenizerFactory(ATypeTag keyType, IndexType indexType, int gramLength)
-            throws AlgebricksException {
+    public static IBinaryTokenizerFactory getBinaryTokenizerFactory(ATypeTag keyType, IndexType indexType,
+            int gramLength) throws AlgebricksException {
         switch (indexType) {
             case WORD_INVIX: {
                 return AqlBinaryTokenizerFactoryProvider.INSTANCE.getWordTokenizerFactory(keyType, false);
             }
             case NGRAM_INVIX: {
-                return AqlBinaryTokenizerFactoryProvider.INSTANCE.getNGramTokenizerFactory(keyType,
-                        gramLength, true, false);
+                return AqlBinaryTokenizerFactoryProvider.INSTANCE.getNGramTokenizerFactory(keyType, gramLength, true,
+                        false);
             }
             default: {
                 throw new AlgebricksException("Tokenizer not applicable to index type '" + indexType + "'.");
@@ -767,8 +766,9 @@
         }
     }
     
-    public static IInvertedIndexSearchModifierFactory getSearchModifierFactory(SearchModifierType searchModifierType, IAObject simThresh, AqlCompiledIndexDecl index) throws AlgebricksException {
-        switch(searchModifierType) {
+    public static IInvertedIndexSearchModifierFactory getSearchModifierFactory(SearchModifierType searchModifierType,
+            IAObject simThresh, Index index) throws AlgebricksException {
+        switch (searchModifierType) {
             case CONJUNCTIVE: {
                 return new ConjunctiveSearchModifierFactory();
             }
@@ -778,7 +778,7 @@
             }
             case EDIT_DISTANCE: {
                 int edThresh = ((AInt32) simThresh).getIntegerValue();
-                switch (index.getKind()) {
+                switch (index.getIndexType()) {
                     case NGRAM_INVIX: {
                         // Edit distance on strings, filtered with overlapping grams.
                         return new EditDistanceSearchModifierFactory(index.getGramLength(), edThresh);
@@ -788,7 +788,8 @@
                         return new ListEditDistanceSearchModifierFactory(edThresh);
                     }
                     default: {
-                        throw new AlgebricksException("Incompatible search modifier '" + searchModifierType + "' for index type '" + index.getKind() + "'");
+                        throw new AlgebricksException("Incompatible search modifier '" + searchModifierType
+                                + "' for index type '" + index.getIndexType() + "'");
                     }
                 }
             }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexJobGenParams.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexJobGenParams.java
index fa5b5e1..824ea0f 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexJobGenParams.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexJobGenParams.java
@@ -6,7 +6,7 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
@@ -30,8 +30,8 @@
     public InvertedIndexJobGenParams() {
     }
     
-    public InvertedIndexJobGenParams(String indexName, IndexKind indexKind, String datasetName, boolean retainInput, boolean requiresBroadcast) {
-        super(indexName, indexKind, datasetName, retainInput, requiresBroadcast);
+    public InvertedIndexJobGenParams(String indexName, IndexType indexType, String datasetName, boolean retainInput, boolean requiresBroadcast) {
+        super(indexName, indexType, datasetName, retainInput, requiresBroadcast);
     }
     
     public void setSearchModifierType(SearchModifierType searchModifierType) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
index dc4f8fc..e54c36d 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
@@ -6,9 +6,9 @@
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -34,7 +34,7 @@
     public Mutable<ILogicalOperator> dataSourceScanRef = null;
     public DataSourceScanOperator dataSourceScan = null;
     // Dataset and type metadata. Set in setDatasetAndTypeMetadata().
-    public AqlCompiledDatasetDecl datasetDecl = null;
+    public Dataset dataset = null;
     public ARecordType recordType = null;    
     
     public boolean initFromSubTree(Mutable<ILogicalOperator> subTreeOpRef) {
@@ -92,15 +92,15 @@
             return false;
         }
         AqlCompiledMetadataDeclarations metadata = metadataProvider.getMetadataDeclarations();
-        datasetDecl = metadata.findDataset(datasetName);
-        if (datasetDecl == null) {
+        dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AlgebricksException("No metadata for dataset " + datasetName);
         }
-        if (datasetDecl.getDatasetType() != DatasetType.INTERNAL && datasetDecl.getDatasetType() != DatasetType.FEED) {
+        if (dataset.getType() != DatasetType.INTERNAL && dataset.getType() != DatasetType.FEED) {
             return false;
         }
         // Get the record type for that dataset.
-        IAType itemType = metadata.findType(datasetDecl.getItemTypeName());
+        IAType itemType = metadata.findType(dataset.getDatatypeName());
         if (itemType.getTypeTag() != ATypeTag.RECORD) {
             return false;
         }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
index ac22095..aaa359d 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
@@ -7,9 +7,10 @@
 import org.apache.commons.lang3.mutable.MutableObject;
 
 import edu.uci.ics.asterix.aql.util.FunctionUtils;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
@@ -64,13 +65,13 @@
 
     @Override
     public boolean applySelectPlanTransformation(Mutable<ILogicalOperator> selectRef, OptimizableOperatorSubTree subTree,
-            AqlCompiledIndexDecl chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
+            Index index, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
             throws AlgebricksException {
-        AqlCompiledDatasetDecl datasetDecl = subTree.datasetDecl;
+        Dataset dataset = subTree.dataset;
         ARecordType recordType = subTree.recordType;
         // TODO: We can probably do something smarter here based on selectivity or MBR area.
         // Pick the first expr optimizable by this index.
-        List<Integer> indexExprs = analysisCtx.getIndexExprs(chosenIndex);
+        List<Integer> indexExprs = analysisCtx.getIndexExprs(index);
         int firstExprIndex = indexExprs.get(0);
         IOptimizableFuncExpr optFuncExpr = analysisCtx.matchedFuncExprs.get(firstExprIndex);
         
@@ -83,7 +84,8 @@
         
         DataSourceScanOperator dataSourceScan = subTree.dataSourceScan;
         // TODO: For now retainInput and requiresBroadcast are always false.
-        RTreeJobGenParams jobGenParams = new RTreeJobGenParams(chosenIndex.getIndexName(), IndexKind.RTREE, datasetDecl.getName(), false, false);
+        RTreeJobGenParams jobGenParams = new RTreeJobGenParams(index.getIndexName(), IndexType.RTREE,
+                dataset.getDatasetName(), false, false);
         // A spatial object is serialized in the constant of the func expr we are optimizing.
         // The R-Tree expects as input an MBR represented with 1 field per dimension. 
         // Here we generate vars and funcs for extracting MBR fields from the constant into fields of a tuple (as the R-Tree expects them).
@@ -117,11 +119,11 @@
         assignSearchKeys.getInputs().add(dataSourceScan.getInputs().get(0));
         assignSearchKeys.setExecutionMode(dataSourceScan.getExecutionMode());
 
-        UnnestMapOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(datasetDecl,
-                recordType, chosenIndex, assignSearchKeys, jobGenParams, context, false, false);
+        UnnestMapOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(dataset,
+                recordType, index, assignSearchKeys, jobGenParams, context, false, false);
         // Generate the rest of the upstream plan which feeds the search results into the primary index.
         UnnestMapOperator primaryIndexUnnestOp = AccessMethodUtils.createPrimaryIndexUnnestMap(dataSourceScan,
-                datasetDecl, recordType, secondaryIndexUnnestOp, context, true, false, false);
+                dataset, recordType, secondaryIndexUnnestOp, context, true, false, false);
         // Replace the datasource scan with the new plan rooted at primaryIndexUnnestMap.
         subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
         return true;
@@ -130,14 +132,14 @@
     @Override
     public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree,
-            AqlCompiledIndexDecl chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
+            Index chosenIndex, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context)
             throws AlgebricksException {
         // TODO Implement this.
         return false;
     }
     
     @Override
-    public boolean exprIsOptimizable(AqlCompiledIndexDecl index, IOptimizableFuncExpr optFuncExpr) {
+    public boolean exprIsOptimizable(Index index, IOptimizableFuncExpr optFuncExpr) {
         // No additional analysis required.
         return true;
     }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeJobGenParams.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeJobGenParams.java
index 2d850b2..99d5223 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeJobGenParams.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeJobGenParams.java
@@ -5,7 +5,7 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 
@@ -20,9 +20,10 @@
     public RTreeJobGenParams() {
     }
     
-    public RTreeJobGenParams(String indexName, IndexKind indexKind, String datasetName, boolean retainInput, boolean requiresBroadcast) {
+    // TODO: Call super c'tor.
+    public RTreeJobGenParams(String indexName, IndexType indexType, String datasetName, boolean retainInput, boolean requiresBroadcast) {
         this.indexName = indexName;
-        this.indexKind = indexKind;
+        this.indexType = indexType;
         this.datasetName = datasetName;
         this.retainInput = retainInput;
         this.requiresBroadcast = requiresBroadcast;
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
index b1970ac..2f483d8 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
@@ -80,8 +80,6 @@
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.bootstrap.AsterixProperties;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledInternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
 import edu.uci.ics.asterix.metadata.declared.AqlLogicalPlanAndMetadataImpl;
@@ -89,6 +87,8 @@
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
 import edu.uci.ics.asterix.metadata.declared.FileSplitDataSink;
 import edu.uci.ics.asterix.metadata.declared.FileSplitSinkId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
@@ -218,26 +218,23 @@
             topOp.getInputs().add(new MutableObject<ILogicalOperator>(project));
         } else {
             String dataVerseName = compiledDeclarations.getDataverseName();
-            AqlCompiledDatasetDecl adecl = compiledDeclarations.findDataset(outputDatasetName);
-            if (adecl == null) {
+            Dataset dataset = compiledDeclarations.findDataset(outputDatasetName);
+            if (dataset == null) {
                 throw new AlgebricksException("Cannot find dataset " + outputDatasetName);
             }
 
             AqlSourceId sourceId = new AqlSourceId(dataVerseName, outputDatasetName);
-            String itemTypeName = adecl.getItemTypeName();
+            String itemTypeName = dataset.getDatatypeName();
             IAType itemType = compiledDeclarations.findType(itemTypeName);
-            AqlDataSource dataSource = new AqlDataSource(sourceId, adecl, itemType);
-
-            if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
+            AqlDataSource dataSource = new AqlDataSource(sourceId, dataset, itemType);
+            if (dataset.getType() == DatasetType.EXTERNAL) {
                 throw new AlgebricksException("Cannot write output to an external dataset.");
             }
             ArrayList<LogicalVariable> vars = new ArrayList<LogicalVariable>();
             ArrayList<Mutable<ILogicalExpression>> exprs = new ArrayList<Mutable<ILogicalExpression>>();
             List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<Mutable<ILogicalExpression>>();
 
-            AqlCompiledInternalDatasetDetails datasetDetails = (AqlCompiledInternalDatasetDetails) adecl
-                    .getAqlCompiledDatasetDetails();
-            List<String> partitionKeys = datasetDetails.getPartitioningExprs();
+            List<String> partitionKeys = DatasetUtils.getPartitioningKeys(dataset);
             for (String keyFieldName : partitionKeys) {
                 IFunctionInfo finfoAccess = AsterixBuiltinFunctions
                         .getAsterixFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME);
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlPlusExpressionToPlanTranslator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlPlusExpressionToPlanTranslator.java
index e6aaaad..9fb3d97 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlPlusExpressionToPlanTranslator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlPlusExpressionToPlanTranslator.java
@@ -81,12 +81,12 @@
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlLogicalPlanAndMetadataImpl;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.declared.FileSplitDataSink;
 import edu.uci.ics.asterix.metadata.declared.FileSplitSinkId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AString;
@@ -94,6 +94,7 @@
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.AsterixFunction;
 import edu.uci.ics.asterix.om.functions.AsterixFunctionInfo;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -254,19 +255,21 @@
             topOp = new WriteOperator(writeExprList, sink);
             topOp.getInputs().add(new MutableObject<ILogicalOperator>(project));
         } else {
-            AqlCompiledDatasetDecl adecl = compiledDeclarations.findDataset(outputDatasetName);
-            if (adecl == null) {
+            Dataset dataset = compiledDeclarations.findDataset(outputDatasetName);
+            if (dataset == null) {
                 throw new AlgebricksException("Cannot find dataset " + outputDatasetName);
             }
-
-            if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
+            if (dataset.getType() == DatasetType.EXTERNAL) {
                 throw new AlgebricksException("Cannot write output to an external dataset.");
-            }
+            }            
+            ARecordType itemType = (ARecordType) compiledDeclarations.findType(dataset.getDatatypeName());
+            List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
             ArrayList<LogicalVariable> vars = new ArrayList<LogicalVariable>();
             ArrayList<Mutable<ILogicalExpression>> exprs = new ArrayList<Mutable<ILogicalExpression>>();
             List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<Mutable<ILogicalExpression>>();
-            for (Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> partitioner : DatasetUtils
-                    .getPartitioningFunctions(adecl)) {
+            for (String partitioningKey : partitioningKeys) {
+                Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> partitioner =
+                        format.partitioningEvaluatorFactory(itemType, partitioningKey);
                 AbstractFunctionCallExpression f = partitioner.second.cloneExpression();
                 f.substituteVar(METADATA_DUMMY_VAR, resVar);
                 exprs.add(new MutableObject<ILogicalExpression>(f));
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/DmlTranslator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/DmlTranslator.java
index d0e9626..0a5c516 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/DmlTranslator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/DmlTranslator.java
@@ -35,7 +35,6 @@
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
@@ -461,11 +460,11 @@
                 clauseList.add(dieClause);
             }
 
-            AqlCompiledDatasetDecl aqlDataset = compiledDeclarations.findDataset(datasetName);
-            if (aqlDataset == null) {
+            Dataset dataset = compiledDeclarations.findDataset(datasetName);
+            if (dataset == null) {
                 throw new AlgebricksException("Unknown dataset " + datasetName);
             }
-            String itemTypeName = aqlDataset.getItemTypeName();
+            String itemTypeName = dataset.getDatatypeName();
             IAType itemType = compiledDeclarations.findType(itemTypeName);
             ARecordType recType = (ARecordType) itemType;
             String[] fieldNames = recType.getFieldNames();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index 5aa0f6e..a194265 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -27,11 +27,11 @@
 import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledExternalDatasetDetails;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -76,6 +76,7 @@
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
+// TODO: Lots of common code in this file. Clean it up!
 public class DatasetOperations {
 
     private static final PhysicalOptimizationConfig physicalOptimizationConfig = OptimizationConfUtil
@@ -95,32 +96,37 @@
         IIndexRegistryProvider<IIndex> indexRegistryProvider = AsterixIndexRegistryProvider.INSTANCE;
         IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
 
-        AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
-        if (adecl == null) {
+        Dataset dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
         }
-        if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
+        if (dataset.getType() == DatasetType.EXTERNAL) {
             return new JobSpecification[0];
         }
 
-        List<AqlCompiledIndexDecl> secondaryIndexes = DatasetUtils.getSecondaryIndexes(adecl);
-
+        List<Index> datasetIndexes = metadata.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
+        int numSecondaryIndexes = 0;
+        for (Index index : datasetIndexes) {
+        	if (index.isSecondaryIndex()) {
+        		numSecondaryIndexes++;
+        	}
+        }
         JobSpecification[] specs;
-
-        if (secondaryIndexes != null && !secondaryIndexes.isEmpty()) {
-            int n = secondaryIndexes.size();
-            specs = new JobSpecification[n + 1];
+        if (numSecondaryIndexes > 0) {
+            specs = new JobSpecification[numSecondaryIndexes + 1];
             int i = 0;
             // First, drop secondary indexes.
-            for (AqlCompiledIndexDecl acid : secondaryIndexes) {
-                specs[i] = new JobSpecification();
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadata
-                        .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, acid.getIndexName());
-                TreeIndexDropOperatorDescriptor secondaryBtreeDrop = new TreeIndexDropOperatorDescriptor(specs[i],
-                        storageManager, indexRegistryProvider, idxSplitsAndConstraint.first);
-                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
-                        idxSplitsAndConstraint.second);
-                i++;
+            for (Index index : datasetIndexes) {
+            	if (index.isSecondaryIndex()) {
+            		specs[i] = new JobSpecification();
+            		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadata
+            				.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, index.getIndexName());
+            		TreeIndexDropOperatorDescriptor secondaryBtreeDrop = new TreeIndexDropOperatorDescriptor(specs[i],
+            				storageManager, indexRegistryProvider, idxSplitsAndConstraint.first);
+            		AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
+            				idxSplitsAndConstraint.second);
+            		i++;
+            	}
             }
         } else {
             specs = new JobSpecification[1];
@@ -143,14 +149,15 @@
     // TODO: Lots of common code in this file. Refactor everything after merging in asterix-fix-issue-9.
     public static JobSpecification createDatasetJobSpec(String datasetName, AqlCompiledMetadataDeclarations metadata)
             throws AsterixException, AlgebricksException {
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
+        Dataset dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AsterixException("Could not find dataset " + datasetName);
         }
+        ARecordType itemType = (ARecordType) metadata.findType(dataset.getDatatypeName());
         JobSpecification spec = new JobSpecification();
         IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                compiledDatasetDecl, metadata.getFormat().getBinaryComparatorFactoryProvider());
-        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+                dataset, itemType, metadata.getFormat().getBinaryComparatorFactoryProvider());
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
         FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
@@ -173,46 +180,44 @@
     public static Job createLoadDatasetJobSpec(CompiledLoadFromFileStatement loadStmt,
             AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
         String datasetName = loadStmt.getDatasetName();
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
+        Dataset dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AsterixException("Could not find dataset " + datasetName);
         }
-        if (compiledDatasetDecl.getDatasetType() != DatasetType.INTERNAL
-                && compiledDatasetDecl.getDatasetType() != DatasetType.FEED) {
+        if (dataset.getType() != DatasetType.INTERNAL
+                && dataset.getType() != DatasetType.FEED) {
             throw new AsterixException("Cannot load data into dataset  (" + datasetName + ")" + "of type "
-                    + compiledDatasetDecl.getDatasetType());
+                    + dataset.getType());
         }
         JobSpecification spec = new JobSpecification();
 
-        ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+        ARecordType itemType = (ARecordType) metadata.findType(dataset.getDatatypeName());
         IDataFormat format = metadata.getFormat();
         ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
 
         IBinaryHashFunctionFactory[] hashFactories = DatasetUtils.computeKeysBinaryHashFunFactories(
-                compiledDatasetDecl, metadata.getFormat().getBinaryHashFunctionFactoryProvider());
+                dataset, itemType, metadata.getFormat().getBinaryHashFunctionFactoryProvider());
         IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                compiledDatasetDecl, metadata.getFormat().getBinaryComparatorFactoryProvider());
-        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+                dataset, itemType, metadata.getFormat().getBinaryComparatorFactoryProvider());
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
 
-        AqlCompiledExternalDatasetDetails externalDatasetDetails = new AqlCompiledExternalDatasetDetails(
+        ExternalDatasetDetails externalDatasetDetails = new ExternalDatasetDetails(
                 loadStmt.getAdapter(), loadStmt.getProperties());
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = AqlMetadataProvider
                 .buildExternalDataScannerRuntime(spec, itemType, externalDatasetDetails, format);
         IOperatorDescriptor scanner = p.first;
         AlgebricksPartitionConstraint scannerPc = p.second;
-        RecordDescriptor recDesc = computePayloadKeyRecordDescriptor(compiledDatasetDecl, payloadSerde,
+        RecordDescriptor recDesc = computePayloadKeyRecordDescriptor(dataset, itemType, payloadSerde,
                 metadata.getFormat());
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, scanner, scannerPc);
 
-        AssignRuntimeFactory assign = makeAssignRuntimeFactory(compiledDatasetDecl);
+        AssignRuntimeFactory assign = makeAssignRuntimeFactory(dataset, itemType, metadata.getFormat());
         AlgebricksMetaOperatorDescriptor asterixOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
                 new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { recDesc });
 
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixOp, scannerPc);
 
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl);
-        int numKeys = partitioningFunctions.size();
+        int numKeys = DatasetUtils.getPartitioningKeys(dataset).size();
         int[] keys = new int[numKeys];
         for (int i = 0; i < numKeys; i++) {
             keys[i] = i + 1;
@@ -270,14 +275,13 @@
         return fs.getNodeName() + ":" + fs.getLocalFile().toString();
     }
 
-    private static AssignRuntimeFactory makeAssignRuntimeFactory(AqlCompiledDatasetDecl compiledDatasetDecl) {
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl);
-        int numKeys = partitioningFunctions.size();
+    private static AssignRuntimeFactory makeAssignRuntimeFactory(Dataset dataset, ARecordType itemType, IDataFormat format) throws AlgebricksException {
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        int numKeys = partitioningKeys.size();
         ICopyEvaluatorFactory[] evalFactories = new ICopyEvaluatorFactory[numKeys];
         for (int i = 0; i < numKeys; i++) {
-            Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType = partitioningFunctions
-                    .get(i);
+            Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType = 
+                    format.partitioningEvaluatorFactory(itemType, partitioningKeys.get(i));
             evalFactories[i] = evalFactoryAndType.first;
         }
         int[] outColumns = new int[numKeys];
@@ -296,21 +300,18 @@
         return new AssignRuntimeFactory(outColumns, sefs, projectionList);
     }
 
-    private static RecordDescriptor computePayloadKeyRecordDescriptor(AqlCompiledDatasetDecl compiledDatasetDecl,
+    @SuppressWarnings("rawtypes")
+    private static RecordDescriptor computePayloadKeyRecordDescriptor(Dataset dataset, ARecordType itemType,
             ISerializerDeserializer payloadSerde, IDataFormat dataFormat) throws AlgebricksException {
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl);
-        int numKeys = partitioningFunctions.size();
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        int numKeys = partitioningKeys.size();
         ISerializerDeserializer[] recordFields = new ISerializerDeserializer[1 + numKeys];
         recordFields[0] = payloadSerde;
         for (int i = 0; i < numKeys; i++) {
-            Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType = partitioningFunctions
-                    .get(i);
-            IAType keyType = evalFactoryAndType.third;
+            IAType keyType = itemType.getFieldType(partitioningKeys.get(i));
             ISerializerDeserializer keySerde = dataFormat.getSerdeProvider().getSerializerDeserializer(keyType);
             recordFields[i + 1] = keySerde;
         }
         return new RecordDescriptor(recordFields);
     }
-
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index 4d7cbe2..f94ed34 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -24,30 +24,18 @@
 import edu.uci.ics.asterix.feed.comm.FeedMessage;
 import edu.uci.ics.asterix.feed.comm.IFeedMessage;
 import edu.uci.ics.asterix.feed.comm.IFeedMessage.MessageType;
-import edu.uci.ics.asterix.formats.base.IDataFormat;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledFeedDatasetDetails;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
 import edu.uci.ics.asterix.translator.DmlTranslator.CompiledControlFeedStatement;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 
 public class FeedOperations {
@@ -77,12 +65,17 @@
 
         LOGGER.info(" DATASETPATH: " + datasetPath);
 
-        AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
-        if (adecl == null) {
+        Dataset dataset;
+        try {
+            dataset = metadata.findDataset(datasetName);
+        } catch (AlgebricksException e) {
+            throw new AsterixException(e);
+        }
+        if (dataset == null) {
             throw new AsterixException("FEED DATASET: No metadata for dataset " + datasetName);
         }
-        if (adecl.getDatasetType() != DatasetType.FEED) {
-            throw new AsterixException("Operation not support for dataset type  " + adecl.getDatasetType());
+        if (dataset.getType() != DatasetType.FEED) {
+            throw new AsterixException("Operation not support for dataset type  " + dataset.getType());
         }
 
         JobSpecification spec = new JobSpecification();
@@ -107,7 +100,7 @@
 
         try {
             Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = AqlMetadataProvider.buildFeedMessengerRuntime(
-                    spec, metadata, (AqlCompiledFeedDatasetDetails) adecl.getAqlCompiledDatasetDetails(),
+                    spec, metadata, (FeedDatasetDetails) dataset.getDatasetDetails(),
                     metadata.getDataverseName(), datasetName, feedMessages);
             feedMessenger = p.first;
             messengerPc = p.second;
@@ -126,54 +119,4 @@
         return spec;
 
     }
-
-    private static AssignRuntimeFactory makeAssignRuntimeFactory(AqlCompiledDatasetDecl compiledDatasetDecl) {
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl);
-        int numKeys = partitioningFunctions.size();
-        ICopyEvaluatorFactory[] evalFactories = new ICopyEvaluatorFactory[numKeys];
-
-        int index = 0;
-        for (Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
-            evalFactories[index++] = evalFactoryAndType.first;
-        }
-
-        int[] outColumns = new int[numKeys];
-        int[] projectionList = new int[numKeys + 1];
-        projectionList[0] = 0;
-
-        for (int i = 0; i < numKeys; i++) {
-            outColumns[i] = i + 1;
-            projectionList[i + 1] = i + 1;
-        }
-        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[evalFactories.length];
-        for (int i = 0; i < evalFactories.length; ++i) {
-            sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
-                    evalFactories[i]);
-        }
-        return new AssignRuntimeFactory(outColumns, sefs, projectionList);
-    }
-
-    @SuppressWarnings("unchecked")
-    private static RecordDescriptor computePayloadKeyRecordDescriptor(AqlCompiledDatasetDecl compiledDatasetDecl,
-            ISerializerDeserializer payloadSerde, IDataFormat dataFormat) throws AlgebricksException {
-
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl);
-        int numKeys = partitioningFunctions.size();
-        ISerializerDeserializer[] recordFields = new ISerializerDeserializer[1 + numKeys];
-        recordFields[0] = payloadSerde;
-        int index = 0;
-        for (Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
-            IAType keyType = evalFactoryAndType.third;
-            ISerializerDeserializer keySerde = dataFormat.getSerdeProvider().getSerializerDeserializer(keyType);
-            recordFields[index + 1] = keySerde;
-            index++;
-        }
-        return new RecordDescriptor(recordFields);
-    }
-
-    private static String stringOf(FileSplit fs) {
-        return fs.getNodeName() + ":" + fs.getLocalFile().toString();
-    }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index c1be9f9..316f46e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -12,9 +12,9 @@
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
 import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -26,9 +26,7 @@
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
@@ -68,7 +66,7 @@
     protected int numSecondaryKeys;
     protected AqlCompiledMetadataDeclarations metadata;
     protected String datasetName;
-    protected AqlCompiledDatasetDecl datasetDecl;
+    protected Dataset dataset;
     protected ARecordType itemType;
     protected ISerializerDeserializer payloadSerde;
     protected IFileSplitProvider primaryFileSplitProvider;
@@ -124,16 +122,16 @@
         this.metadata = metadata;
         datasetName = createIndexStmt.getDatasetName();
         secondaryIndexName = createIndexStmt.getIndexName();
-        datasetDecl = metadata.findDataset(datasetName);
-        if (datasetDecl == null) {
+        dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AsterixException("Unknown dataset " + datasetName);
         }
-        if (datasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+        if (dataset.getType() == DatasetType.EXTERNAL) {
             throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
         }
-        itemType = (ARecordType) metadata.findType(datasetDecl.getItemTypeName());
+        itemType = (ARecordType) metadata.findType(dataset.getDatatypeName());
         payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
-        numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
+        numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
         numSecondaryKeys = createIndexStmt.getKeyFields().size();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
@@ -149,22 +147,18 @@
     }
 
     protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        int numPrimaryKeys = partitioningKeys.size();
         ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
         ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
         primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
-        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                .getPartitioningFunctions(datasetDecl);
-        int i = 0;
-        for (Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
-            IAType keyType = evalFactoryAndType.third;
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
-            primaryRecFields[i] = keySerde;
+        ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();        
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            IAType keyType = itemType.getFieldType(partitioningKeys.get(i));
+            primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
             primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
                     keyType, true);
             primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            ++i;
         }
         primaryRecFields[numPrimaryKeys] = payloadSerde;
         primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
index 756fab4..9bd0ea1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
@@ -35,8 +35,6 @@
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
-import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
-import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
@@ -157,8 +155,13 @@
         return type.getDatatype();
     }
 
-    public List<String> findNodeGroupNodeNames(String nodeGroupName) throws AlgebricksException, MetadataException {
-        NodeGroup ng = metadataManager.getNodegroup(mdTxnCtx, nodeGroupName);
+    public List<String> findNodeGroupNodeNames(String nodeGroupName) throws AlgebricksException {
+        NodeGroup ng;
+        try {
+            ng = metadataManager.getNodegroup(mdTxnCtx, nodeGroupName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
         if (ng == null) {
             throw new AlgebricksException("No node group with this name " + nodeGroupName);
         }
@@ -173,8 +176,15 @@
         return stores;
     }
 
-    public AqlCompiledDatasetDecl findDataset(String datasetName) {
+    public Dataset findDataset(String datasetName) throws AlgebricksException {
         try {
+            return metadataManager.getDataset(mdTxnCtx, dataverseName, datasetName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    	
+        /*
+    	try {
             Dataset datasetRecord = this.metadataManager.getDataset(mdTxnCtx, dataverseName, datasetName);
             if (datasetRecord == null) {
                 return null;
@@ -232,6 +242,7 @@
         } catch (Exception e) {
             throw new IllegalStateException(e);
         }
+        */
     }
 
     private IndexKind getIndexKindFromType(IndexType type) {
@@ -248,6 +259,32 @@
         return null;
     }
 
+    public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
+    	try {
+            return metadataManager.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+    
+    // TODO: Check if this is correct. Not sure what the index name of the primary index is.
+    // TODO: Rename this to getPrimaryIndex().
+    public Index getDatasetPrimaryIndex(String dataverseName, String datasetName) throws AlgebricksException {
+        try {
+            return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    public Index getIndex(String dataverseName, String datasetName, String indexName) throws AlgebricksException {
+        try {
+            return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+    
     public void setOutputFile(FileSplit outputFile) {
         this.outputFile = outputFile;
     }
@@ -269,7 +306,7 @@
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-            String datasetName, String targetIdxName) throws AlgebricksException, MetadataException {
+            String datasetName, String targetIdxName) throws AlgebricksException {
         FileSplit[] splits = splitsForInternalOrFeedDataset(datasetName, targetIdxName);
         IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
         String[] loc = new String[splits.length];
@@ -296,23 +333,20 @@
     }
 
     private FileSplit[] splitsForInternalOrFeedDataset(String datasetName, String targetIdxName)
-            throws AlgebricksException, MetadataException {
+            throws AlgebricksException {
 
         File relPathFile = new File(getRelativePath(datasetName + "_idx_" + targetIdxName));
-        AqlCompiledDatasetDecl adecl = findDataset(datasetName);
-        if (adecl.getDatasetType() != DatasetType.INTERNAL & adecl.getDatasetType() != DatasetType.FEED) {
+        Dataset dataset = findDataset(datasetName);
+        if (dataset.getType() != DatasetType.INTERNAL & dataset.getType() != DatasetType.FEED) {
             throw new AlgebricksException("Not an internal or feed dataset");
         }
-        AqlCompiledInternalDatasetDetails compiledDatasetDetails = (AqlCompiledInternalDatasetDetails) adecl
-                .getAqlCompiledDatasetDetails();
-        List<String> nodeGroup = findNodeGroupNodeNames(compiledDatasetDetails.getNodegroupName());
-
+        InternalDatasetDetails datasetDetails = (InternalDatasetDetails) dataset.getDatasetDetails();
+        List<String> nodeGroup = findNodeGroupNodeNames(datasetDetails.getNodeGroupName());
         if (nodeGroup == null) {
-            throw new AlgebricksException("Couldn't find node group " + compiledDatasetDetails.getNodegroupName());
+            throw new AlgebricksException("Couldn't find node group " + datasetDetails.getNodeGroupName());
         }
 
         List<FileSplit> splitArray = new ArrayList<FileSplit>();
-
         for (String nd : nodeGroup) {
             String[] nodeStores = stores.get(nd);
             if (nodeStores == null) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index 4ae38eb..583985f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -21,12 +21,13 @@
 import java.util.Set;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
@@ -41,12 +42,11 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 
 public class AqlDataSource implements IDataSource<AqlSourceId> {
 
     private AqlSourceId id;
-    private AqlCompiledDatasetDecl adecl;
+    private Dataset dataset;
     private IAType[] schemaTypes;
     private INodeDomain domain;
     private AqlDataSourceType datasourceType;
@@ -58,14 +58,14 @@
         EXTERNAL_FEED
     }
 
-    public AqlDataSource(AqlSourceId id, AqlCompiledDatasetDecl adecl, IAType itemType, AqlDataSourceType datasourceType)
+    public AqlDataSource(AqlSourceId id, Dataset dataset, IAType itemType, AqlDataSourceType datasourceType)
             throws AlgebricksException {
         this.id = id;
-        this.adecl = adecl;
+        this.dataset = dataset;
         this.datasourceType = datasourceType;
         switch (datasourceType) {
             case FEED:
-                initFeedDataset(itemType, adecl);
+                initFeedDataset(itemType, dataset);
             case INTERNAL: {
                 initInternalDataset(itemType);
                 break;
@@ -81,12 +81,12 @@
         }
     }
 
-    public AqlDataSource(AqlSourceId id, AqlCompiledDatasetDecl adecl, IAType itemType) throws AlgebricksException {
+    public AqlDataSource(AqlSourceId id, Dataset dataset, IAType itemType) throws AlgebricksException {
         this.id = id;
-        this.adecl = adecl;
-        switch (adecl.getDatasetType()) {
+        this.dataset = dataset;
+        switch (dataset.getType()) {
             case FEED:
-                initFeedDataset(itemType, adecl);
+                initFeedDataset(itemType, dataset);
                 break;
             case INTERNAL:
                 initInternalDataset(itemType);
@@ -101,32 +101,32 @@
         }
     }
 
+    // TODO: Seems like initFeedDataset() could simply call this method.
     private void initInternalDataset(IAType itemType) {
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                .getPartitioningFunctions(adecl);
-        int n = partitioningFunctions.size();
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        ARecordType recordType = (ARecordType) itemType;
+        int n = partitioningKeys.size();
         schemaTypes = new IAType[n + 1];
         for (int i = 0; i < n; i++) {
-            schemaTypes[i] = partitioningFunctions.get(i).third;
+            schemaTypes[i] = recordType.getFieldType(partitioningKeys.get(i));
         }
         schemaTypes[n] = itemType;
-        domain = new AsterixNodeGroupDomain(DatasetUtils.getNodegroupName(adecl));
+        domain = new AsterixNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
     }
 
-    private void initFeedDataset(IAType itemType, AqlCompiledDatasetDecl decl) {
-
-        if (decl.getAqlCompiledDatasetDetails() instanceof AqlCompiledExternalDatasetDetails) {
+    private void initFeedDataset(IAType itemType, Dataset dataset) {
+        if (dataset.getDatasetDetails() instanceof ExternalDatasetDetails) {
             initExternalDataset(itemType);
         } else {
-            List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
-                    .getPartitioningFunctions(adecl);
-            int n = partitioningFunctions.size();
+            List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+            int n = partitioningKeys.size();
             schemaTypes = new IAType[n + 1];
+            ARecordType recordType = (ARecordType) itemType;
             for (int i = 0; i < n; i++) {
-                schemaTypes[i] = partitioningFunctions.get(i).third;
+                schemaTypes[i] = recordType.getFieldType(partitioningKeys.get(i));
             }
             schemaTypes[n] = itemType;
-            domain = new AsterixNodeGroupDomain(DatasetUtils.getNodegroupName(adecl));
+            domain = new AsterixNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
         }
     }
 
@@ -152,8 +152,8 @@
         return id;
     }
 
-    public AqlCompiledDatasetDecl getCompiledDatasetDecl() {
-        return adecl;
+    public Dataset getDataset() {
+        return dataset;
     }
 
     @Override
@@ -170,7 +170,7 @@
 
     @Override
     public IDataSourcePropertiesProvider getPropertiesProvider() {
-        return new AqlDataSourcePartitioningProvider(adecl.getDatasetType(), domain);
+        return new AqlDataSourcePartitioningProvider(dataset.getType(), domain);
     }
 
     @Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlIndex.java
index 4a6d6e4..bac7733 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlIndex.java
@@ -15,24 +15,26 @@
 
 package edu.uci.ics.asterix.metadata.declared;
 
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 
 public class AqlIndex implements IDataSourceIndex<String, AqlSourceId> {
 
-    private final AqlCompiledIndexDecl acid;
+    private final Index index;
     private final AqlCompiledMetadataDeclarations acmd;
     private final String datasetName;
 
     // Every transactions needs to work with its own instance of an
     // AqlMetadataProvider.
-    public AqlIndex(AqlCompiledIndexDecl acid, AqlCompiledMetadataDeclarations acmd, String datasetName) {
-        this.acid = acid;
+    public AqlIndex(Index index, AqlCompiledMetadataDeclarations acmd, String datasetName) {
+        this.index = index;
         this.acmd = acmd;
         this.datasetName = datasetName;
     }
 
+    // TODO: Maybe Index can directly implement IDataSourceIndex<String, AqlSourceId>
     @Override
     public IDataSource<AqlSourceId> getDataSource() {
         try {
@@ -45,7 +47,7 @@
 
     @Override
     public String getId() {
-        return acid.getIndexName();
+        return index.getIndexName();
     }
 
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 57b2e72..94834ec 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -19,6 +19,7 @@
 import java.util.List;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
 import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
@@ -34,7 +35,10 @@
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.types.ARecordType;
@@ -47,12 +51,10 @@
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
@@ -63,7 +65,6 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
@@ -121,23 +122,23 @@
             List<LogicalVariable> projectVariables, boolean projectPushed, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
             throws AlgebricksException {
-        AqlCompiledDatasetDecl adecl = metadata.findDataset(dataSource.getId().getDatasetName());
-        if (adecl == null) {
+    	Dataset dataset = metadata.findDataset(dataSource.getId().getDatasetName());
+        if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + dataSource.getId().getDatasetName());
         }
-        switch (adecl.getDatasetType()) {
+        switch (dataset.getType()) {
             case FEED:
                 if (dataSource instanceof ExternalFeedDataSource) {
-                    return buildExternalDatasetScan(jobSpec, adecl, dataSource);
+                    return buildExternalDatasetScan(jobSpec, dataset, dataSource);
                 } else {
-                    return buildInternalDatasetScan(jobSpec, scanVariables, opSchema, typeEnv, adecl, dataSource,
+                    return buildInternalDatasetScan(jobSpec, scanVariables, opSchema, typeEnv, dataset, dataSource,
                             context);
                 }
             case INTERNAL: {
-                return buildInternalDatasetScan(jobSpec, scanVariables, opSchema, typeEnv, adecl, dataSource, context);
+                return buildInternalDatasetScan(jobSpec, scanVariables, opSchema, typeEnv, dataset, dataSource, context);
             }
             case EXTERNAL: {
-                return buildExternalDatasetScan(jobSpec, adecl, dataSource);
+                return buildExternalDatasetScan(jobSpec, dataset, dataSource);
             }
             default: {
                 throw new IllegalArgumentException();
@@ -147,18 +148,19 @@
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
-            AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource, JobGenContext context)
+            Dataset dataset, IDataSource<AqlSourceId> dataSource, JobGenContext context)
             throws AlgebricksException {
         AqlSourceId asid = dataSource.getId();
+        String dataverseName = asid.getDataverseName();
         String datasetName = asid.getDatasetName();
-        String indexName = DatasetUtils.getPrimaryIndex(acedl).getIndexName();
-        return buildBtreeRuntime(jobSpec, outputVars, opSchema, typeEnv, metadata, context, false, datasetName, acedl,
-                indexName, null, null, true, true);
+        Index primaryIndex = metadata.getDatasetPrimaryIndex(dataverseName, datasetName);
+        return buildBtreeRuntime(jobSpec, outputVars, opSchema, typeEnv, metadata, context, false, datasetName, dataset,
+                primaryIndex.getIndexName(), null, null, true, true);
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(JobSpecification jobSpec,
-            AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
-        String itemTypeName = acedl.getItemTypeName();
+            Dataset dataset, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
+        String itemTypeName = dataset.getDatatypeName();
         IAType itemType;
         try {
             itemType = metadata.findType(itemTypeName);
@@ -167,28 +169,27 @@
         }
 
         if (dataSource instanceof ExternalFeedDataSource) {
-            AqlCompiledFeedDatasetDetails acfdd = (AqlCompiledFeedDatasetDetails) ((ExternalFeedDataSource) dataSource)
-                    .getCompiledDatasetDecl().getAqlCompiledDatasetDetails();
-
-            return buildFeedIntakeRuntime(jobSpec, metadata.getDataverseName(), acedl.getName(), itemType, acfdd,
-                    metadata.getFormat());
+            FeedDatasetDetails datasetDetails = (FeedDatasetDetails) ((ExternalFeedDataSource) dataSource).getDataset()
+                    .getDatasetDetails();
+            return buildFeedIntakeRuntime(jobSpec, metadata.getDataverseName(), dataset.getDatasetName(), itemType,
+                    datasetDetails, metadata.getFormat());
         } else {
             return buildExternalDataScannerRuntime(jobSpec, itemType,
-                    (AqlCompiledExternalDatasetDetails) acedl.getAqlCompiledDatasetDetails(), metadata.getFormat());
+                    (ExternalDatasetDetails) dataset.getDatasetDetails(), metadata.getFormat());
         }
     }
 
     @SuppressWarnings("rawtypes")
     public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
-            JobSpecification jobSpec, IAType itemType, AqlCompiledExternalDatasetDetails decl, IDataFormat format)
+            JobSpecification jobSpec, IAType itemType, ExternalDatasetDetails datasetDetails, IDataFormat format)
             throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.RECORD) {
             throw new AlgebricksException("Can only scan datasets of records.");
         }
-
+        
         IDatasourceReadAdapter adapter;
         try {
-            adapter = (IDatasourceReadAdapter) Class.forName(decl.getAdapter()).newInstance();
+            adapter = (IDatasourceReadAdapter) Class.forName(datasetDetails.getAdapter()).newInstance();
         } catch (Exception e) {
             e.printStackTrace();
             throw new AlgebricksException("unable to load the adapter class " + e);
@@ -201,7 +202,7 @@
         ARecordType rt = (ARecordType) itemType;
 
         try {
-            adapter.configure(decl.getProperties(), itemType);
+            adapter.configure(datasetDetails.getProperties(), itemType);
         } catch (Exception e) {
             e.printStackTrace();
             throw new AlgebricksException("unable to configure the datasource adapter " + e);
@@ -211,7 +212,7 @@
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
         ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
-                decl.getAdapter(), decl.getProperties(), rt, scannerDesc);
+        		datasetDetails.getAdapter(), datasetDetails.getProperties(), rt, scannerDesc);
         dataScanner.setDatasourceAdapter(adapter);
         AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
@@ -243,13 +244,13 @@
     @SuppressWarnings("rawtypes")
     public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(
             JobSpecification jobSpec, String dataverse, String dataset, IAType itemType,
-            AqlCompiledFeedDatasetDetails decl, IDataFormat format) throws AlgebricksException {
+            FeedDatasetDetails datasetDetails, IDataFormat format) throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.RECORD) {
             throw new AlgebricksException("Can only consume records.");
         }
         IDatasourceAdapter adapter;
         try {
-            adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter()).newInstance();
+            adapter = (IDatasourceAdapter) Class.forName(datasetDetails.getAdapter()).newInstance();
         } catch (Exception e) {
             e.printStackTrace();
             throw new AlgebricksException("unable to load the adapter class " + e);
@@ -257,7 +258,7 @@
 
         ARecordType rt = (ARecordType) itemType;
         try {
-            adapter.configure(decl.getProperties(), itemType);
+            adapter.configure(datasetDetails.getProperties(), itemType);
         } catch (Exception e) {
             e.printStackTrace();
             throw new AlgebricksException("unable to configure the datasource adapter " + e);
@@ -267,46 +268,39 @@
         RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
         FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(dataverse,
-                dataset), decl.getAdapter(), decl.getProperties(), rt, feedDesc);
+                dataset), datasetDetails.getAdapter(), datasetDetails.getProperties(), rt, feedDesc);
 
         AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedIngestor, constraint);
     }
 
     public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
-            JobSpecification jobSpec, AqlCompiledMetadataDeclarations metadata, AqlCompiledFeedDatasetDetails decl,
+            JobSpecification jobSpec, AqlCompiledMetadataDeclarations metadata, FeedDatasetDetails datasetDetails,
             String dataverse, String dataset, List<IFeedMessage> feedMessages) throws AlgebricksException {
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-        try {
-            spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset, dataset);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset, dataset);
         FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
                 feedMessages);
-
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, spPc.second);
     }
 
     public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
             AqlCompiledMetadataDeclarations metadata, JobGenContext context, boolean retainInput, String datasetName,
-            AqlCompiledDatasetDecl datasetDecl, String indexName, int[] lowKeyFields, int[] highKeyFields,
+            Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields,
             boolean lowKeyInclusive, boolean highKeyInclusive) throws AlgebricksException {
         boolean isSecondary = true;
-        AqlCompiledIndexDecl primaryIndexDecl = DatasetUtils.getPrimaryIndex(datasetDecl);
-        if (primaryIndexDecl != null) {
-            isSecondary = !indexName.equals(primaryIndexDecl.getIndexName());
+        Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
+        if (primaryIndex != null) {
+            isSecondary = !indexName.equals(primaryIndex.getIndexName());
         }
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
+        int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
         RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
         int numKeys = numPrimaryKeys;
         int keysStartIndex = outputRecDesc.getFieldCount() - numKeys - 1;
         if (isSecondary) {
-            AqlCompiledIndexDecl secondaryIndexDecl = DatasetUtils.findSecondaryIndexByName(datasetDecl, indexName);
-            int numSecondaryKeys = secondaryIndexDecl.getFieldExprs().size();
+            Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+            int numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
             numKeys += numSecondaryKeys;
             keysStartIndex = outputRecDesc.getFieldCount() - numKeys;
         }
@@ -332,23 +326,23 @@
     @SuppressWarnings("rawtypes")
     public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
             AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
-            String datasetName, AqlCompiledDatasetDecl ddecl, String indexName, int[] keyFields)
+            String datasetName, Dataset dataset, String indexName, int[] keyFields)
             throws AlgebricksException {
-        String itemTypeName = ddecl.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = metadata.findType(itemTypeName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
+        // TODO: This can be shrunk to a single line.
+        String itemTypeName = dataset.getDatatypeName();
+        IAType itemType = metadata.findType(itemTypeName);
+        if (itemType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("Only record types can be indexed.");
         }
+        ARecordType recType = (ARecordType) itemType;
 
         boolean isSecondary = true;
-        AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
-        if (primIdxDecl != null) {
-            isSecondary = !indexName.equals(primIdxDecl.getIndexName());
+        Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
+        if (primaryIndex != null) {
+            isSecondary = !indexName.equals(primaryIndex.getIndexName());
         }
 
-        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl).size();
+        int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
         ISerializerDeserializer[] recordFields;
         IBinaryComparatorFactory[] comparatorFactories;
         ITypeTraits[] typeTraits;
@@ -357,12 +351,12 @@
         int numNestedSecondaryKeyFields = 0;
         int i = 0;
         if (isSecondary) {
-            AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(ddecl, indexName);
-            if (cid == null) {
+            Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+            if (secondaryIndex == null) {
                 throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
                         + datasetName);
             }
-            List<String> secondaryKeyFields = cid.getFieldExprs();
+            List<String> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
             numSecondaryKeys = secondaryKeyFields.size();
 
             if (numSecondaryKeys != 1) {
@@ -372,11 +366,6 @@
                                 + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
             }
 
-            if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be indexed.");
-            }
-            ARecordType recType = (ARecordType) itemType;
-
             Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
                     secondaryKeyFields.get(0), recType);
             IAType keyType = keyTypePair.first;
@@ -407,9 +396,9 @@
             throw new AlgebricksException("R-tree can only be used as a secondary index");
         }
 
-        for (Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-                .getPartitioningFunctions(ddecl)) {
-            IAType keyType = evalFactoryAndType.third;
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        for (String partitioningKey : partitioningKeys) {
+            IAType keyType = recType.getFieldType(partitioningKey);
             ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
                     .getSerializerDeserializer(keyType);
             recordFields[i] = keySerde;
@@ -453,20 +442,19 @@
     public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
             throws AlgebricksException {
         AqlDataSource ads = findDataSource(dataSourceId);
-        AqlCompiledDatasetDecl adecl = ads.getCompiledDatasetDecl();
-        if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
+        Dataset dataset = ads.getDataset();
+        if (dataset.getType() == DatasetType.EXTERNAL) {
             throw new AlgebricksException("No index for external dataset " + dataSourceId);
         }
 
-        String idxName = (String) indexId;
-        AqlCompiledIndexDecl acid = DatasetUtils.findSecondaryIndexByName(adecl, idxName);
-        AqlSourceId asid = (AqlSourceId) dataSourceId;
-        if (acid != null) {
-            return new AqlIndex(acid, metadata, asid.getDatasetName());
+        String indexName = (String) indexId;
+        Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+        if (secondaryIndex != null) {
+            return new AqlIndex(secondaryIndex, metadata, dataset.getDatasetName());
         } else {
-            AqlCompiledIndexDecl primIdx = DatasetUtils.getPrimaryIndex(adecl);
-            if (primIdx.getIndexName().equals(indexId)) {
-                return new AqlIndex(primIdx, metadata, asid.getDatasetName());
+            Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
+            if (primaryIndex.getIndexName().equals(indexId)) {
+                return new AqlIndex(primaryIndex, metadata, dataset.getDatasetName());
             } else {
                 return null;
             }
@@ -478,29 +466,29 @@
         if (!aqlId.getDataverseName().equals(metadata.getDataverseName())) {
             return null;
         }
-        AqlCompiledDatasetDecl acdd = metadata.findDataset(aqlId.getDatasetName());
-        if (acdd == null) {
+        Dataset dataset = metadata.findDataset(aqlId.getDatasetName());
+        if (dataset == null) {
             throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
         }
-        String tName = acdd.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = metadata.findType(tName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-        return new AqlDataSource(aqlId, acdd, itemType);
+        String tName = dataset.getDatatypeName();
+        IAType itemType = metadata.findType(tName);
+        return new AqlDataSource(aqlId, dataset, itemType);
     }
 
     @Override
     public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
         AqlSourceId asid = dataSource.getId();
         String datasetName = asid.getDatasetName();
-        AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
-        if (adecl == null) {
+        Dataset dataset = null;
+        try {
+            dataset = metadata.findDataset(datasetName);
+        } catch (AlgebricksException e) {
+            throw new IllegalStateException(e);
+        }
+        if (dataset == null) {
             throw new IllegalArgumentException("Unknown dataset " + datasetName);
         }
-        return adecl.getDatasetType() == DatasetType.EXTERNAL;
+        return dataset.getType() == DatasetType.EXTERNAL;
     }
 
     @Override
@@ -520,27 +508,23 @@
         }
         fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
 
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
+        Dataset dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + datasetName);
         }
-        String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
+        Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
+        String indexName = primaryIndex.getIndexName();
 
-        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
-
-        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-
+        String itemTypeName = dataset.getDatatypeName();
+        ARecordType itemType = (ARecordType) metadata.findType(itemTypeName);
+        
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);       
         IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-        try {
-            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
-                    indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-
+                dataset, itemType, context.getBinaryComparatorFactoryProvider());
+                
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
                 appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
                 splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation,
@@ -567,27 +551,25 @@
         }
         fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
 
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
+        Dataset dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + datasetName);
         }
-        String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
+        Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
+        String indexName = primaryIndex.getIndexName();
 
-        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+        String itemTypeName = dataset.getDatatypeName();
+        ARecordType itemType = (ARecordType) metadata.findType(itemTypeName);
+        
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
 
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 
         IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
+                dataset, itemType, context.getBinaryComparatorFactoryProvider());
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-        try {
-            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
-                    indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
         TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
                 splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, IndexOp.INSERT,
@@ -595,6 +577,7 @@
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
     }
 
+    // TODO: Seems like we can share the code with getInsertRuntime().
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
             IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
@@ -613,27 +596,25 @@
         }
         fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
 
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
+        Dataset dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + datasetName);
         }
-        String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
+        Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
+        String indexName = primaryIndex.getIndexName();
 
-        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+        String itemTypeName = dataset.getDatatypeName();
+        ARecordType itemType = (ARecordType) metadata.findType(itemTypeName);
+
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
 
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 
         IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
+                dataset, itemType, context.getBinaryComparatorFactoryProvider());
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-        try {
-            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
-                    indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
         TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
                 splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, IndexOp.DELETE,
@@ -641,6 +622,7 @@
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
     }
 
+    // TODO: Seems like we can share this code with getIndexDeleteRuntime().
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
             IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
@@ -649,13 +631,13 @@
             JobGenContext context, JobSpecification spec) throws AlgebricksException {
         String indexName = dataSourceIndex.getId();
         String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
+        Dataset dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + datasetName);
         }
-        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+        Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
-        if (cid.getKind() == IndexKind.BTREE) {
+        if (secondaryIndex.getIndexType() == IndexType.BTREE) {
             return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
                     filterFactory, recordDesc, context, spec, IndexOp.INSERT);
         } else {
@@ -672,13 +654,13 @@
             JobGenContext context, JobSpecification spec) throws AlgebricksException {
         String indexName = dataSourceIndex.getId();
         String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
+        Dataset dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + datasetName);
         }
-        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+        Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
-        if (cid.getKind() == IndexKind.BTREE) {
+        if (secondaryIndex.getIndexType() == IndexType.BTREE) {
             return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
                     filterFactory, recordDesc, context, spec, IndexOp.DELETE);
         } else {
@@ -719,26 +701,20 @@
             i++;
         }
 
-        // dataset
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        if (compiledDatasetDecl == null) {
+        Dataset dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + datasetName);
         }
-        String itemTypeName = compiledDatasetDecl.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = metadata.findType(itemTypeName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+        String itemTypeName = dataset.getDatatypeName();
+        IAType itemType = metadata.findType(itemTypeName);
         if (itemType.getTypeTag() != ATypeTag.RECORD) {
             throw new AlgebricksException("Only record types can be indexed.");
         }
         ARecordType recType = (ARecordType) itemType;
 
-        // index parameters
-        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
-        List<String> secondaryKeyExprs = cid.getFieldExprs();
+        // Index parameters.
+        Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+        List<String> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
         ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
         for (i = 0; i < secondaryKeys.size(); ++i) {
@@ -749,9 +725,9 @@
                     true);
             typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
-        for (Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl)) {
-            IAType keyType = evalFactoryAndType.third;
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        for (String partitioningKey : partitioningKeys) {
+            IAType keyType = recType.getFieldType(partitioningKey);
             comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
                     true);
             typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
@@ -759,13 +735,8 @@
         }
 
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-        try {
-            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
-                    indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
         TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
                 splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
@@ -777,20 +748,15 @@
             String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
             JobGenContext context, JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
-        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
-        String itemTypeName = compiledDatasetDecl.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = metadata.findType(itemTypeName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+        Dataset dataset = metadata.findDataset(datasetName);
+        String itemTypeName = dataset.getDatatypeName();
+        IAType itemType = metadata.findType(itemTypeName);
         if (itemType.getTypeTag() != ATypeTag.RECORD) {
             throw new AlgebricksException("Only record types can be indexed.");
         }
         ARecordType recType = (ARecordType) itemType;
-        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
-        List<String> secondaryKeyExprs = cid.getFieldExprs();
+        Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+        List<String> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
         Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyExprs.get(0),
                 recType);
         IAType spatialType = keyPairType.first;
@@ -816,16 +782,14 @@
         IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
         IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
         for (i = 0; i < numSecondaryKeys; i++) {
-            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(nestedKeyType);
             comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
                     nestedKeyType, true);
             typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
             valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
         }
-        for (Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-                .getPartitioningFunctions(compiledDatasetDecl)) {
-            IAType keyType = evalFactoryAndType.third;
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        for (String partitioningKey : partitioningKeys) {
+            IAType keyType = recType.getFieldType(partitioningKey);
             comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
                     true);
             typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
@@ -833,13 +797,8 @@
         }
 
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-        try {
-            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
-                    indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
         TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
                 splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ExternalFeedDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ExternalFeedDataSource.java
index 16e8bae..566265f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ExternalFeedDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ExternalFeedDataSource.java
@@ -1,17 +1,17 @@
 package edu.uci.ics.asterix.metadata.declared;
 
+import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 
 public class ExternalFeedDataSource extends AqlDataSource {
 
-    public ExternalFeedDataSource(AqlSourceId id, AqlCompiledDatasetDecl adecl, IAType itemType) throws AlgebricksException {
-        super(id,adecl, itemType);
+    public ExternalFeedDataSource(AqlSourceId id, Dataset dataset, IAType itemType) throws AlgebricksException {
+        super(id, dataset, itemType);
     }
 
-    public ExternalFeedDataSource(AqlSourceId id, AqlCompiledDatasetDecl adecl, IAType itemType, AqlDataSourceType dataSourceType) throws AlgebricksException {
-        super(id,adecl, itemType, dataSourceType);
+    public ExternalFeedDataSource(AqlSourceId id, Dataset dataset, IAType itemType, AqlDataSourceType dataSourceType)
+            throws AlgebricksException {
+        super(id, dataset, itemType, dataSourceType);
     }
-    
-  
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
index e810bc9..36ac4ed 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
@@ -52,10 +52,12 @@
         return datasetName;
     }
 
+    // TODO: getItemTypeName()
     public String getDatatypeName() {
         return datatypeName;
     }
 
+    // TODO: getDatasetType()
     public DatasetType getType() {
         return datasetType;
     }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
index 36db14a..9761c47 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
@@ -92,7 +92,7 @@
 
         // write field 2
         listBuilder.reset((AOrderedListType) MetadataRecordTypes.FEED_DETAILS_RECORDTYPE.getFieldTypes()[2]);
-        for (String field : partitioningKey) {
+        for (String field : partitioningKeys) {
             itemValue.reset();
             aString.setValue(field);
             stringSerde.serialize(aString, itemValue.getDataOutput());
@@ -104,7 +104,7 @@
 
         // write field 3
         listBuilder.reset((AOrderedListType) MetadataRecordTypes.FEED_DETAILS_RECORDTYPE.getFieldTypes()[3]);
-        for (String field : primaryKey) {
+        for (String field : primaryKeys) {
             itemValue.reset();
             aString.setValue(field);
             stringSerde.serialize(aString, itemValue.getDataOutput());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/InternalDatasetDetails.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/InternalDatasetDetails.java
index de51dc8..53d10e4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/InternalDatasetDetails.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/InternalDatasetDetails.java
@@ -48,29 +48,29 @@
 
     protected final FileStructure fileStructure;
     protected final PartitioningStrategy partitioningStrategy;
-    protected final List<String> partitioningKey;
-    protected final List<String> primaryKey;
-    protected final String groupName;
+    protected final List<String> partitioningKeys;
+    protected final List<String> primaryKeys;
+    protected final String nodeGroupName;
 
     public InternalDatasetDetails(FileStructure fileStructure, PartitioningStrategy partitioningStrategy,
             List<String> partitioningKey, List<String> primaryKey, String groupName) {
         this.fileStructure = fileStructure;
         this.partitioningStrategy = partitioningStrategy;
-        this.partitioningKey = partitioningKey;
-        this.primaryKey = primaryKey;
-        this.groupName = groupName;
+        this.partitioningKeys = partitioningKey;
+        this.primaryKeys = primaryKey;
+        this.nodeGroupName = groupName;
     }
 
     public String getNodeGroupName() {
-        return this.groupName;
+        return nodeGroupName;
     }
 
     public List<String> getPartitioningKey() {
-        return this.partitioningKey;
+        return partitioningKeys;
     }
 
     public List<String> getPrimaryKey() {
-        return primaryKey;
+        return primaryKeys;
     }
 
     public FileStructure getFileStructure() {
@@ -115,7 +115,7 @@
         // write field 2
         listBuilder
                 .reset((AOrderedListType) MetadataRecordTypes.INTERNAL_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PARTITIONKEY_FIELD_INDEX]);
-        for (String field : partitioningKey) {
+        for (String field : partitioningKeys) {
             itemValue.reset();
             aString.setValue(field);
             stringSerde.serialize(aString, itemValue.getDataOutput());
@@ -129,7 +129,7 @@
         // write field 3
         listBuilder
                 .reset((AOrderedListType) MetadataRecordTypes.INTERNAL_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PRIMARYKEY_FIELD_INDEX]);
-        for (String field : primaryKey) {
+        for (String field : primaryKeys) {
             itemValue.reset();
             aString.setValue(field);
             stringSerde.serialize(aString, itemValue.getDataOutput());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
index dd05b00..59d4b31 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
@@ -1,8 +1,8 @@
 package edu.uci.ics.asterix.metadata.functions;
 
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
@@ -53,11 +53,11 @@
                 AsterixConstantValue acv = (AsterixConstantValue) ((ConstantExpression) a1).getValue();
                 String datasetName = ((AString) acv.getObject()).getStringValue();
                 AqlCompiledMetadataDeclarations metadata = ((AqlMetadataProvider) mp).getMetadataDeclarations();
-                AqlCompiledDatasetDecl acdd = metadata.findDataset(datasetName);
-                if (acdd == null) {
+                Dataset dataset = metadata.findDataset(datasetName);
+                if (dataset == null) {
                     throw new AlgebricksException("Could not find dataset " + datasetName);
                 }
-                String tn = acdd.getItemTypeName();
+                String tn = dataset.getDatatypeName();
                 IAType t2 = metadata.findType(tn);
                 if (t2 == null) {
                     throw new AlgebricksException("No type for dataset " + datasetName);
@@ -89,11 +89,11 @@
                 AsterixConstantValue acv = (AsterixConstantValue) ((ConstantExpression) a1).getValue();
                 String datasetName = ((AString) acv.getObject()).getStringValue();
                 AqlCompiledMetadataDeclarations metadata = ((AqlMetadataProvider) mp).getMetadataDeclarations();
-                AqlCompiledDatasetDecl acdd = metadata.findDataset(datasetName);
-                if (acdd == null) {
+                Dataset dataset = metadata.findDataset(datasetName);
+                if (dataset == null) {
                     throw new AlgebricksException("Could not find dataset " + datasetName);
                 }
-                String tn = acdd.getItemTypeName();
+                String tn = dataset.getDatatypeName();
                 IAType t2 = metadata.findType(tn);
                 if (t2 == null) {
                     throw new AlgebricksException("No type for dataset " + datasetName);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java
index a34f14b..db86dbb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java
@@ -7,94 +7,87 @@
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledInternalDatasetDetails;
-import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 
 public class DatasetUtils {
-    public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(
-            AqlCompiledDatasetDecl compiledDatasetDecl, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
+    public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset,
+            ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
             throws AlgebricksException {
-        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL)
+        if (dataset.getType() == DatasetType.EXTERNAL) {
             throw new AlgebricksException("not implemented");
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions;
-        partitioningFunctions = getPartitioningFunctions(compiledDatasetDecl);
-        int numKeys = partitioningFunctions.size();
-        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[numKeys];
-        for (int i = 0; i < numKeys; i++) {
-            Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType = partitioningFunctions
-                    .get(i);
-            IAType keyType = evalFactoryAndType.third;
+        }
+        List<String> partitioningKeys = getPartitioningKeys(dataset);
+        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
+        for (int i = 0; i < partitioningKeys.size(); i++) {
+            IAType keyType = itemType.getFieldType(partitioningKeys.get(i));
             bcfs[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
         }
         return bcfs;
     }
 
-    public static IBinaryHashFunctionFactory[] computeKeysBinaryHashFunFactories(
-            AqlCompiledDatasetDecl compiledDatasetDecl, IBinaryHashFunctionFactoryProvider hashFunProvider)
-            throws AlgebricksException {
-        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL)
+    public static IBinaryHashFunctionFactory[] computeKeysBinaryHashFunFactories(Dataset dataset, ARecordType itemType,
+            IBinaryHashFunctionFactoryProvider hashFunProvider) throws AlgebricksException {
+        if (dataset.getType() == DatasetType.EXTERNAL) {
             throw new AlgebricksException("not implemented");
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions;
-        partitioningFunctions = ((AqlCompiledInternalDatasetDetails) compiledDatasetDecl.getAqlCompiledDatasetDetails())
-                .getPartitioningFunctions();
-        int numKeys = partitioningFunctions.size();
-        IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[numKeys];
-        for (int i = 0; i < numKeys; i++) {
-            Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType = partitioningFunctions
-                    .get(i);
-            IAType keyType = evalFactoryAndType.third;
+        }
+        List<String> partitioningKeys = getPartitioningKeys(dataset);
+        IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[partitioningKeys.size()];
+        for (int i = 0; i < partitioningKeys.size(); i++) {
+            IAType keyType = itemType.getFieldType(partitioningKeys.get(i));
             bhffs[i] = hashFunProvider.getBinaryHashFunctionFactory(keyType);
         }
         return bhffs;
     }
 
-    public static ITypeTraits[] computeTupleTypeTraits(AqlCompiledDatasetDecl compiledDatasetDecl,
-            AqlCompiledMetadataDeclarations datasetDecls) throws AlgebricksException {
-        if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL)
+    public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType)
+            throws AlgebricksException {
+        if (dataset.getType() == DatasetType.EXTERNAL) {
             throw new AlgebricksException("not implemented");
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions;
-        partitioningFunctions = ((AqlCompiledInternalDatasetDetails) compiledDatasetDecl.getAqlCompiledDatasetDetails())
-                .getPartitioningFunctions();
-        int numKeys = partitioningFunctions.size();
+        }
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        int numKeys = partitioningKeys.size();
         ITypeTraits[] typeTraits = new ITypeTraits[numKeys + 1];
         for (int i = 0; i < numKeys; i++) {
-            Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType = partitioningFunctions
-                    .get(i);
-            IAType keyType = evalFactoryAndType.third;
+            IAType keyType = itemType.getFieldType(partitioningKeys.get(i));
             typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
-        IAType payloadType = datasetDecls.findType(compiledDatasetDecl.getItemTypeName());
-        typeTraits[numKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(payloadType);
+        typeTraits[numKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
         return typeTraits;
     }
 
-    public static List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> getPartitioningFunctions(
-            AqlCompiledDatasetDecl decl) {
-        return ((AqlCompiledInternalDatasetDetails) decl.getAqlCompiledDatasetDetails()).getPartitioningFunctions();
+    public static List<String> getPartitioningKeys(Dataset dataset) {
+        return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
     }
 
-    public static String getNodegroupName(AqlCompiledDatasetDecl decl) {
-        return (((AqlCompiledInternalDatasetDetails) decl.getAqlCompiledDatasetDetails())).getNodegroupName();
+    public static String getNodegroupName(Dataset dataset) {
+        return (((AqlCompiledInternalDatasetDetails) dataset.getDatasetDetails())).getNodegroupName();
     }
 
-    public static AqlCompiledIndexDecl getPrimaryIndex(AqlCompiledDatasetDecl decl) {
+    // TODO: TO be removed.
+    /*
+    public static AqlCompiledIndexDecl getPrimaryIndex(Dataset dataset, AqlCompiledMetadataDeclarations metadata) {
+        List<Index> datasetIndexes = metadata.getDatasetIndexes(dataverseName, datasetName);
+        
         return (((AqlCompiledInternalDatasetDetails) decl.getAqlCompiledDatasetDetails())).getPrimaryIndex();
-
     }
+    */
 
+    // TODO: To be removed.
+    /*
     public static AqlCompiledIndexDecl findSecondaryIndexByName(AqlCompiledDatasetDecl decl, String indexName) {
         return (((AqlCompiledInternalDatasetDetails) decl.getAqlCompiledDatasetDetails())
                 .findSecondaryIndexByName(indexName));
     }
+    */
 
     public static List<AqlCompiledIndexDecl> findSecondaryIndexesByOneOfTheKeys(AqlCompiledDatasetDecl decl,
             String fieldExpr) {
@@ -102,17 +95,17 @@
                 .findSecondaryIndexesByOneOfTheKeys(fieldExpr);
     }
 
-    public static int getPositionOfPartitioningKeyField(AqlCompiledDatasetDecl decl, String fieldExpr) {
-        return (((AqlCompiledInternalDatasetDetails) decl.getAqlCompiledDatasetDetails()))
-                .getPositionOfPartitioningKeyField(fieldExpr);
-    }
-
-    public static List<String> getPartitioningExpressions(AqlCompiledDatasetDecl decl) {
-        return (((AqlCompiledInternalDatasetDetails) decl.getAqlCompiledDatasetDetails())).getPartitioningExprs();
+    public static int getPositionOfPartitioningKeyField(Dataset dataset, String fieldExpr) {
+        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+        for (int i = 0; i < partitioningKeys.size(); i++) {
+            if (partitioningKeys.get(i).equals(fieldExpr)) {
+                return i;
+            }
+        }
+        return -1;
     }
 
     public static List<AqlCompiledIndexDecl> getSecondaryIndexes(AqlCompiledDatasetDecl decl) {
         return (((AqlCompiledInternalDatasetDetails) decl.getAqlCompiledDatasetDetails())).getSecondaryIndexes();
     }
-
 }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ARecordType.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ARecordType.java
index 2953966..f09605c 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ARecordType.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ARecordType.java
@@ -2,7 +2,9 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import edu.uci.ics.asterix.common.annotations.IRecordTypeAnnotation;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -15,13 +17,17 @@
     private String[] fieldNames;
     private IAType[] fieldTypes;
     private boolean isOpen;
-    private transient List<IRecordTypeAnnotation> annotations = new ArrayList<IRecordTypeAnnotation>();
+    private transient final List<IRecordTypeAnnotation> annotations = new ArrayList<IRecordTypeAnnotation>();
+    private transient final Map<String, IAType> typeMap = new HashMap<String, IAType>();
 
     public ARecordType(String typeName, String[] fieldNames, IAType[] fieldTypes, boolean isOpen) {
         super(typeName);
         this.fieldNames = fieldNames;
         this.fieldTypes = fieldTypes;
         this.isOpen = isOpen;
+        for (int i = 0; i < fieldNames.length; i++) {
+        	typeMap.put(fieldNames[i], fieldTypes[i]);
+        }
     }
 
     public final String[] getFieldNames() {
@@ -76,6 +82,10 @@
         return -1;
     }
 
+    public IAType getFieldType(String fieldName) {
+    	return typeMap.get(fieldName);
+    }
+    
     @Override
     public String getDisplayName() {
         return "ARecord";