SQL++ support in AsterixDB:
1. implemented the SQL++ expression-to-logical-plan translator;
2. refactored the REST API to be query-language agnostic (see the sketch below);
3. disabled fuzzy-join queries for the SQL++ runtime tests;
4. fixed several rewriting rules.
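
A rough sketch of the resulting end-to-end flow (illustrative only, not
part of this patch): a compilation provider is chosen per language and
threaded through parsing and translation. Here lang, queryText,
sessionConfig, hcc, and hds are assumed to be in scope, and exception
handling is elided:

    // Sketch only; the provider/translator classes come from this change,
    // the surrounding driver variables are assumed.
    ILangCompilationProvider provider = "AQL".equals(lang)
            ? new AqlCompilationProvider()
            : new SqlppCompilationProvider();
    IParser parser = provider.getParserFactory().createParser(queryText);
    List<Statement> statements = parser.parse();
    QueryTranslator translator =
            new QueryTranslator(statements, sessionConfig, provider);
    translator.compileAndExecute(hcc, hds,
            QueryTranslator.ResultDelivery.SYNC);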

Change-Id: I82919c4527b304325059519d819a2c30cf2902a9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/479
Reviewed-by: Till Westmann <tillw@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 9cdaf31..c0613ed 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -23,32 +23,33 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
 import org.apache.asterix.api.common.Job.SubmissionMode;
 import org.apache.asterix.common.config.AsterixCompilerProperties;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.OptimizationConfUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.dataflow.data.common.AqlExpressionTypeComputer;
 import org.apache.asterix.dataflow.data.common.AqlMergeAggregationExpressionFactory;
 import org.apache.asterix.dataflow.data.common.AqlNullableTypeComputer;
 import org.apache.asterix.dataflow.data.common.AqlPartialAggregationTypeComputer;
 import org.apache.asterix.formats.base.IDataFormat;
-import org.apache.asterix.jobgen.AqlLogicalExpressionJobGen;
-import org.apache.asterix.lang.aql.rewrites.AqlRewriter;
-import org.apache.asterix.lang.aql.visitor.AQLPrintVisitor;
+import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
+import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
+import org.apache.asterix.lang.common.base.IQueryRewriter;
+import org.apache.asterix.lang.common.base.IRewriterFactory;
 import org.apache.asterix.lang.common.base.Statement.Kind;
+import org.apache.asterix.lang.common.rewrites.LangRewritingContext;
 import org.apache.asterix.lang.common.statement.FunctionDecl;
 import org.apache.asterix.lang.common.statement.Query;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.optimizer.base.RuleCollections;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
-import org.apache.asterix.translator.AqlExpressionToPlanTranslator;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -85,6 +86,16 @@
 public class APIFramework {
     public static final String HTML_STATEMENT_SEPARATOR = "<!-- BEGIN -->";
 
+    private final IRewriterFactory rewriterFactory;
+    private final IAstPrintVisitorFactory astPrintVisitorFactory;
+    private final ILangExpressionToPlanTranslatorFactory translatorFactory;
+
+    public APIFramework(ILangCompilationProvider compilationProvider) {
+        this.rewriterFactory = compilationProvider.getRewriterFactory();
+        this.astPrintVisitorFactory = compilationProvider.getAstPrintVisitorFactory();
+        this.translatorFactory = compilationProvider.getExpressionToPlanTranslatorFactory();
+    }
+
     private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> buildDefaultLogicalRewrites() {
         List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> defaultLogicalRewrites = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
         SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false);
@@ -158,8 +169,8 @@
 
     }
 
-    public static Pair<Query, Integer> reWriteQuery(List<FunctionDecl> declaredFunctions,
-            AqlMetadataProvider metadataProvider, Query q, SessionConfig conf) throws AsterixException {
+    public Pair<Query, Integer> reWriteQuery(List<FunctionDecl> declaredFunctions, AqlMetadataProvider metadataProvider,
+            Query q, SessionConfig conf) throws AsterixException {
 
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_EXPR_TREE)) {
             conf.out().println();
@@ -172,23 +183,22 @@
             }
 
             if (q != null) {
-                q.accept(new AQLPrintVisitor(conf.out()), 0);
+                q.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
             }
 
             if (conf.is(SessionConfig.FORMAT_HTML)) {
                 conf.out().println("</pre>");
             }
         }
-        AqlRewriter rw = new AqlRewriter(declaredFunctions, q, metadataProvider);
-        rw.rewrite();
-        Query rwQ = rw.getExpr();
-        return new Pair(rwQ, rw.getVarCounter());
+        IQueryRewriter rw = rewriterFactory.createQueryRewriter();
+        rw.rewrite(declaredFunctions, q, metadataProvider, new LangRewritingContext(q.getVarCounter()));
+        return new Pair<Query, Integer>(q, q.getVarCounter());
     }
 
-    public static JobSpecification compileQuery(List<FunctionDecl> declaredFunctions,
+    public JobSpecification compileQuery(List<FunctionDecl> declaredFunctions,
             AqlMetadataProvider queryMetadataProvider, Query rwQ, int varCounter, String outputDatasetName,
-            SessionConfig conf, ICompiledDmlStatement statement) throws AsterixException, AlgebricksException,
-            JSONException, RemoteException, ACIDException {
+            SessionConfig conf, ICompiledDmlStatement statement)
+                    throws AsterixException, AlgebricksException, JSONException, RemoteException, ACIDException {
 
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
             conf.out().println();
@@ -201,7 +211,7 @@
             }
 
             if (rwQ != null) {
-                rwQ.accept(new AQLPrintVisitor(conf.out()), 0);
+                rwQ.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
             }
 
             if (conf.is(SessionConfig.FORMAT_HTML)) {
@@ -211,15 +221,15 @@
 
         org.apache.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId();
         queryMetadataProvider.setJobId(asterixJobId);
-        AqlExpressionToPlanTranslator t = new AqlExpressionToPlanTranslator(queryMetadataProvider, varCounter,
-                outputDatasetName, statement);
+        ILangExpressionToPlanTranslator t = translatorFactory.createExpressionToPlanTranslator(queryMetadataProvider,
+                varCounter);
 
         ILogicalPlan plan;
         // statement = null when it's a query
         if (statement == null || statement.getKind() != Kind.LOAD) {
-            plan = t.translate(rwQ);
+            plan = t.translate(rwQ, outputDatasetName, statement);
         } else {
-            plan = t.translateLoad();
+            plan = t.translateLoad(statement);
         }
 
         LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
@@ -316,8 +326,8 @@
         builder.setBinaryIntegerInspectorFactory(format.getBinaryIntegerInspectorFactory());
         builder.setClusterLocations(clusterLocs);
         builder.setComparatorFactoryProvider(format.getBinaryComparatorFactoryProvider());
-        builder.setExpressionRuntimeProvider(new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(
-                AqlLogicalExpressionJobGen.INSTANCE));
+        builder.setExpressionRuntimeProvider(
+                new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(QueryLogicalExpressionJobGen.INSTANCE));
         builder.setHashFunctionFactoryProvider(format.getBinaryHashFunctionFactoryProvider());
         builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider());
         builder.setNullWriterFactory(format.getNullWriterFactory());
@@ -368,7 +378,7 @@
         return spec;
     }
 
-    public static void executeJobArray(IHyracksClientConnection hcc, JobSpecification[] specs, PrintWriter out)
+    public void executeJobArray(IHyracksClientConnection hcc, JobSpecification[] specs, PrintWriter out)
             throws Exception {
         for (int i = 0; i < specs.length; i++) {
             specs[i].setMaxReattempts(0);
@@ -382,7 +392,7 @@
 
     }
 
-    public static void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out) throws Exception {
+    public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out) throws Exception {
         for (int i = 0; i < jobs.length; i++) {
             jobs[i].getJobSpec().setMaxReattempts(0);
             long startTime = System.currentTimeMillis();
@@ -400,19 +410,6 @@
             double duration = (endTime - startTime) / 1000.00;
             out.println("<pre>Duration: " + duration + " sec</pre>");
         }
-
-    }
-
-    private static IDataFormat getDataFormat(MetadataTransactionContext mdTxnCtx, String dataverseName)
-            throws AsterixException {
-        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-        IDataFormat format;
-        try {
-            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-        return format;
     }
 
 }
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/FeedWorkCollection.java b/asterix-app/src/main/java/org/apache/asterix/api/common/FeedWorkCollection.java
index f25c5dd..6aad64b 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/FeedWorkCollection.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/FeedWorkCollection.java
@@ -25,11 +25,13 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.AqlTranslator;
+import org.apache.asterix.aql.translator.QueryTranslator;
 import org.apache.asterix.common.feeds.FeedConnectionRequest;
 import org.apache.asterix.common.feeds.FeedConnectionRequest.ConnectionStatus;
 import org.apache.asterix.common.feeds.api.IFeedWork;
 import org.apache.asterix.common.feeds.api.IFeedWorkEventListener;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.feeds.FeedCollectInfo;
 import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
 import org.apache.asterix.lang.common.base.Statement;
@@ -44,6 +46,7 @@
 public class FeedWorkCollection {
 
     private static Logger LOGGER = Logger.getLogger(FeedWorkCollection.class.getName());
+    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
 
     /**
      * The task of subscribing to a feed to obtain data.
@@ -79,14 +82,15 @@
                 try {
                     PrintWriter writer = new PrintWriter(System.out, true);
                     SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
-                    DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(request.getReceivingFeedId()
-                            .getDataverse()));
+                    DataverseDecl dataverseDecl = new DataverseDecl(
+                            new Identifier(request.getReceivingFeedId().getDataverse()));
                     SubscribeFeedStatement subscribeStmt = new SubscribeFeedStatement(locations, request);
                     List<Statement> statements = new ArrayList<Statement>();
                     statements.add(dataverseDecl);
                     statements.add(subscribeStmt);
-                    AqlTranslator translator = new AqlTranslator(statements, pc);
-                    translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, AqlTranslator.ResultDelivery.SYNC);
+                    QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
+                    translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
+                            QueryTranslator.ResultDelivery.SYNC);
                     if (LOGGER.isLoggable(Level.INFO)) {
                         LOGGER.info("Submitted connection requests for execution: " + request);
                     }
@@ -187,8 +191,8 @@
                                 }
                             } catch (Exception e) {
                                 if (LOGGER.isLoggable(Level.WARNING)) {
-                                    LOGGER.warning("Unable to resume feed " + finfo.feedConnectionId + " "
-                                            + e.getMessage());
+                                    LOGGER.warning(
+                                            "Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
                                 }
                             }
                         }
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java
index 2c0f897..2f998bb 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java
@@ -37,11 +37,15 @@
 import org.apache.asterix.api.common.APIFramework;
 import org.apache.asterix.api.common.SessionConfig;
 import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.AqlTranslator;
+import org.apache.asterix.aql.translator.QueryTranslator;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.lang.aql.parser.AQLParser;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.lang.aql.parser.TokenMgrError;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.result.ResultReader;
@@ -57,8 +61,36 @@
 
     private static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET";
 
+    private final ILangCompilationProvider aqlCompilationProvider;
+    private final IParserFactory aqlParserFactory;
+    private final ILangCompilationProvider sqlppCompilationProvider;
+    private final IParserFactory sqlppParserFactory;
+
+    public APIServlet() {
+        this.aqlCompilationProvider = new AqlCompilationProvider();
+        this.aqlParserFactory = aqlCompilationProvider.getParserFactory();
+
+        this.sqlppCompilationProvider = new SqlppCompilationProvider();
+        this.sqlppParserFactory = sqlppCompilationProvider.getParserFactory();
+    }
+
     @Override
     public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
+        // Query language
+        ILangCompilationProvider compilationProvider;
+        IParserFactory parserFactory;
+        String lang = request.getParameter("query-language");
+        if ("AQL".equals(lang)) { // null-safe: an absent parameter falls through to SQL++
+            // Uses AQL compiler.
+            compilationProvider = aqlCompilationProvider;
+            parserFactory = aqlParserFactory;
+        } else {
+            // Uses SQL++ compiler.
+            compilationProvider = sqlppCompilationProvider;
+            parserFactory = sqlppParserFactory;
+        }
+
+        // Output format.
         OutputFormat format;
         boolean csv_and_header = false;
         String output = request.getParameter("output-format");
@@ -101,7 +133,7 @@
                     context.setAttribute(HYRACKS_DATASET_ATTR, hds);
                 }
             }
-            AQLParser parser = new AQLParser(query);
+            IParser parser = parserFactory.createParser(query);
             List<Statement> aqlStatements = parser.parse();
             SessionConfig sessionConfig = new SessionConfig(out, format, true, isSet(executeQuery), true);
             sessionConfig.set(SessionConfig.FORMAT_HTML, true);
@@ -110,10 +142,10 @@
             sessionConfig.setOOBData(isSet(printExprParam), isSet(printRewrittenExprParam),
                     isSet(printLogicalPlanParam), isSet(printOptimizedLogicalPlanParam), isSet(printJob));
             MetadataManager.INSTANCE.init();
-            AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, sessionConfig);
+            QueryTranslator translator = new QueryTranslator(aqlStatements, sessionConfig, compilationProvider);
             double duration = 0;
             long startTime = System.currentTimeMillis();
-            aqlTranslator.compileAndExecute(hcc, hds, AqlTranslator.ResultDelivery.SYNC);
+            translator.compileAndExecute(hcc, hds, QueryTranslator.ResultDelivery.SYNC);
             long endTime = System.currentTimeMillis();
             duration = (endTime - startTime) / 1000.00;
             out.println(APIFramework.HTML_STATEMENT_SEPARATOR);
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/AQLAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/AQLAPIServlet.java
index 9a9b345..f9dc45c 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/AQLAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/AQLAPIServlet.java
@@ -23,6 +23,7 @@
 
 import javax.servlet.http.HttpServletRequest;
 
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement.Kind;
 
 public class AQLAPIServlet extends RESTAPIServlet {
@@ -33,6 +34,10 @@
 
     private static final List<Kind> allowedStatements = new ArrayList<>();
 
+    public AQLAPIServlet(ILangCompilationProvider compilationProvider) {
+        super(compilationProvider);
+    }
+
     static {
         for (Kind k : Kind.values()) {
             allowedStatements.add(k);
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/DDLAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/DDLAPIServlet.java
index 65760a0d..0254d6d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/DDLAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/DDLAPIServlet.java
@@ -23,12 +23,17 @@
 
 import javax.servlet.http.HttpServletRequest;
 
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.base.Statement.Kind;
 
 public class DDLAPIServlet extends RESTAPIServlet {
     private static final long serialVersionUID = 1L;
 
+    public DDLAPIServlet(ILangCompilationProvider compilationProvider) {
+        super(compilationProvider);
+    }
+
     protected String getQueryParameter(HttpServletRequest request) {
         return request.getParameter("ddl");
     }
@@ -37,7 +42,8 @@
         Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.DATAVERSE_DROP, Kind.DATASET_DECL, Kind.NODEGROUP_DECL,
                 Kind.NODEGROUP_DROP, Kind.TYPE_DECL, Kind.TYPE_DROP, Kind.CREATE_INDEX, Kind.INDEX_DECL,
                 Kind.CREATE_DATAVERSE, Kind.DATASET_DROP, Kind.INDEX_DROP, Kind.CREATE_FUNCTION, Kind.FUNCTION_DROP,
-                Kind.CREATE_PRIMARY_FEED, Kind.CREATE_SECONDARY_FEED, Kind.DROP_FEED, Kind.CREATE_FEED_POLICY, Kind.DROP_FEED_POLICY };
+                Kind.CREATE_PRIMARY_FEED, Kind.CREATE_SECONDARY_FEED, Kind.DROP_FEED, Kind.CREATE_FEED_POLICY,
+                Kind.DROP_FEED_POLICY };
         return Arrays.asList(statementsArray);
     }
 
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryAPIServlet.java
index 9f7657d..9ada5ff 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryAPIServlet.java
@@ -23,18 +23,24 @@
 
 import javax.servlet.http.HttpServletRequest;
 
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.base.Statement.Kind;
 
 public class QueryAPIServlet extends RESTAPIServlet {
     private static final long serialVersionUID = 1L;
 
+    public QueryAPIServlet(ILangCompilationProvider compilationProvider) {
+        super(compilationProvider);
+    }
+
     protected String getQueryParameter(HttpServletRequest request) {
         return request.getParameter("query");
     }
 
     protected List<Statement.Kind> getAllowedStatements() {
-        Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.FUNCTION_DECL, Kind.QUERY, Kind.SET, Kind.WRITE, Kind.RUN };
+        Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.FUNCTION_DECL, Kind.QUERY, Kind.SET, Kind.WRITE,
+                Kind.RUN };
         return Arrays.asList(statementsArray);
     }
 
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
index fdaee18..0fd782d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
@@ -32,12 +32,13 @@
 
 import org.apache.asterix.api.common.SessionConfig;
 import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.AqlTranslator;
+import org.apache.asterix.aql.translator.QueryTranslator;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.lang.aql.parser.AQLParser;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.aql.parser.TokenMgrError;
-import org.apache.asterix.lang.aql.util.AQLFormatPrintUtil;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.base.Statement.Kind;
 import org.apache.asterix.metadata.MetadataManager;
@@ -56,6 +57,14 @@
 
     public static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET";
 
+    private final ILangCompilationProvider compilationProvider;
+    private final IParserFactory parserFactory;
+
+    public RESTAPIServlet(ILangCompilationProvider compilationProvider) {
+        this.compilationProvider = compilationProvider;
+        this.parserFactory = compilationProvider.getParserFactory();
+    }
+
     /**
      * Initialize the Content-Type of the response, and construct a
      * SessionConfig with the appropriate output writer and output-format
@@ -90,8 +99,9 @@
         }
 
         // If it's JSON, check for the "lossless" flag
-        if (format == OutputFormat.CLEAN_JSON &&
-                ("true".equals(request.getParameter("lossless")) || accept.contains("lossless=true")) ) {
+
+        if (format == OutputFormat.CLEAN_JSON
+                && ("true".equals(request.getParameter("lossless")) || accept.contains("lossless=true"))) {
             format = OutputFormat.LOSSLESS_JSON;
         }
 
@@ -131,8 +141,7 @@
                 break;
             case CSV: {
                 // Check for header parameter or in Accept:.
-                if ("present".equals(request.getParameter("header")) ||
-                    accept.contains("header=present")) {
+                if ("present".equals(request.getParameter("header")) || accept.contains("header=present")) {
                     response.setContentType("text/csv; header=present");
                     sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, true);
                 } else {
@@ -145,8 +154,8 @@
     }
 
     @Override
-    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
-            IOException {
+    protected void doPost(HttpServletRequest request, HttpServletResponse response)
+            throws ServletException, IOException {
         StringWriter sw = new StringWriter();
         IOUtils.copy(request.getInputStream(), sw, StandardCharsets.UTF_8.name());
         String query = sw.toString();
@@ -162,7 +171,7 @@
     public void handleRequest(HttpServletRequest request, HttpServletResponse response, String query)
             throws IOException {
         SessionConfig sessionConfig = initResponse(request, response);
-        AqlTranslator.ResultDelivery resultDelivery = whichResultDelivery(request);
+        QueryTranslator.ResultDelivery resultDelivery = whichResultDelivery(request);
 
         ServletContext context = getServletContext();
         IHyracksClientConnection hcc;
@@ -178,12 +187,12 @@
                 }
             }
 
-            AQLParser parser = new AQLParser(query);
+            IParser parser = parserFactory.createParser(query);
             List<Statement> aqlStatements = parser.parse();
             if (!containsForbiddenStatements(aqlStatements)) {
                 MetadataManager.INSTANCE.init();
-                AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, sessionConfig);
-                aqlTranslator.compileAndExecute(hcc, hds, resultDelivery);
+                QueryTranslator translator = new QueryTranslator(aqlStatements, sessionConfig, compilationProvider);
+                translator.compileAndExecute(hcc, hds, resultDelivery);
             }
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
@@ -207,16 +216,16 @@
         return false;
     }
 
-    protected AqlTranslator.ResultDelivery whichResultDelivery(HttpServletRequest request) {
+    protected QueryTranslator.ResultDelivery whichResultDelivery(HttpServletRequest request) {
         String mode = request.getParameter("mode");
         if (mode != null) {
             if (mode.equals("asynchronous")) {
-                return AqlTranslator.ResultDelivery.ASYNC;
+                return QueryTranslator.ResultDelivery.ASYNC;
             } else if (mode.equals("asynchronous-deferred")) {
-                return AqlTranslator.ResultDelivery.ASYNC_DEFERRED;
+                return QueryTranslator.ResultDelivery.ASYNC_DEFERRED;
             }
         }
-        return AqlTranslator.ResultDelivery.SYNC;
+        return QueryTranslator.ResultDelivery.SYNC;
     }
 
     protected abstract String getQueryParameter(HttpServletRequest request);
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
index d08134f..d75625f 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
@@ -23,19 +23,25 @@
 
 import javax.servlet.http.HttpServletRequest;
 
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.base.Statement.Kind;
 
 public class UpdateAPIServlet extends RESTAPIServlet {
     private static final long serialVersionUID = 1L;
 
+    public UpdateAPIServlet(ILangCompilationProvider compilationProvider) {
+        super(compilationProvider);
+    }
+
     protected String getQueryParameter(HttpServletRequest request) {
         return request.getParameter("statements");
     }
 
     protected List<Statement.Kind> getAllowedStatements() {
         Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.DELETE, Kind.INSERT, Kind.UPDATE, Kind.DML_CMD_LIST,
-                Kind.LOAD, Kind.CONNECT_FEED, Kind.DISCONNECT_FEED, Kind.SET, Kind.COMPACT, Kind.EXTERNAL_DATASET_REFRESH };
+                Kind.LOAD, Kind.CONNECT_FEED, Kind.DISCONNECT_FEED, Kind.SET, Kind.COMPACT,
+                Kind.EXTERNAL_DATASET_REFRESH };
         return Arrays.asList(statementsArray);
     }
 
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index a9b012d..964893b 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -26,8 +26,10 @@
 import org.apache.asterix.api.common.Job;
 import org.apache.asterix.api.common.SessionConfig;
 import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.AqlTranslator;
-import org.apache.asterix.lang.aql.parser.AQLParser;
+import org.apache.asterix.aql.translator.QueryTranslator;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -41,14 +43,23 @@
     private Job[] dmlJobs;
     private JobSpecification queryJobSpec;
 
-    public AsterixJavaClient(IHyracksClientConnection hcc, Reader queryText, PrintWriter writer) {
+    private final ILangCompilationProvider compilationProvider;
+    private final IParserFactory parserFactory;
+    private final APIFramework apiFramework;
+
+    public AsterixJavaClient(IHyracksClientConnection hcc, Reader queryText, PrintWriter writer,
+            ILangCompilationProvider compilationProvider) {
         this.hcc = hcc;
         this.queryText = queryText;
         this.writer = writer;
+        this.compilationProvider = compilationProvider;
+        this.parserFactory = compilationProvider.getParserFactory();
+        this.apiFramework = new APIFramework(compilationProvider);
     }
 
-    public AsterixJavaClient(IHyracksClientConnection hcc, Reader queryText) {
-        this(hcc, queryText, new PrintWriter(System.out, true));
+    public AsterixJavaClient(IHyracksClientConnection hcc, Reader queryText,
+            ILangCompilationProvider compilationProvider) {
+        this(hcc, queryText, new PrintWriter(System.out, true), compilationProvider);
     }
 
     public void compile() throws Exception {
@@ -69,8 +80,8 @@
         while ((ch = queryText.read()) != -1) {
             builder.append((char) ch);
         }
-        AQLParser parser = new AQLParser(builder.toString());
-        List<Statement> aqlStatements = parser.parse();
+        IParser parser = parserFactory.createParser(builder.toString());
+        List<Statement> statements = parser.parse();
         MetadataManager.INSTANCE.init();
 
         SessionConfig conf = new SessionConfig(writer, OutputFormat.ADM, optimize, true, generateBinaryRuntime);
@@ -79,17 +90,17 @@
             conf.set(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS, true);
         }
 
-        AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, conf);
-        aqlTranslator.compileAndExecute(hcc, null, AqlTranslator.ResultDelivery.SYNC);
+        QueryTranslator translator = new QueryTranslator(statements, conf, compilationProvider);
+        translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.SYNC);
         writer.flush();
     }
 
     public void execute() throws Exception {
         if (dmlJobs != null) {
-            APIFramework.executeJobArray(hcc, dmlJobs, writer);
+            apiFramework.executeJobArray(hcc, dmlJobs, writer);
         }
         if (queryJobSpec != null) {
-            APIFramework.executeJobArray(hcc, new JobSpecification[] { queryJobSpec }, writer);
+            apiFramework.executeJobArray(hcc, new JobSpecification[] { queryJobSpec }, writer);
         }
     }
 
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
similarity index 93%
rename from asterix-app/src/main/java/org/apache/asterix/aql/translator/AqlTranslator.java
rename to asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 8029e0d..1fa610e 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -39,10 +39,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
 import org.apache.asterix.api.common.APIFramework;
 import org.apache.asterix.api.common.Job;
 import org.apache.asterix.api.common.SessionConfig;
@@ -67,6 +63,7 @@
 import org.apache.asterix.common.feeds.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
 import org.apache.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
 import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.feeds.CentralFeedManager;
 import org.apache.asterix.feeds.FeedJoint;
 import org.apache.asterix.feeds.FeedLifecycleListener;
@@ -77,7 +74,8 @@
 import org.apache.asterix.file.IndexOperations;
 import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
 import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
-import org.apache.asterix.lang.aql.util.FunctionUtils;
+import org.apache.asterix.lang.common.base.IRewriterFactory;
+import org.apache.asterix.lang.common.base.IStatementRewriter;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.expression.TypeExpression;
 import org.apache.asterix.lang.common.statement.CompactStatement;
@@ -115,6 +113,7 @@
 import org.apache.asterix.lang.common.statement.TypeDropStatement;
 import org.apache.asterix.lang.common.statement.WriteStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
@@ -157,7 +156,7 @@
 import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
-import org.apache.asterix.translator.AbstractAqlTranslator;
+import org.apache.asterix.translator.AbstractLangTranslator;
 import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
@@ -169,10 +168,12 @@
 import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.TypeTranslator;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
 import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
@@ -190,19 +191,21 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 /*
- * Provides functionality for executing a batch of AQL statements (queries included)
+ * Provides functionality for executing a batch of statements (queries included)
  * sequentially.
  */
-public class AqlTranslator extends AbstractAqlTranslator {
+public class QueryTranslator extends AbstractLangTranslator {
 
-    private static Logger LOGGER = Logger.getLogger(AqlTranslator.class.getName());
+    private static Logger LOGGER = Logger.getLogger(QueryTranslator.class.getName());
 
     private enum ProgressState {
         NO_PROGRESS,
@@ -216,15 +219,20 @@
     }
 
     public static final boolean IS_DEBUG_MODE = false;//true
-    private final List<Statement> aqlStatements;
+    private final List<Statement> statements;
     private final SessionConfig sessionConfig;
     private Dataverse activeDefaultDataverse;
     private final List<FunctionDecl> declaredFunctions;
+    private final APIFramework apiFramework;
+    private final IRewriterFactory rewriterFactory;
 
-    public AqlTranslator(List<Statement> aqlStatements, SessionConfig conf) throws MetadataException, AsterixException {
-        this.aqlStatements = aqlStatements;
+    public QueryTranslator(List<Statement> aqlStatements, SessionConfig conf,
+            ILangCompilationProvider compilationProvider) throws MetadataException, AsterixException {
+        this.statements = aqlStatements;
         this.sessionConfig = conf;
-        declaredFunctions = getDeclaredFunctions(aqlStatements);
+        this.declaredFunctions = getDeclaredFunctions(aqlStatements);
+        this.apiFramework = new APIFramework(compilationProvider);
+        this.rewriterFactory = compilationProvider.getRewriterFactory();
     }
 
     private List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
@@ -239,7 +247,7 @@
 
     /**
      * Compiles and submits for execution a list of AQL statements.
-     * 
+     *
      * @param hcc
      *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
      * @param hdc
@@ -257,11 +265,12 @@
         IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE;
         Map<String, String> config = new HashMap<String, String>();
 
-        for (Statement stmt : aqlStatements) {
+        for (Statement stmt : statements) {
             if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
                 sessionConfig.out().println(APIFramework.HTML_STATEMENT_SEPARATOR);
             }
             validateOperation(activeDefaultDataverse, stmt);
+            rewriteStatement(stmt); // Rewrite the statement's AST.
             AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse,
                     CentralFeedManager.getInstance());
             metadataProvider.setWriterFactory(writerFactory);
@@ -379,8 +388,8 @@
 
                 case QUERY: {
                     metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
-                    metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
-                            || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+                    metadataProvider.setResultAsyncMode(
+                            resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
                     handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery);
                     break;
                 }
@@ -408,6 +417,9 @@
                     handleRunStatement(metadataProvider, stmt, hcc);
                     break;
                 }
+
+                default:
+                    break;
             }
         }
     }
@@ -472,8 +484,8 @@
                     throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
                 }
             }
-            MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
-                    stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
+            MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(),
+                    new Dataverse(dvName, stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
@@ -491,8 +503,8 @@
             throw new AsterixException("Unknown compaction policy: " + compactionPolicy);
         }
         String compactionPolicyFactoryClassName = compactionPolicyEntity.getClassName();
-        ILSMMergePolicyFactory mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(
-                compactionPolicyFactoryClassName).newInstance();
+        ILSMMergePolicyFactory mergePolicyFactory = (ILSMMergePolicyFactory) Class
+                .forName(compactionPolicyFactoryClassName).newInstance();
         if (isExternalDataset && mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
             throw new AsterixException("The correlated-prefix merge policy cannot be used with external dataset.");
         }
@@ -555,8 +567,8 @@
             if (dt == null) {
                 throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
             }
-            String ngName = ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName,
-                    mdTxnCtx);
+            String ngName = ngNameId != null ? ngNameId.getValue()
+                    : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx);
 
             if (compactionPolicy == null) {
                 compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
@@ -777,7 +789,6 @@
 
     }
 
-    @SuppressWarnings("unchecked")
     private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         ProgressState progress = ProgressState.NO_PROGRESS;
@@ -804,8 +815,8 @@
             ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName);
             if (ds == null) {
-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                        + dataverseName);
+                throw new AlgebricksException(
+                        "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
             }
 
             indexName = stmtCreateIndex.getIndexName().getValue();
@@ -894,9 +905,9 @@
                 // External dataset
                 // Check if the dataset is indexible
                 if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) {
-                    throw new AlgebricksException("dataset using "
-                            + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
-                            + " Adapter can't be indexed");
+                    throw new AlgebricksException(
+                            "dataset using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
+                                    + " Adapter can't be indexed");
                 }
                 // check if the name of the index is valid
                 if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
@@ -948,14 +959,14 @@
 
             //check whether there exists another enforced index on the same field
             if (stmtCreateIndex.isEnforced()) {
-                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(
-                        metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
+                List<Index> indexes = MetadataManager.INSTANCE
+                        .getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
                 for (Index index : indexes) {
                     if (index.getKeyFieldNames().equals(indexFields)
                             && !index.getKeyFieldTypes().equals(indexFieldTypes) && index.isEnforcingKeyFileds())
-                        throw new AsterixException("Cannot create index " + indexName + " , enforced index "
-                                + index.getIndexName() + " on field \"" + StringUtils.join(indexFields, ',')
-                                + "\" already exist");
+                        throw new AsterixException(
+                                "Cannot create index " + indexName + " , enforced index " + index.getIndexName()
+                                        + " on field \"" + StringUtils.join(indexFields, ',') + "\" already exist");
                 }
             }
 
@@ -1013,8 +1024,8 @@
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
             // add another new files index with PendingNoOp after deleting the index with PendingAddOp
             if (firstExternalDatasetIndex) {
-                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                        datasetName, filesIndex.getIndexName());
+                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+                        filesIndex.getIndexName());
                 filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
                 // update transaction timestamp
@@ -1055,8 +1066,8 @@
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 try {
-                    JobSpecification jobSpec = IndexOperations
-                            .buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds);
+                    JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
+                            ds);
 
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
@@ -1188,8 +1199,8 @@
             for (FeedConnectionId connection : activeFeedConnections) {
                 FeedId feedId = connection.getFeedId();
                 if (feedId.getDataverse().equals(dataverseName)) {
-                    disStmt = new DisconnectFeedStatement(dvId, new Identifier(feedId.getFeedName()), new Identifier(
-                            connection.getDatasetName()));
+                    disStmt = new DisconnectFeedStatement(dvId, new Identifier(feedId.getFeedName()),
+                            new Identifier(connection.getDatasetName()));
                     try {
                         handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
                         if (LOGGER.isLoggable(Level.INFO)) {
@@ -1250,8 +1261,8 @@
             //   first, deleting the dataverse record from the DATAVERSE_DATASET
             //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-            MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dv.getDataFormat(),
-                    IMetadataEntity.PENDING_DROP_OP));
+            MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
+                    new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP));
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -1332,8 +1343,8 @@
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     return;
                 } else {
-                    throw new AlgebricksException("There is no dataset with this name " + datasetName
-                            + " in dataverse " + dataverseName + ".");
+                    throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+                            + dataverseName + ".");
                 }
             }
 
@@ -1367,11 +1378,11 @@
 
                 //#. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-                MetadataManager.INSTANCE.addDataset(
-                        mdTxnCtx,
-                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(), ds
-                                .getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), ds
-                                .getHints(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+                MetadataManager.INSTANCE.addDataset(mdTxnCtx,
+                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(),
+                                ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(),
+                                ds.getHints(), ds.getDatasetType(), ds.getDatasetId(),
+                                IMetadataEntity.PENDING_DROP_OP));
 
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
@@ -1403,18 +1414,18 @@
                     } else {
                         CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
                                 indexes.get(j).getIndexName());
-                        jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider,
-                                ds));
+                        jobsToExecute
+                                .add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
                     }
                 }
 
                 //#. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-                MetadataManager.INSTANCE.addDataset(
-                        mdTxnCtx,
-                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(), ds
-                                .getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), ds
-                                .getHints(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+                MetadataManager.INSTANCE.addDataset(mdTxnCtx,
+                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(),
+                                ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(),
+                                ds.getHints(), ds.getDatasetType(), ds.getDatasetId(),
+                                IMetadataEntity.PENDING_DROP_OP));
 
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
@@ -1500,8 +1511,8 @@
 
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
             if (ds == null) {
-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                        + dataverseName);
+                throw new AlgebricksException(
+                        "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
             }
 
             List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
@@ -1515,9 +1526,9 @@
                     }
                 }
                 if (resourceInUse) {
-                    throw new AsterixException("Dataset" + datasetName
-                            + " is currently being fed into by the following feeds " + "." + builder.toString()
-                            + "\nOperation not supported.");
+                    throw new AsterixException(
+                            "Dataset" + datasetName + " is currently being fed into by the following feeds " + "."
+                                    + builder.toString() + "\nOperation not supported.");
                 }
             }
 
@@ -1538,11 +1549,10 @@
 
                 //#. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                MetadataManager.INSTANCE.addIndex(
-                        mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index
-                                        .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+                MetadataManager.INSTANCE.addIndex(mdTxnCtx,
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
+                                index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
+                                IMetadataEntity.PENDING_DROP_OP));
 
                 //#. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1586,28 +1596,26 @@
                         if (ExternalIndexingOperations.isFileIndex(externalIndex)) {
                             cds = new CompiledIndexDropStatement(dataverseName, datasetName,
                                     externalIndex.getIndexName());
-                            jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
-                                    metadataProvider, ds));
+                            jobsToExecute.add(
+                                    ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
                             //#. mark PendingDropOp on the existing files index
                             MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
                                     externalIndex.getIndexName());
-                            MetadataManager.INSTANCE.addIndex(
-                                    mdTxnCtx,
-                                    new Index(dataverseName, datasetName, externalIndex.getIndexName(), externalIndex
-                                            .getIndexType(), externalIndex.getKeyFieldNames(),
-                                            index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), externalIndex
-                                                    .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+                            MetadataManager.INSTANCE.addIndex(mdTxnCtx,
+                                    new Index(dataverseName, datasetName, externalIndex.getIndexName(),
+                                            externalIndex.getIndexType(), externalIndex.getKeyFieldNames(),
+                                            index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
+                                            externalIndex.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
                         }
                     }
                 }
 
                 //#. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                MetadataManager.INSTANCE.addIndex(
-                        mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index
-                                        .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+                MetadataManager.INSTANCE.addIndex(mdTxnCtx,
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
+                                index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
+                                IMetadataEntity.PENDING_DROP_OP));
 
                 //#. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1666,8 +1674,8 @@
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
-                            + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "."
+                            + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
                 }
             }
 
@@ -1763,8 +1771,8 @@
         FunctionSignature signature = stmtDropFunction.getFunctionSignature();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.functionStatementBegin(signature.getNamespace(), signature.getNamespace() + "."
-                + signature.getName());
+        MetadataLockManager.INSTANCE.functionStatementBegin(signature.getNamespace(),
+                signature.getNamespace() + "." + signature.getName());
         try {
             Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
             if (function == null) {
@@ -1778,8 +1786,8 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.functionStatementEnd(signature.getNamespace(), signature.getNamespace() + "."
-                    + signature.getName());
+            MetadataLockManager.INSTANCE.functionStatementEnd(signature.getNamespace(),
+                    signature.getNamespace() + "." + signature.getName());
         }
     }
 
@@ -1793,11 +1801,11 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.modifyDatasetBegin(dataverseName, dataverseName + "." + datasetName);
         try {
-            CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
-                    .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
+            CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName,
+                    loadStmt.getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
                     loadStmt.dataIsAlreadySorted());
-            JobSpecification spec = APIFramework
-                    .compileQuery(null, metadataProvider, null, 0, null, sessionConfig, cls);
+            JobSpecification spec = apiFramework.compileQuery(null, metadataProvider, null, 0, null, sessionConfig,
+                    cls);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
@@ -1822,13 +1830,13 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.insertDeleteBegin(dataverseName,
-                dataverseName + "." + stmtInsert.getDatasetName(), query.getDataverses(), query.getDatasets());
+        MetadataLockManager.INSTANCE.insertDeleteBegin(dataverseName, dataverseName + "." + stmtInsert.getDatasetName(),
+                query.getDataverses(), query.getDatasets());
 
         try {
             metadataProvider.setWriteTransaction(true);
-            CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
-                    .getValue(), query, stmtInsert.getVarCounter());
+            CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName,
+                    stmtInsert.getDatasetName().getValue(), query, stmtInsert.getVarCounter());
             JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, clfrqs);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1857,15 +1865,14 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE
-                .insertDeleteBegin(dataverseName, dataverseName + "." + stmtDelete.getDatasetName(),
-                        stmtDelete.getDataverses(), stmtDelete.getDatasets());
+        MetadataLockManager.INSTANCE.insertDeleteBegin(dataverseName, dataverseName + "." + stmtDelete.getDatasetName(),
+                stmtDelete.getDataverses(), stmtDelete.getDatasets());
 
         try {
             metadataProvider.setWriteTransaction(true);
             CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
                     stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getVarCounter(),
-                    metadataProvider);
+                    stmtDelete.getQuery());
             JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1888,15 +1895,15 @@
     }
 
     private JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
-            ICompiledDmlStatement stmt) throws AsterixException, RemoteException, AlgebricksException, JSONException,
-            ACIDException {
+            ICompiledDmlStatement stmt)
+                    throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
         // Query Rewriting (happens under the same ongoing metadata transaction)
-        Pair<Query, Integer> reWrittenQuery = APIFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
+        Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
                 sessionConfig);
 
         // Query Compilation (happens under the same ongoing metadata transaction)
-        JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, reWrittenQuery.first,
+        JobSpecification spec = apiFramework.compileQuery(declaredFunctions, metadataProvider, reWrittenQuery.first,
                 reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, stmt);
 
         return spec;
@@ -1976,8 +1983,8 @@
             boolean extendingExisting = cfps.getSourcePolicyName() != null;
             String description = cfps.getDescription() == null ? "" : cfps.getDescription();
             if (extendingExisting) {
-                FeedPolicy sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(
-                        metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
+                FeedPolicy sourceFeedPolicy = MetadataManager.INSTANCE
+                        .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
                 if (sourceFeedPolicy == null) {
                     sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
                             MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
@@ -2100,13 +2107,13 @@
         IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
         FeedConnectionId feedConnId = null;
 
-        MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName, dataverseName
-                + "." + feedName);
+        MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName,
+                dataverseName + "." + feedName);
         try {
             metadataProvider.setWriteTransaction(true);
 
-            CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(), cfs
-                    .getDatasetName().getValue(), cfs.getPolicy(), cfs.getQuery(), cfs.getVarCounter());
+            CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(),
+                    cfs.getDatasetName().getValue(), cfs.getPolicy(), cfs.getQuery(), cfs.getVarCounter());
 
             FeedUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(),
                     metadataProvider.getMetadataTxnContext());
@@ -2164,8 +2171,8 @@
                 eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // blocking call
             }
             String waitForCompletionParam = metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION);
-            boolean waitForCompletion = waitForCompletionParam == null ? false : Boolean
-                    .valueOf(waitForCompletionParam);
+            boolean waitForCompletion = waitForCompletionParam == null ? false
+                    : Boolean.valueOf(waitForCompletionParam);
             if (waitForCompletion) {
                 MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
                         dataverseName + "." + feedName);
@@ -2190,7 +2197,7 @@
     /**
      * Generates a subscription request corresponding to a connect feed request. In addition, provides a boolean
      * flag indicating if feed intake job needs to be started (source primary feed not found to be active).
-     * 
+     *
      * @param dataverse
      * @param feed
      * @param dataset
@@ -2201,7 +2208,7 @@
      */
     private Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse,
             Feed feed, String dataset, FeedPolicy feedPolicy, MetadataTransactionContext mdTxnCtx)
-            throws MetadataException {
+                    throws MetadataException {
         IFeedJoint sourceFeedJoint = null;
         FeedConnectionRequest request = null;
         List<String> functionsToApply = new ArrayList<String>();
@@ -2216,7 +2223,7 @@
             sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
             if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
                 connectionLocation = ConnectionLocation.SOURCE_FEED_INTAKE_STAGE;
-                FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId 
+                FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
                 Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName());
                 FeedJointKey intakeFeedJointKey = new FeedJointKey(sourceFeedId, new ArrayList<String>());
                 sourceFeedJoint = new FeedJoint(intakeFeedJointKey, primaryFeed.getFeedId(), connectionLocation,
@@ -2235,7 +2242,7 @@
                 }
             }
             // register the compute feed point that represents the final output from the collection of
-            // functions that will be applied. 
+            // functions that will be applied.
             if (!functionsToApply.isEmpty()) {
                 FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply);
                 IFeedJoint computeFeedJoint = new FeedJoint(computeFeedJointKey, feed.getFeedId(),
@@ -2258,7 +2265,7 @@
     }
 
     /*
-     * Gets the feed joint corresponding to the feed definition. Tuples constituting the feed are 
+     * Gets the feed joint corresponding to the feed definition. Tuples constituting the feed are
      * available at this feed joint.
      */
     private FeedJointKey getFeedJointKey(Feed feed, MetadataTransactionContext ctx) throws MetadataException {
@@ -2306,12 +2313,12 @@
             Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
                     dataverseName, cfs.getDatasetName().getValue());
             if (dataset == null) {
-                throw new AsterixException("Unknown dataset :" + cfs.getDatasetName().getValue() + " in dataverse "
-                        + dataverseName);
+                throw new AsterixException(
+                        "Unknown dataset: " + cfs.getDatasetName().getValue() + " in dataverse " + dataverseName);
             }
 
-            Pair<JobSpecification, Boolean> specDisconnectType = FeedOperations.buildDisconnectFeedJobSpec(
-                    metadataProvider, connectionId);
+            Pair<JobSpecification, Boolean> specDisconnectType = FeedOperations
+                    .buildDisconnectFeedJobSpec(metadataProvider, connectionId);
             JobSpecification jobSpec = specDisconnectType.first;
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2349,23 +2356,23 @@
 
         CompiledSubscribeFeedStatement csfs = new CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(),
                 bfs.getQuery(), bfs.getVarCounter());
-        metadataProvider.getConfig().put(FunctionUtils.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
+        metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
         metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + bfs.getPolicy());
         metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
                 StringUtils.join(bfs.getLocations(), ','));
 
         JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), csfs);
-        FeedConnectionId feedConnectionId = new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), bfs
-                .getSubscriptionRequest().getTargetDataset());
+        FeedConnectionId feedConnectionId = new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(),
+                bfs.getSubscriptionRequest().getTargetDataset());
         String dataverse = feedConnectionId.getFeedId().getDataverse();
         String dataset = feedConnectionId.getDatasetName();
-        MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset, dataverse + "."
-                + feedConnectionId.getFeedId().getFeedName());
+        MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset,
+                dataverse + "." + feedConnectionId.getFeedId().getFeedName());
 
         try {
 
-            JobSpecification alteredJobSpec = FeedUtil.alterJobSpecificationForFeed(compiled, feedConnectionId, bfs
-                    .getSubscriptionRequest().getPolicyParameters());
+            JobSpecification alteredJobSpec = FeedUtil.alterJobSpecificationForFeed(compiled, feedConnectionId,
+                    bfs.getSubscriptionRequest().getPolicyParameters());
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
@@ -2380,8 +2387,8 @@
             }
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.subscribeFeedEnd(dataverse, dataverse + "." + dataset, dataverse + "."
-                    + feedConnectionId.getFeedId().getFeedName());
+            MetadataLockManager.INSTANCE.subscribeFeedEnd(dataverse, dataverse + "." + dataset,
+                    dataverse + "." + feedConnectionId.getFeedId().getFeedName());
         }
     }
 
@@ -2399,8 +2406,8 @@
         try {
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
             if (ds == null) {
-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                        + dataverseName + ".");
+                throw new AlgebricksException(
+                        "There is no dataset named " + datasetName + " in dataverse " + dataverseName + ".");
             }
 
             String itemTypeName = ds.getItemTypeName();
@@ -2410,22 +2417,17 @@
             // Prepare jobs to compact the dataset and its indexes
             List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
             if (indexes.size() == 0) {
-                throw new AlgebricksException("Cannot compact the extrenal dataset " + datasetName
-                        + " because it has no indexes");
+                throw new AlgebricksException(
+                        "Cannot compact the external dataset " + datasetName + " because it has no indexes");
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 for (int j = 0; j < indexes.size(); j++) {
                     if (indexes.get(j).isSecondaryIndex()) {
-                        CompiledIndexCompactStatement cics = new CompiledIndexCompactStatement(dataverseName,
-                                datasetName, indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(), indexes
-                                        .get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(), indexes.get(
-                                        j).getGramLength(), indexes.get(j).getIndexType());
-
-                        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(
-                                metadataProvider.getMetadataTxnContext(), dataverseName);
-                        jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName,
-                                metadataProvider));
+                        Dataverse dataverse = MetadataManager.INSTANCE
+                                .getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName);
+                        jobsToExecute
+                                .add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
 
                     }
                 }
@@ -2433,9 +2435,9 @@
                 for (int j = 0; j < indexes.size(); j++) {
                     if (!ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
                         CompiledIndexCompactStatement cics = new CompiledIndexCompactStatement(dataverseName,
-                                datasetName, indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(), indexes
-                                        .get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(), indexes.get(
-                                        j).getGramLength(), indexes.get(j).getIndexType());
+                                datasetName, indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(),
+                                indexes.get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(),
+                                indexes.get(j).getGramLength(), indexes.get(j).getIndexType());
                         ARecordType aRecordType = (ARecordType) dt.getDatatype();
                         ARecordType enforcedType = null;
                         if (cics.isEnforced()) {
@@ -2595,13 +2597,13 @@
 
             // Dataset exists ?
             if (ds == null) {
-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                        + dataverseName);
+                throw new AlgebricksException(
+                        "There is no dataset named " + datasetName + " in dataverse " + dataverseName);
             }
             // Dataset external ?
             if (ds.getDatasetType() != DatasetType.EXTERNAL) {
-                throw new AlgebricksException("dataset " + datasetName + " in dataverse " + dataverseName
-                        + " is not an external dataset");
+                throw new AlgebricksException(
+                        "dataset " + datasetName + " in dataverse " + dataverseName + " is not an external dataset");
             }
             // Dataset has indexes ?
             indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
@@ -2625,8 +2627,8 @@
 
             // Compute delta
             // Now we compare snapshot with external file system
-            if (ExternalIndexingOperations
-                    .isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles, appendedFiles)) {
+            if (ExternalIndexingOperations.isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles,
+                    appendedFiles)) {
                 ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(txnTime);
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2802,8 +2804,8 @@
                 handlePregelixStatement(metadataProvider, runStmt, hcc);
                 break;
             default:
-                throw new AlgebricksException("The system \"" + runStmt.getSystem()
-                        + "\" specified in your run statement is not supported.");
+                throw new AlgebricksException(
+                        "The system \"" + runStmt.getSystem() + "\" specified in your run statement is not supported.");
         }
 
     }
@@ -2832,8 +2834,8 @@
 
             // construct input paths
             Index fromIndex = null;
-            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameFrom, pregelixStmt
-                    .getDatasetNameFrom().getValue());
+            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameFrom,
+                    pregelixStmt.getDatasetNameFrom().getValue());
             for (Index ind : indexes) {
                 if (ind.isPrimaryIndex())
                     fromIndex = ind;
@@ -2845,8 +2847,8 @@
 
             Dataset datasetFrom = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameFrom, datasetNameFrom);
             IFileSplitProvider fromSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
-                    dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName(), datasetFrom.getDatasetDetails()
-                            .isTemp()).first;
+                    dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName(),
+                    datasetFrom.getDatasetDetails().isTemp()).first;
             StringBuilder fromSplitsPaths = new StringBuilder();
 
             for (FileSplit f : fromSplits.getFileSplits()) {
@@ -2857,8 +2859,8 @@
 
             // Construct output paths
             Index toIndex = null;
-            indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo, pregelixStmt
-                    .getDatasetNameTo().getValue());
+            indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
+                    pregelixStmt.getDatasetNameTo().getValue());
             for (Index ind : indexes) {
                 if (ind.isPrimaryIndex())
                     toIndex = ind;
@@ -2870,7 +2872,8 @@
 
             Dataset datasetTo = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameTo, datasetNameTo);
             IFileSplitProvider toSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
-                    dataverseNameTo, datasetNameTo, toIndex.getIndexName(), datasetTo.getDatasetDetails().isTemp()).first;
+                    dataverseNameTo, datasetNameTo, toIndex.getIndexName(),
+                    datasetTo.getDatasetDetails().isTemp()).first;
             StringBuilder toSplitsPaths = new StringBuilder();
 
             for (FileSplit f : toSplits.getFileSplits()) {
@@ -2885,11 +2888,11 @@
                         pregelixStmt.getDatasetNameTo(), true);
                 this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
 
-                IDatasetDetailsDecl idd = new InternalDetailsDecl(toIndex.getKeyFieldNames(), false, null, toDataset
-                        .getDatasetDetails().isTemp());
+                IDatasetDetailsDecl idd = new InternalDetailsDecl(toIndex.getKeyFieldNames(), false, null,
+                        toDataset.getDatasetDetails().isTemp());
                 DatasetDecl createToDataset = new DatasetDecl(new Identifier(dataverseNameTo),
-                        pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()), new Identifier(
-                                toDataset.getNodeGroupName()), toDataset.getCompactionPolicy(),
+                        pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()),
+                        new Identifier(toDataset.getNodeGroupName()), toDataset.getCompactionPolicy(),
                         toDataset.getCompactionPolicyProperties(), toDataset.getHints(), toDataset.getDatasetType(),
                         idd, false);
                 this.handleCreateDatasetStatement(metadataProvider, createToDataset, hcc);
@@ -2996,7 +2999,7 @@
 
     private void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
             MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
-            throws Exception {
+                    throws Exception {
         AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
         int frameSize = compilerProperties.getFrameSize();
         JobSpecification spec = new JobSpecification(frameSize);
@@ -3013,8 +3016,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, dataset
-                        .getDatasetDetails().isTemp());
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName,
+                        dataset.getDatasetDetails().isTemp());
         AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
 
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
@@ -3072,4 +3075,9 @@
         }
     }
 
+    private void rewriteStatement(Statement stmt) throws AsterixException {
+        IStatementRewriter rewriter = rewriterFactory.createStatementRewriter();
+        rewriter.rewrite(stmt);
+    }
+
 }
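
The new rewriteStatement hook completes the language-agnostic translator surface: each query language supplies an IRewriterFactory, and QueryTranslator applies whatever IStatementRewriter it produces before handling a statement. Below is a minimal sketch of a conforming rewriter; it assumes only the createStatementRewriter()/rewrite(Statement) surface exercised above, and the import path of IStatementRewriter is an assumption.

    import org.apache.asterix.lang.common.base.Statement;
    // Import path assumed; the interface itself is what rewriteStatement above uses.
    import org.apache.asterix.lang.common.base.IStatementRewriter;

    // A no-op rewriter sketch: a real implementation would normalize
    // language-specific constructs before the statement reaches the translator.
    public class NoOpStatementRewriter implements IStatementRewriter {
        @Override
        public void rewrite(Statement stmt) {
            // nothing to rewrite in this sketch
        }
    }
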
diff --git a/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java b/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
index bfb356c..b4fa1d3 100644
--- a/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
+++ b/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
@@ -23,15 +23,16 @@
 import java.io.Reader;
 import java.util.List;
 
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.api.java.AsterixJavaClient;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.commons.io.FileUtils;
 import org.kohsuke.args4j.Argument;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
-import org.apache.asterix.api.java.AsterixJavaClient;
-import org.apache.asterix.common.config.GlobalConfig;
-
 public class AsterixCLI {
     private static class Options {
         @Option(name = "-properties", usage = "Name of properties file", required = true)
@@ -48,13 +49,13 @@
         Options options = new Options();
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
-
+        ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
         setUp(options);
         try {
             for (String queryFile : options.args) {
                 Reader in = new FileReader(queryFile);
                 AsterixJavaClient ajc = new AsterixJavaClient(
-                        AsterixHyracksIntegrationUtil.getHyracksClientConnection(), in);
+                        AsterixHyracksIntegrationUtil.getHyracksClientConnection(), in, compilationProvider);
                 try {
                     ajc.compile(true, false, false, false, false, true, false);
                 } finally {
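
AsterixCLI now hands AsterixJavaClient an explicit AqlCompilationProvider. A natural follow-on, sketched here, would let the CLI target either frontend; the "-lang" value and the helper class are hypothetical, while both provider classes are used elsewhere in this change.

    import org.apache.asterix.compiler.provider.AqlCompilationProvider;
    import org.apache.asterix.compiler.provider.ILangCompilationProvider;
    import org.apache.asterix.compiler.provider.SqlppCompilationProvider;

    final class LangSelector {
        // Maps a hypothetical "-lang" CLI value to a compilation provider.
        static ILangCompilationProvider forName(String lang) {
            return "SQLPP".equalsIgnoreCase(lang) ? new SqlppCompilationProvider()
                    : new AqlCompilationProvider();
        }
    }
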
diff --git a/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java b/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java
index ff2a376..14e0e77 100644
--- a/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java
+++ b/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java
@@ -20,12 +20,13 @@
 
 import java.io.FileReader;
 
-import org.kohsuke.args4j.CmdLineParser;
-
 import org.apache.asterix.api.common.AsterixClientConfig;
 import org.apache.asterix.api.java.AsterixJavaClient;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.kohsuke.args4j.CmdLineParser;
 
 public class AsterixClientDriver {
 
@@ -50,8 +51,8 @@
         }
         boolean exec = new Boolean(acc.execute);
         IHyracksClientConnection hcc = exec ? new HyracksConnection("localhost", acc.hyracksPort) : null;
-        AsterixJavaClient q = compileQuery(hcc, acc.getArguments().get(0), new Boolean(acc.optimize), new Boolean(
-                acc.onlyPhysical), exec || new Boolean(acc.hyracksJob));
+        AsterixJavaClient q = compileQuery(hcc, acc.getArguments().get(0), new Boolean(acc.optimize),
+                new Boolean(acc.onlyPhysical), exec || new Boolean(acc.hyracksJob));
         if (exec) {
             q.execute();
         }
@@ -59,8 +60,9 @@
 
     private static AsterixJavaClient compileQuery(IHyracksClientConnection hcc, String filename, boolean optimize,
             boolean onlyPhysical, boolean createBinaryRuntime) throws Exception {
+        ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
         FileReader reader = new FileReader(filename);
-        AsterixJavaClient q = new AsterixJavaClient(hcc, reader);
+        AsterixJavaClient q = new AsterixJavaClient(hcc, reader, compilationProvider);
         q.compile(optimize, true, true, true, onlyPhysical, createBinaryRuntime, createBinaryRuntime);
         return q;
     }
diff --git a/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixWebServer.java b/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixWebServer.java
index 56701fd..8347476 100644
--- a/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixWebServer.java
+++ b/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixWebServer.java
@@ -18,12 +18,11 @@
  */
 package org.apache.asterix.drivers;
 
+import org.apache.asterix.api.http.servlet.APIServlet;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
-import org.apache.asterix.api.http.servlet.APIServlet;
-
 public class AsterixWebServer {
     public static void main(String args[]) throws Exception {
         Server server = new Server(8080);
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/CentralFeedManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/CentralFeedManager.java
index 8183a06..7326d08 100644
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/CentralFeedManager.java
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/CentralFeedManager.java
@@ -25,12 +25,16 @@
 
 import org.apache.asterix.api.common.SessionConfig;
 import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.AqlTranslator;
+import org.apache.asterix.aql.translator.QueryTranslator;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.feeds.api.ICentralFeedManager;
 import org.apache.asterix.common.feeds.api.IFeedLoadManager;
 import org.apache.asterix.common.feeds.api.IFeedTrackingManager;
-import org.apache.asterix.lang.aql.parser.AQLParser;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.aql.parser.AQLParserFactory;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.feeds.SocketMessageListener;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
@@ -41,6 +45,7 @@
 public class CentralFeedManager implements ICentralFeedManager {
 
     private static final ICentralFeedManager centralFeedManager = new CentralFeedManager();
+    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
 
     public static ICentralFeedManager getInstance() {
         return centralFeedManager;
@@ -90,15 +95,15 @@
     public static class AQLExecutor {
 
         private static final PrintWriter out = new PrintWriter(System.out, true);
+        private static final IParserFactory parserFactory = new AQLParserFactory();
 
         public static void executeAQL(String aql) throws Exception {
-            AQLParser parser = new AQLParser(new StringReader(aql));
-            List<Statement> statements;
-            statements = parser.Statement();
+            IParser parser = parserFactory.createParser(new StringReader(aql));
+            List<Statement> statements = parser.parse();
             SessionConfig pc = new SessionConfig(out, OutputFormat.ADM);
-            AqlTranslator translator = new AqlTranslator(statements, pc);
+            QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
             translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                    AqlTranslator.ResultDelivery.SYNC);
+                    QueryTranslator.ResultDelivery.SYNC);
         }
     }
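
AQLExecutor remains AQL-only, but every step it performs now goes through language-neutral interfaces. The sketch below generalizes the same parse/translate/execute pipeline; the class and method names are illustrative, while the IParserFactory and QueryTranslator calls mirror executeAQL above.

    import java.io.PrintWriter;
    import java.io.StringReader;
    import java.util.List;

    import org.apache.asterix.api.common.SessionConfig;
    import org.apache.asterix.api.common.SessionConfig.OutputFormat;
    import org.apache.asterix.aql.translator.QueryTranslator;
    import org.apache.asterix.compiler.provider.ILangCompilationProvider;
    import org.apache.asterix.lang.common.base.IParser;
    import org.apache.asterix.lang.common.base.IParserFactory;
    import org.apache.asterix.lang.common.base.Statement;
    import org.apache.asterix.om.util.AsterixAppContextInfo;

    final class StatementExecutor {
        // Same pipeline as AQLExecutor.executeAQL, parameterized by language.
        static void execute(String stmts, IParserFactory parserFactory,
                ILangCompilationProvider provider) throws Exception {
            IParser parser = parserFactory.createParser(new StringReader(stmts));
            List<Statement> statements = parser.parse();
            SessionConfig pc = new SessionConfig(new PrintWriter(System.out, true), OutputFormat.ADM);
            QueryTranslator translator = new QueryTranslator(statements, pc, provider);
            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
                    QueryTranslator.ResultDelivery.SYNC);
        }
    }
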
 
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java
index 4ca9961..202451b 100644
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java
@@ -35,7 +35,7 @@
 
 import org.apache.asterix.api.common.SessionConfig;
 import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.AqlTranslator;
+import org.apache.asterix.aql.translator.QueryTranslator;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.IClusterManagementWorkResponse;
@@ -52,6 +52,8 @@
 import org.apache.asterix.common.feeds.api.IFeedLifecycleListener;
 import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
 import org.apache.asterix.common.feeds.message.StorageReportFeedMessage;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
@@ -83,6 +85,7 @@
     private static final Logger LOGGER = Logger.getLogger(FeedLifecycleListener.class.getName());
 
     public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
+    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
 
     private final LinkedBlockingQueue<Message> jobEventInbox;
     private final LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
@@ -363,9 +366,9 @@
                 DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedId.getDataverse()));
                 statements.add(dataverseDecl);
                 statements.add(stmt);
-                AqlTranslator translator = new AqlTranslator(statements, pc);
+                QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
                 translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                        AqlTranslator.ResultDelivery.SYNC);
+                        QueryTranslator.ResultDelivery.SYNC);
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("End irrecoverable feed: " + cInfo.getConnectionId());
                 }
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
index 9e8864c..7660007 100644
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
@@ -26,7 +26,9 @@
 
 import org.apache.asterix.api.common.SessionConfig;
 import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.AqlTranslator;
+import org.apache.asterix.aql.translator.QueryTranslator;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
@@ -37,6 +39,7 @@
 public class FeedsActivator implements Runnable {
 
     private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());
+    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
 
     private List<FeedCollectInfo> feedsToRevive;
     private Mode mode;
@@ -97,8 +100,9 @@
             List<Statement> statements = new ArrayList<Statement>();
             statements.add(dataverseDecl);
             statements.add(stmt);
-            AqlTranslator translator = new AqlTranslator(statements, pc);
-            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, AqlTranslator.ResultDelivery.SYNC);
+            QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
+            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
+                    QueryTranslator.ResultDelivery.SYNC);
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
             }
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 90697ab..24d2283 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -34,11 +34,11 @@
 import org.apache.asterix.api.http.servlet.VersionAPIServlet;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
-import org.apache.asterix.common.feeds.api.ICentralFeedManager;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.event.service.ILookupService;
 import org.apache.asterix.feeds.FeedLifecycleListener;
 import org.apache.asterix.metadata.MetadataManager;
@@ -66,7 +66,6 @@
     private Server webServer;
     private Server jsonAPIServer;
     private Server feedServer;
-    private ICentralFeedManager centralFeedManager;
 
     private static IAsterixStateProxy proxy;
     private ICCApplicationContext appCtx;
@@ -94,7 +93,6 @@
         AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
         setupWebServer(externalProperties);
         webServer.start();
-        AsterixBuildProperties buildProperties = AsterixAppContextInfo.getInstance().getBuildProperties();
         setupJSONAPIServer(externalProperties);
         jsonAPIServer.start();
 
@@ -164,12 +162,22 @@
         context.setAttribute(ASTERIX_BUILD_PROP_ATTR, AsterixAppContextInfo.getInstance());
 
         jsonAPIServer.setHandler(context);
-        context.addServlet(new ServletHolder(new QueryAPIServlet()), "/query");
+
+        // AQL rest APIs.
+        context.addServlet(new ServletHolder(new QueryAPIServlet(new AqlCompilationProvider())), "/query");
+        context.addServlet(new ServletHolder(new UpdateAPIServlet(new AqlCompilationProvider())), "/update");
+        context.addServlet(new ServletHolder(new DDLAPIServlet(new AqlCompilationProvider())), "/ddl");
+        context.addServlet(new ServletHolder(new AQLAPIServlet(new AqlCompilationProvider())), "/aql");
+
+        // SQL++ rest APIs.
+        context.addServlet(new ServletHolder(new QueryAPIServlet(new SqlppCompilationProvider())), "/query/sqlpp");
+        context.addServlet(new ServletHolder(new UpdateAPIServlet(new SqlppCompilationProvider())), "/update/sqlpp");
+        context.addServlet(new ServletHolder(new DDLAPIServlet(new SqlppCompilationProvider())), "/ddl/sqlpp");
+        context.addServlet(new ServletHolder(new AQLAPIServlet(new SqlppCompilationProvider())), "/sqlpp");
+
+        // Other APIs.
         context.addServlet(new ServletHolder(new QueryStatusAPIServlet()), "/query/status");
         context.addServlet(new ServletHolder(new QueryResultAPIServlet()), "/query/result");
-        context.addServlet(new ServletHolder(new UpdateAPIServlet()), "/update");
-        context.addServlet(new ServletHolder(new DDLAPIServlet()), "/ddl");
-        context.addServlet(new ServletHolder(new AQLAPIServlet()), "/aql");
         context.addServlet(new ServletHolder(new ConnectorAPIServlet()), "/connector");
         context.addServlet(new ServletHolder(new ShutdownAPIServlet()), "/admin/shutdown");
         context.addServlet(new ServletHolder(new VersionAPIServlet()), "/admin/version");
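
With the registrations above, every AQL endpoint gains a SQL++ twin (e.g. /query vs. /query/sqlpp). A quick smoke test against the new endpoint might look like the sketch below; the port (19002, the usual JSON API default configured via AsterixExternalProperties) and the POST-body convention are assumptions, and only the /query/sqlpp path is taken from this change.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class SqlppSmokeTest {
        public static void main(String[] args) throws Exception {
            // Port and request-body convention are assumed, mirroring the AQL /query servlet.
            URL url = new URL("http://localhost:19002/query/sqlpp");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            try (OutputStream os = conn.getOutputStream()) {
                os.write("select element 1+1;".getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("HTTP " + conn.getResponseCode());
        }
    }
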
diff --git a/asterix-app/src/main/resources/webui/querytemplate.html b/asterix-app/src/main/resources/webui/querytemplate.html
index 6ad4818..9115b89 100644
--- a/asterix-app/src/main/resources/webui/querytemplate.html
+++ b/asterix-app/src/main/resources/webui/querytemplate.html
@@ -220,6 +220,12 @@
             </div>
 
             <div>
+              <label id="query-language" class="optlabel"> Query Language:<br/>
+                <select name="query-language" class="btn">
+                  <option selected value="AQL">AQL</option>
+                  <option value="SQLPP">SQL++</option>
+                </select>
+              </label>
               <label id="output-format" class="optlabel"> Output Format:<br/>
                 <select name="output-format" class="btn">
                   <option selected value="ADM">ADM</option>