Coordinated change to support parameterized queries
Change-Id: Icce06a1548a4f4150545c1fda7e5be3608472af5
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
index 69145d9..c48ec54 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -108,9 +108,8 @@
public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
Map<byte[], byte[]> jobParameters, long period, EntityId entityId, ITxnIdFactory txnIdFactory,
DeployedJobSpecEventListener listener) throws Exception {
- long executionMilliseconds =
- runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory, null, listener,
- null);
+ long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory,
+ null, listener, null);
if (executionMilliseconds > period) {
LOGGER.log(Level.SEVERE,
"Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
@@ -153,7 +152,6 @@
}
-
public static long findPeriod(String duration) {
//TODO: Allow Repetitive Channels to use YMD durations
String hoursMinutesSeconds = "";
@@ -189,7 +187,8 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
JobSpecification jobSpec = null;
try {
- jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+ jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null, null,
+ null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
} catch (Exception e) {
@@ -230,7 +229,7 @@
jobSpec = compilePushChannel(badStatementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
} else {
jobSpec = badStatementExecutor.handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc,
- null, null, null, null, true, null);
+ null, null, null, null, true, null, null, null);
}
} else {
//Procedures
@@ -263,7 +262,7 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
JobSpecification jobSpec;
try {
- jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null);
+ jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null, null, null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
@@ -277,14 +276,15 @@
IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception {
if (procedureStatement.getKind() == Statement.Kind.INSERT) {
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
- procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null);
+ procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null,
+ null, null);
} else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement);
} else {
SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
procedureStatement.accept(visitor, null);
return ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, procedureStatement,
- hcc, true);
+ hcc, true, null, null);
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
index 2a3d4fb..99f0d66 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
@@ -18,17 +18,11 @@
*/
package org.apache.asterix.bad.lang;
-import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.compiler.provider.IRuleSetFactory;
-import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
import org.apache.asterix.lang.common.base.IParserFactory;
-import org.apache.asterix.lang.common.base.IRewriterFactory;
-import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
-import org.apache.asterix.lang.sqlpp.visitor.SqlppAstPrintVisitorFactory;
-import org.apache.asterix.translator.SqlppExpressionToPlanTranslatorFactory;
-public class BADCompilationProvider implements ILangCompilationProvider {
+public class BADCompilationProvider extends SqlppCompilationProvider {
@Override
public IParserFactory getParserFactory() {
@@ -36,21 +30,6 @@
}
@Override
- public IRewriterFactory getRewriterFactory() {
- return new SqlppRewriterFactory();
- }
-
- @Override
- public IAstPrintVisitorFactory getAstPrintVisitorFactory() {
- return new SqlppAstPrintVisitorFactory();
- }
-
- @Override
- public ILangExpressionToPlanTranslatorFactory getExpressionToPlanTranslatorFactory() {
- return new SqlppExpressionToPlanTranslatorFactory();
- }
-
- @Override
public IRuleSetFactory getRuleSetFactory() {
return new BADRuleSetFactory();
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 2f23a9c..3ea2c0d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -334,7 +334,7 @@
}
}
}
- final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null);
+ final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null, null);
for (Channel channel : channels) {
if (!channel.getChannelId().getDataverse().equals(dvId.getValue())) {
continue;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index ca1241c..7583f0b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -207,13 +207,13 @@
InsertStatement insert = new InsertStatement(new Identifier(dataverse),
new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, accessor);
((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc,
- resultDelivery, null, stats, false, null);
+ resultDelivery, null, stats, false, null, null, null);
} else {
//To update an existing subscription
UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc,
- resultDelivery, null, stats, false, null);
+ resultDelivery, null, stats, false, null, null, null);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 1b18f83..b23bf3b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -139,7 +139,8 @@
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
- ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
+ ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false, null,
+ null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
QueryTranslator.abort(e, e, mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 204e8aa..a28666a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -253,7 +253,7 @@
(Query) fStatements.get(1));
}
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
- hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
+ hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null, null);
}
@Override
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index be5bedb..aaf2bfa 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -176,7 +176,7 @@
private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExecutor statementExecutor,
MetadataProvider metadataProvider, IHyracksClientConnection hcc, Stats stats)
- throws Exception {
+ throws Exception {
if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) {
if (!varList.isEmpty()) {
throw new CompilationException("Insert procedures cannot have parameters");
@@ -185,9 +185,8 @@
dependencies.get(0).add(Arrays.asList(
((QueryTranslator) statementExecutor).getActiveDataverse(insertStatement.getDataverseName()),
insertStatement.getDatasetName().getValue()));
- return new Pair<>(
- ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
- getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null),
+ return new Pair<>(((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+ getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null, null, null),
PrecompiledType.INSERT);
} else if (getProcedureBodyStatement().getKind() == Statement.Kind.QUERY) {
SqlppRewriterFactory fact = new SqlppRewriterFactory();
@@ -209,7 +208,7 @@
metadataProvider);
Pair<JobSpecification, PrecompiledType> pair =
new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
- getProcedureBodyStatement(), hcc, true), PrecompiledType.DELETE);
+ getProcedureBodyStatement(), hcc, true, null, null), PrecompiledType.DELETE);
return pair;
} else {
throw new CompilationException("Procedure can only execute a single delete, insert, or query");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
index d34d170..89d940e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
@@ -122,7 +122,7 @@
listener.suspend();
activeEventHandler.registerListener(listener);
BADJobService.redeployJobSpec(entityId, channel.getChannelBody(), metadataProvider, badStatementExecutor,
- hcc, new RequestParameters(null, null, null, null, null, null), true);
+ hcc, new RequestParameters(null, null, null, null, null, null, null), true);
ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(listener.getDeployedJobSpecId(),
hcc,
@@ -149,7 +149,7 @@
new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(),
ResultReader.NUM_READERS),
new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
- new IStatementExecutor.Stats(), null, null, null),
+ new IStatementExecutor.Stats(), null, null, null, null),
true);
metadataProvider.getLocks().unlock();
//Log that the procedure stopped by cluster restart. Procedure is available again now.