Coordinated change to support parameterized queries

Change-Id: Icce06a1548a4f4150545c1fda7e5be3608472af5
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
index 69145d9..c48ec54 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -108,9 +108,8 @@
     public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
             Map<byte[], byte[]> jobParameters, long period, EntityId entityId, ITxnIdFactory txnIdFactory,
             DeployedJobSpecEventListener listener) throws Exception {
-        long executionMilliseconds =
-                runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory, null, listener,
-                        null);
+        long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory,
+                null, listener, null);
         if (executionMilliseconds > period) {
             LOGGER.log(Level.SEVERE,
                     "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
@@ -153,7 +152,6 @@
 
     }
 
-
     public static long findPeriod(String duration) {
         //TODO: Allow Repetitive Channels to use YMD durations
         String hoursMinutesSeconds = "";
@@ -189,7 +187,8 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         JobSpecification jobSpec = null;
         try {
-            jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+            jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null, null,
+                    null);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
         } catch (Exception e) {
@@ -230,7 +229,7 @@
                 jobSpec = compilePushChannel(badStatementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
             } else {
                 jobSpec = badStatementExecutor.handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc,
-                        null, null, null, null, true, null);
+                        null, null, null, null, true, null, null, null);
             }
         } else {
             //Procedures
@@ -263,7 +262,7 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         JobSpecification jobSpec;
         try {
-            jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null);
+            jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null, null, null);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
@@ -277,14 +276,15 @@
             IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception {
         if (procedureStatement.getKind() == Statement.Kind.INSERT) {
             return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
-                    procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null);
+                    procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null,
+                    null, null);
         } else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
             return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement);
         } else {
             SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
             procedureStatement.accept(visitor, null);
             return ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, procedureStatement,
-                    hcc, true);
+                    hcc, true, null, null);
         }
     }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
index 2a3d4fb..99f0d66 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
@@ -18,17 +18,11 @@
  */
 package org.apache.asterix.bad.lang;
 
-import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.compiler.provider.IRuleSetFactory;
-import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.lang.common.base.IParserFactory;
-import org.apache.asterix.lang.common.base.IRewriterFactory;
-import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
-import org.apache.asterix.lang.sqlpp.visitor.SqlppAstPrintVisitorFactory;
-import org.apache.asterix.translator.SqlppExpressionToPlanTranslatorFactory;
 
-public class BADCompilationProvider implements ILangCompilationProvider {
+public class BADCompilationProvider extends SqlppCompilationProvider {
 
     @Override
     public IParserFactory getParserFactory() {
@@ -36,21 +30,6 @@
     }
 
     @Override
-    public IRewriterFactory getRewriterFactory() {
-        return new SqlppRewriterFactory();
-    }
-
-    @Override
-    public IAstPrintVisitorFactory getAstPrintVisitorFactory() {
-        return new SqlppAstPrintVisitorFactory();
-    }
-
-    @Override
-    public ILangExpressionToPlanTranslatorFactory getExpressionToPlanTranslatorFactory() {
-        return new SqlppExpressionToPlanTranslatorFactory();
-    }
-
-    @Override
     public IRuleSetFactory getRuleSetFactory() {
         return new BADRuleSetFactory();
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 2f23a9c..3ea2c0d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -334,7 +334,7 @@
                 }
             }
         }
-        final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null);
+        final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null, null);
         for (Channel channel : channels) {
             if (!channel.getChannelId().getDataverse().equals(dvId.getValue())) {
                 continue;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index ca1241c..7583f0b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -207,13 +207,13 @@
                 InsertStatement insert = new InsertStatement(new Identifier(dataverse),
                         new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, accessor);
                 ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc,
-                        resultDelivery, null, stats, false, null);
+                        resultDelivery, null, stats, false, null, null, null);
             } else {
                 //To update an existing subscription
                 UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
                         new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
                 ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc,
-                        resultDelivery, null, stats, false, null);
+                        resultDelivery, null, stats, false, null, null, null);
             }
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 1b18f83..b23bf3b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -139,7 +139,8 @@
             MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
                     metadataProvider.getDefaultDataverse());
             tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
-            ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
+            ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false, null,
+                    null);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             QueryTranslator.abort(e, e, mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 204e8aa..a28666a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -253,7 +253,7 @@
                     (Query) fStatements.get(1));
         }
         return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
-                hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
+                hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null, null);
     }
 
     @Override
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index be5bedb..aaf2bfa 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -176,7 +176,7 @@
 
     private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExecutor statementExecutor,
             MetadataProvider metadataProvider, IHyracksClientConnection hcc, Stats stats)
-                    throws Exception {
+            throws Exception {
         if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) {
             if (!varList.isEmpty()) {
                 throw new CompilationException("Insert procedures cannot have parameters");
@@ -185,9 +185,8 @@
             dependencies.get(0).add(Arrays.asList(
                     ((QueryTranslator) statementExecutor).getActiveDataverse(insertStatement.getDataverseName()),
                     insertStatement.getDatasetName().getValue()));
-            return new Pair<>(
-                    ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
-                            getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null),
+            return new Pair<>(((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+                    getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null, null, null),
                     PrecompiledType.INSERT);
         } else if (getProcedureBodyStatement().getKind() == Statement.Kind.QUERY) {
             SqlppRewriterFactory fact = new SqlppRewriterFactory();
@@ -209,7 +208,7 @@
                     metadataProvider);
             Pair<JobSpecification, PrecompiledType> pair =
                     new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
-                    getProcedureBodyStatement(), hcc, true), PrecompiledType.DELETE);
+                            getProcedureBodyStatement(), hcc, true, null, null), PrecompiledType.DELETE);
             return pair;
         } else {
             throw new CompilationException("Procedure can only execute a single delete, insert, or query");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
index d34d170..89d940e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
@@ -122,7 +122,7 @@
             listener.suspend();
             activeEventHandler.registerListener(listener);
             BADJobService.redeployJobSpec(entityId, channel.getChannelBody(), metadataProvider, badStatementExecutor,
-                    hcc, new RequestParameters(null, null, null, null, null, null), true);
+                    hcc, new RequestParameters(null, null, null, null, null, null, null), true);
 
             ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(listener.getDeployedJobSpecId(),
                     hcc,
@@ -149,7 +149,7 @@
                             new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(),
                                     ResultReader.NUM_READERS),
                             new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
-                            new IStatementExecutor.Stats(), null, null, null),
+                            new IStatementExecutor.Stats(), null, null, null, null),
                     true);
             metadataProvider.getLocks().unlock();
             //Log that the procedure stopped by cluster restart. Procedure is available again now.