Allow BAD jobs to update their specifications to use new indexes
- storage format changes: new field for Channel body
This changes uses the Asterix upsertDeployedJobSpec to
recompile and update the channel job when new indexes are
created.
Added test case
Moved methods from Asterix DeployedJobService to BADJobService
Change-Id: If0a4d37a5b91063fcb1673dbfd008c140ed54ae6
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index d2d0fa3..d422663 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -50,6 +50,7 @@
String FIELD_NAME_RETURN_TYPE = "ReturnType";
String FIELD_NAME_DEFINITION = "Definition";
String FIELD_NAME_LANGUAGE = "Language";
+ String FIELD_NAME_BODY = "Body";
//To enable new Asterix TxnId for separate deployed job spec invocations
byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
int EXECUTOR_TIMEOUT = 20;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
new file mode 100644
index 0000000..e326ce6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+import java.io.StringReader;
+import java.time.Instant;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.BADParserFactory;
+import org.apache.asterix.bad.lang.BADStatementExecutor;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.SetStatement;
+import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IRequestParameters;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Provides functionality for channel jobs
+ */
+public class BADJobService {
+
+ private static final Logger LOGGER = Logger.getLogger(BADJobService.class.getName());
+
+ //pool size one (only running one thread at a time)
+ private static final int POOL_SIZE = 1;
+
+ private static final long millisecondTimeout = BADConstants.EXECUTOR_TIMEOUT * 1000;
+
+ //Starts running a deployed job specification periodically with an interval of "period" seconds
+ public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
+ IHyracksClientConnection hcc, long period, Map<byte[], byte[]> jobParameters, EntityId entityId,
+ ITxnIdFactory txnIdFactory, DeployedJobSpecEventListener listener) {
+ ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE);
+ scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (!runDeployedJobSpecCheckPeriod(distributedId, hcc, jobParameters, period, entityId,
+ txnIdFactory, listener)) {
+ scheduledExecutorService.shutdown();
+ }
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " "
+ + entityId.getDataverse() + "." + entityId.getEntityName() + ".", e);
+ }
+ }
+ }, period, period, TimeUnit.MILLISECONDS);
+ return scheduledExecutorService;
+ }
+
+ public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
+ Map<byte[], byte[]> jobParameters, long period, EntityId entityId, ITxnIdFactory txnIdFactory,
+ DeployedJobSpecEventListener listener) throws Exception {
+ long executionMilliseconds =
+ runDeployedJobSpec(distributedId, hcc, jobParameters, entityId, txnIdFactory, null, listener, null);
+ if (executionMilliseconds > period) {
+ LOGGER.log(Level.SEVERE,
+ "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+ + entityId.getEntityName() + " was unable to meet the required period of " + period
+ + " milliseconds. Actually took " + executionMilliseconds + " execution will shutdown"
+ + new Date());
+ return false;
+ }
+ return true;
+ }
+
+ public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
+ Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
+ ICcApplicationContext appCtx, DeployedJobSpecEventListener listener, QueryTranslator statementExecutor)
+ throws Exception {
+ listener.waitWhileAtState(ActivityState.SUSPENDED);
+
+ //Add the Asterix Transaction Id to the map
+ jobParameters.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
+ String.valueOf(txnIdFactory.create().getId()).getBytes());
+
+ long startTime = Instant.now().toEpochMilli();
+ JobId jobId = hcc.startJob(distributedId, jobParameters);
+
+ hcc.waitForCompletion(jobId);
+ long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
+
+ if (listener.getType() == DeployedJobSpecEventListener.PrecompiledType.QUERY) {
+ ResultReader resultReader = new ResultReader(listener.getResultDataset(), jobId, listener.getResultId());
+
+ ResultUtil.printResults(appCtx, resultReader, statementExecutor.getSessionOutput(),
+ new IStatementExecutor.Stats(), null);
+ }
+
+ LOGGER.log(Level.SEVERE,
+ "Deployed Job execution completed for " + entityId.getExtensionName() + " " + entityId.getDataverse()
+ + "." + entityId.getEntityName() + ". Took " + executionMilliseconds + " milliseconds ");
+
+ return executionMilliseconds;
+
+ }
+
+
+ public static long findPeriod(String duration) {
+ //TODO: Allow Repetitive Channels to use YMD durations
+ String hoursMinutesSeconds = "";
+ if (duration.indexOf('T') != -1) {
+ hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
+ }
+ double seconds = 0;
+ if (hoursMinutesSeconds != "") {
+ int pos = 0;
+ if (hoursMinutesSeconds.indexOf('H') != -1) {
+ Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
+ seconds += (hours * 60 * 60);
+ pos = hoursMinutesSeconds.indexOf('H') + 1;
+ }
+ if (hoursMinutesSeconds.indexOf('M') != -1) {
+ Double minutes =
+ Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
+ seconds += (minutes * 60);
+ pos = hoursMinutesSeconds.indexOf('M') + 1;
+ }
+ if (hoursMinutesSeconds.indexOf('S') != -1) {
+ Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
+ seconds += (s);
+ }
+ }
+ return (long) (seconds * 1000);
+ }
+
+ public static JobSpecification compilePushChannel(IStatementExecutor statementExecutor,
+ MetadataProvider metadataProvider, IHyracksClientConnection hcc, Query q) throws Exception {
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ JobSpecification jobSpec = null;
+ try {
+ jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ } catch (Exception e) {
+ LOGGER.log(Level.INFO, e.getMessage(), e);
+ if (bActiveTxn) {
+ ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+ }
+ throw e;
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
+ return jobSpec;
+ }
+
+ public static void redeployJobSpec(EntityId entityId, String queryBodyString, MetadataProvider metadataProvider,
+ BADStatementExecutor badStatementExecutor, IHyracksClientConnection hcc,
+ IRequestParameters requestParameters) throws Exception {
+
+ ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+ DeployedJobSpecEventListener listener =
+ (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
+ if (listener == null) {
+ LOGGER.severe("Tried to redeploy the job for " + entityId + " but no listener exists.");
+ return;
+ }
+
+ BADParserFactory factory = new BADParserFactory();
+ List<Statement> fStatements = factory.createParser(new StringReader(queryBodyString)).parse();
+ JobSpecification jobSpec = null;
+ if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)
+ || listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.CHANNEL)) {
+ //Channels
+ SetStatement ss = (SetStatement) fStatements.get(0);
+ metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
+ if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)) {
+ jobSpec = compilePushChannel(badStatementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
+ } else {
+ jobSpec = badStatementExecutor.handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc,
+ null, null, null, null, true, null);
+ }
+ } else {
+ //Procedures
+ metadataProvider.setResultSetId(listener.getResultId());
+ final IStatementExecutor.ResultDelivery resultDelivery =
+ requestParameters.getResultProperties().getDelivery();
+ final IHyracksDataset hdc = requestParameters.getHyracksDataset();
+ final IStatementExecutor.Stats stats = requestParameters.getStats();
+ boolean resultsAsync = resultDelivery == IStatementExecutor.ResultDelivery.ASYNC
+ || resultDelivery == IStatementExecutor.ResultDelivery.DEFERRED;
+ metadataProvider.setResultAsyncMode(resultsAsync);
+ metadataProvider.setMaxResultReads(1);
+
+ jobSpec = compileProcedureJob(badStatementExecutor, metadataProvider, hcc, hdc, stats, fStatements.get(1));
+
+ }
+ hcc.upsertDeployedJobSpec(listener.getDeployedJobSpecId(), jobSpec);
+
+ listener.resume();
+
+ }
+
+ public static JobSpecification compileQueryJob(IStatementExecutor statementExecutor,
+ MetadataProvider metadataProvider, IHyracksClientConnection hcc, Query q) throws Exception {
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ JobSpecification jobSpec = null;
+ try {
+ jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ } catch (Exception e) {
+ ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+ throw e;
+ }
+ return jobSpec;
+ }
+
+ private static JobSpecification compileProcedureJob(IStatementExecutor statementExecutor,
+ MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc,
+ IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception {
+ if (procedureStatement.getKind() == Statement.Kind.INSERT) {
+ return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+ procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null);
+ } else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
+ return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement);
+ } else {
+ SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
+ procedureStatement.accept(visitor, null);
+ return ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, procedureStatement,
+ hcc, true);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "BADJobService";
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
deleted file mode 100644
index 3df9a76..0000000
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad;
-
-import java.util.logging.Logger;
-
-
-/**
- * Provides functionality for channel jobs
- */
-public class ChannelJobService {
-
- private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());
-
-
- public static long findPeriod(String duration) {
- //TODO: Allow Repetitive Channels to use YMD durations
- String hoursMinutesSeconds = "";
- if (duration.indexOf('T') != -1) {
- hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
- }
- double seconds = 0;
- if (hoursMinutesSeconds != "") {
- int pos = 0;
- if (hoursMinutesSeconds.indexOf('H') != -1) {
- Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
- seconds += (hours * 60 * 60);
- pos = hoursMinutesSeconds.indexOf('H') + 1;
- }
- if (hoursMinutesSeconds.indexOf('M') != -1) {
- Double minutes =
- Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
- seconds += (minutes * 60);
- pos = hoursMinutesSeconds.indexOf('M') + 1;
- }
- if (hoursMinutesSeconds.indexOf('S') != -1) {
- Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
- seconds += (s);
- }
- }
- return (long) (seconds * 1000);
- }
-
-
- @Override
- public String toString() {
- return "ChannelJobService";
- }
-
-}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 8c7143f..4ab7530 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -18,22 +18,27 @@
*/
package org.apache.asterix.bad.lang;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.bad.BADJobService;
import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
import org.apache.asterix.bad.metadata.Broker;
import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.CreateIndexStatement;
import org.apache.asterix.lang.common.statement.DataverseDropStatement;
import org.apache.asterix.lang.common.statement.DropDatasetStatement;
import org.apache.asterix.lang.common.statement.FunctionDropStatement;
@@ -42,9 +47,12 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
public class BADStatementExecutor extends QueryTranslator {
@@ -56,16 +64,21 @@
//TODO: Most of this file could go away if we had metadata dependencies
- private void checkIfDatasetIsInUse(MetadataTransactionContext mdTxnCtx, String dataverse, String dataset)
- throws CompilationException, AlgebricksException {
+ private Pair<List<Channel>, List<Procedure>> checkIfDatasetIsInUse(MetadataTransactionContext mdTxnCtx,
+ String dataverse, String dataset, boolean checkAll) throws AlgebricksException {
+ List<Channel> channelsUsingDataset = new ArrayList<>();
+ List<Procedure> proceduresUsingDataset = new ArrayList<>();
List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
for (Channel channel : channels) {
List<List<List<String>>> dependencies = channel.getDependencies();
List<List<String>> datasetDependencies = dependencies.get(0);
for (List<String> dependency : datasetDependencies) {
if (dependency.get(0).equals(dataverse) && dependency.get(1).equals(dataset)) {
- throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". "
- + channel.getChannelId() + " depends on it!");
+ channelsUsingDataset.add(channel);
+ if (!checkAll) {
+ return new Pair<>(channelsUsingDataset, proceduresUsingDataset);
+ }
+
}
}
@@ -76,12 +89,82 @@
List<List<String>> datasetDependencies = dependencies.get(0);
for (List<String> dependency : datasetDependencies) {
if (dependency.get(0).equals(dataverse) && dependency.get(1).equals(dataset)) {
- throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". "
- + procedure.getEntityId() + " depends on it!");
+ proceduresUsingDataset.add(procedure);
+ if (!checkAll) {
+ return new Pair<>(channelsUsingDataset, proceduresUsingDataset);
+ }
}
}
}
+ return new Pair<>(channelsUsingDataset, proceduresUsingDataset);
+ }
+
+ private Pair<List<Channel>, List<Procedure>> checkIfFunctionIsInUse(MetadataTransactionContext mdTxnCtx,
+ String dvId, String function, String arity, boolean checkAll)
+ throws CompilationException, AlgebricksException {
+ List<Channel> channelsUsingFunction = new ArrayList<>();
+ List<Procedure> proceduresUsingFunction = new ArrayList<>();
+
+ List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
+ for (Channel channel : channels) {
+ List<List<List<String>>> dependencies = channel.getDependencies();
+ List<List<String>> datasetDependencies = dependencies.get(1);
+ for (List<String> dependency : datasetDependencies) {
+ if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
+ && dependency.get(2).equals(arity)) {
+ channelsUsingFunction.add(channel);
+ if (!checkAll) {
+ return new Pair<>(channelsUsingFunction, proceduresUsingFunction);
+ }
+ }
+ }
+
+ }
+ List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx);
+ for (Procedure procedure : procedures) {
+ List<List<List<String>>> dependencies = procedure.getDependencies();
+ List<List<String>> datasetDependencies = dependencies.get(1);
+ for (List<String> dependency : datasetDependencies) {
+ if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
+ && dependency.get(2).equals(arity)) {
+ proceduresUsingFunction.add(procedure);
+ if (!checkAll) {
+ return new Pair<>(channelsUsingFunction, proceduresUsingFunction);
+ }
+ }
+ }
+
+ }
+ return new Pair<>(channelsUsingFunction, proceduresUsingFunction);
+ }
+
+ private void throwErrorIfDatasetUsed(MetadataTransactionContext mdTxnCtx, String dataverse, String dataset)
+ throws CompilationException, AlgebricksException {
+ Pair<List<Channel>, List<Procedure>> dependents = checkIfDatasetIsInUse(mdTxnCtx, dataverse, dataset, false);
+ if (dependents.first.size() > 0) {
+ throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". "
+ + dependents.first.get(0).getChannelId() + " depends on it!");
+ }
+ if (dependents.second.size() > 0) {
+ throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". "
+ + dependents.second.get(0).getEntityId() + " depends on it!");
+ }
+ }
+
+ private void throwErrorIfFunctionUsed(MetadataTransactionContext mdTxnCtx, String dataverse, String function,
+ String arity, FunctionSignature sig) throws CompilationException, AlgebricksException {
+ Pair<List<Channel>, List<Procedure>> dependents =
+ checkIfFunctionIsInUse(mdTxnCtx, dataverse, function, arity, false);
+ String errorStart = sig != null ? "Cannot drop function " + sig + "." : "Cannot drop index.";
+ if (dependents.first.size() > 0) {
+ throw new CompilationException(
+ errorStart + " " + dependents.first.get(0).getChannelId() + " depends on it!");
+ }
+ if (dependents.second.size() > 0) {
+ throw new CompilationException(
+ errorStart + " " + dependents.second.get(0).getEntityId() + " depends on it!");
+ }
}
@Override
@@ -92,13 +175,88 @@
String dvId = getActiveDataverse(((DropDatasetStatement) stmt).getDataverseName());
Identifier dsId = ((DropDatasetStatement) stmt).getDatasetName();
- checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue());
+ throwErrorIfDatasetUsed(mdTxnCtx, dvId, dsId.getValue());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
super.handleDatasetDropStatement(metadataProvider, stmt, hcc, requestParameters);
}
@Override
+ public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
+
+ //TODO: Check whether a delete or insert procedure using the index. If so, we will need to
+ // disallow the procedure until after the newly distributed version is ready
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ //Allow channels to use the new index
+ String dvId = getActiveDataverse(((CreateIndexStatement) stmt).getDataverseName());
+ String dsId = ((CreateIndexStatement) stmt).getDatasetName().getValue();
+
+ Pair<List<Channel>, List<Procedure>> usages = checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId, true);
+
+ List<Dataverse> dataverseList = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+ for (Dataverse dv : dataverseList) {
+ List<Function> functions = MetadataManager.INSTANCE.getFunctions(mdTxnCtx, dv.getDataverseName());
+ for (Function function : functions) {
+ for (List<String> datasetDependency : function.getDependencies().get(0)) {
+ if (datasetDependency.get(0).equals(dvId) && datasetDependency.get(1).equals(dsId)) {
+ Pair<List<Channel>, List<Procedure>> functionUsages =
+ checkIfFunctionIsInUse(mdTxnCtx, function.getDataverseName(), function.getName(),
+ Integer.toString(function.getArity()), true);
+ for (Channel channel : functionUsages.first) {
+ if (!usages.first.contains(channel)) {
+ usages.first.add(channel);
+ }
+ }
+ for (Procedure procedure : functionUsages.second) {
+ if (!usages.second.contains(procedure)) {
+ usages.second.add(procedure);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+
+ for (Channel channel : usages.first) {
+ DeployedJobSpecEventListener listener =
+ (DeployedJobSpecEventListener) activeEventHandler.getListener(channel.getChannelId());
+ listener.suspend();
+ }
+ for (Procedure procedure : usages.second) {
+ DeployedJobSpecEventListener listener =
+ (DeployedJobSpecEventListener) activeEventHandler.getListener(procedure.getEntityId());
+ listener.suspend();
+ }
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ metadataProvider.getLocks().unlock();
+
+ metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+ super.handleCreateIndexStatement(metadataProvider, stmt, hcc, requestParameters);
+
+ for (Channel channel : usages.first) {
+ metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+ BADJobService.redeployJobSpec(channel.getChannelId(), channel.getChannelBody(), metadataProvider, this, hcc,
+ requestParameters);
+ metadataProvider.getLocks().unlock();
+ }
+ for (Procedure procedure : usages.second) {
+ metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+ BADJobService.redeployJobSpec(procedure.getEntityId(), procedure.getBody(), metadataProvider, this, hcc,
+ requestParameters);
+ metadataProvider.getLocks().unlock();
+ }
+
+
+ }
+
+ @Override
protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -106,7 +264,20 @@
String dvId = getActiveDataverse(((IndexDropStatement) stmt).getDataverseName());
Identifier dsId = ((IndexDropStatement) stmt).getDatasetName();
- checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue());
+ throwErrorIfDatasetUsed(mdTxnCtx, dvId, dsId.getValue());
+
+ List<Dataverse> dataverseList = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+ for (Dataverse dv : dataverseList) {
+ List<Function> functions = MetadataManager.INSTANCE.getFunctions(mdTxnCtx, dv.getDataverseName());
+ for (Function function : functions) {
+ for (List<String> datasetDependency : function.getDependencies().get(0)) {
+ if (datasetDependency.get(0).equals(dvId) && datasetDependency.get(1).equals(dsId.getValue())) {
+ throwErrorIfFunctionUsed(mdTxnCtx, function.getDataverseName(), function.getName(),
+ Integer.toString(function.getArity()), null);
+ }
+ }
+ }
+ }
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
super.handleIndexDropStatement(metadataProvider, stmt, hcc, requestParameters);
@@ -122,32 +293,7 @@
String function = sig.getName();
String arity = Integer.toString(sig.getArity());
- List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
- for (Channel channel : channels) {
- List<List<List<String>>> dependencies = channel.getDependencies();
- List<List<String>> datasetDependencies = dependencies.get(1);
- for (List<String> dependency : datasetDependencies) {
- if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
- && dependency.get(2).equals(arity)) {
- throw new CompilationException(
- "Cannot drop function " + sig + ". " + channel.getChannelId() + " depends on it!");
- }
- }
-
- }
- List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx);
- for (Procedure procedure : procedures) {
- List<List<List<String>>> dependencies = procedure.getDependencies();
- List<List<String>> datasetDependencies = dependencies.get(1);
- for (List<String> dependency : datasetDependencies) {
- if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
- && dependency.get(2).equals(arity)) {
- throw new CompilationException(
- "Cannot drop function " + sig + ". " + procedure.getEntityId() + " depends on it!");
- }
- }
-
- }
+ throwErrorIfFunctionUsed(mdTxnCtx, dvId, function, arity, sig);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
super.handleFunctionDropStatement(metadataProvider, stmt);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 87ac320..22767f2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -28,13 +28,12 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.DeployedJobService;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.ExtensionStatement;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.BADJobService;
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.lang.BADParserFactory;
import org.apache.asterix.bad.metadata.Channel;
@@ -57,7 +56,6 @@
import org.apache.asterix.lang.common.statement.CreateIndexStatement;
import org.apache.asterix.lang.common.statement.DatasetDecl;
import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
-import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.lang.common.statement.SetStatement;
@@ -89,7 +87,7 @@
private final CallExpr period;
private Identifier dataverseName;
private String duration;
- private InsertStatement channelResultsInsertQuery;
+ private String body;
private String subscriptionsTableName;
private String resultsTableName;
private String dataverse;
@@ -133,10 +131,6 @@
return period;
}
- public InsertStatement getChannelResultsInsertQuery() {
- return channelResultsInsertQuery;
- }
-
@Override
public byte getCategory() {
return Category.DDL;
@@ -221,28 +215,6 @@
}
- private JobSpecification compilePushChannel(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
- IHyracksClientConnection hcc, Query q) throws Exception {
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- boolean bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec = null;
- try {
- jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- bActiveTxn = false;
- } catch (Exception e) {
- LOGGER.log(Level.INFO, e.getMessage(), e);
- if (bActiveTxn) {
- ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
- }
- throw e;
- } finally {
- metadataProvider.getLocks().unlock();
- }
- return jobSpec;
- }
-
private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception {
StringBuilder builder = new StringBuilder();
@@ -271,13 +243,15 @@
builder.append(" returning a");
}
builder.append(";");
+ body = builder.toString();
BADParserFactory factory = new BADParserFactory();
List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
SetStatement ss = (SetStatement) fStatements.get(0);
metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
if (push) {
- return compilePushChannel(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
+ return BADJobService.compilePushChannel(statementExecutor, metadataProvider, hcc,
+ (Query) fStatements.get(1));
}
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
@@ -286,9 +260,10 @@
private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory) throws Exception {
if (channeljobSpec != null) {
+ channeljobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
DeployedJobSpecId destributedId = hcc.deployJobSpec(channeljobSpec);
- ScheduledExecutorService ses = DeployedJobService.startRepetitiveDeployedJobSpec(destributedId, hcc,
- ChannelJobService.findPeriod(duration), new HashMap<>(), entityId, txnIdFactory);
+ ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(destributedId, hcc,
+ BADJobService.findPeriod(duration), new HashMap<>(), entityId, txnIdFactory, listener);
listener.storeDistributedInfo(destributedId, ses, null, null);
}
@@ -354,14 +329,15 @@
// Now we subscribe
if (listener == null) {
- listener = new DeployedJobSpecEventListener(appCtx, entityId, PrecompiledType.CHANNEL, null,
+ listener = new DeployedJobSpecEventListener(appCtx, entityId,
+ push ? PrecompiledType.PUSH_CHANNEL : PrecompiledType.CHANNEL, null,
"BadListener");
activeEventHandler.registerListener(listener);
}
setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory());
channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
- duration, null);
+ duration, null, body);
MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index f3561a4..03db7bc 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -20,18 +20,23 @@
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
+import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.ExtensionStatement;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.BADJobService;
import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.lang.BADParserFactory;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
import org.apache.asterix.bad.metadata.Procedure;
@@ -42,7 +47,6 @@
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.clause.LetClause;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.LiteralExpr;
import org.apache.asterix.lang.common.expression.VariableExpr;
@@ -54,7 +58,6 @@
import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
import org.apache.asterix.metadata.MetadataManager;
@@ -62,14 +65,12 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.om.base.temporal.ADurationParserFactory;
-import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.dataset.ResultSetId;
@@ -84,7 +85,7 @@
private final FunctionSignature signature;
private final String procedureBody;
- private final Statement procedureBodyStatement;
+ private Statement procedureBodyStatement;
private final List<String> paramList;
private final List<VariableExpr> varList;
private final CallExpr period;
@@ -92,22 +93,32 @@
private List<List<List<String>>> dependencies;
public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList,
- List<Integer> paramIds, String functionBody, Statement procedureBodyStatement, Expression period) {
+ List<Integer> paramIds, String functionBody, Expression period) {
this.signature = signature;
- this.procedureBody = functionBody;
- this.procedureBodyStatement = procedureBodyStatement;
this.paramList = new ArrayList<>();
this.varList = new ArrayList<>();
for (int i = 0; i < parameterList.size(); i++) {
- this.paramList.add(parameterList.get(i).getValue());
- this.varList.add(new VariableExpr(new VarIdentifier(parameterList.get(i).toString(), paramIds.get(i))));
+ this.paramList.add(parameterList.get(i).getValue().substring(1));
+ this.varList.add(
+ new VariableExpr(new VarIdentifier(parameterList.get(i).getValue().substring(1), paramIds.get(i))));
}
+ procedureBody = rewriteJobParams(functionBody);
this.period = (CallExpr) period;
this.dependencies = new ArrayList<>();
this.dependencies.add(new ArrayList<>());
this.dependencies.add(new ArrayList<>());
}
+ private String rewriteJobParams(String body) {
+ String newBody = body;
+ for (VariableExpr var : varList) {
+ Pattern variableReference = Pattern.compile("([^\\w\\d])" + var.getVar() + "([^\\w\\d]|$)");
+ Matcher matcher = variableReference.matcher(newBody);
+ newBody = matcher.replaceAll("$1get_job_param(\"" + var.getVar() + "\")$2");
+ }
+ return "use " + signature.getNamespace() + ";\n" + newBody + ";";
+ }
+
public String getProcedureBody() {
return procedureBody;
}
@@ -142,7 +153,14 @@
return null;
}
- private void initialize() throws MetadataException, HyracksDataException {
+ private void initialize() throws CompilationException, HyracksDataException {
+ BADParserFactory factory = new BADParserFactory();
+ List<Statement> fStatements = factory.createParser(new StringReader(procedureBody)).parse();
+ if (fStatements.size() != 2) {
+ //TODO: Add a test for this error
+ throw new CompilationException("Procedure can only execute a single statement");
+ }
+ procedureBodyStatement = fStatements.get(1);
if (period == null) {
return;
}
@@ -157,40 +175,6 @@
durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
}
- private JobSpecification compileQueryJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
- IHyracksClientConnection hcc, Query q) throws Exception {
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- boolean bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec = null;
- try {
- jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- bActiveTxn = false;
- } catch (Exception e) {
- LOGGER.log(Level.INFO, e.getMessage(), e);
- if (bActiveTxn) {
- ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
- }
- throw e;
- }
- return jobSpec;
- }
-
- private void addLets(SelectExpression s) {
- FunctionIdentifier function = BuiltinFunctions.GET_JOB_PARAMETER;
- FunctionSignature sig =
- new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
- for (VariableExpr var : varList) {
- List<Expression> strListForCall = new ArrayList<>();
- LiteralExpr l = new LiteralExpr(new StringLiteral(var.getVar().getValue()));
- strListForCall.add(l);
- Expression con = new CallExpr(sig, strListForCall);
- LetClause let = new LetClause(var, con);
- s.getLetList().add(let);
- }
- }
-
private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExecutor statementExecutor,
MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats)
throws Exception {
@@ -207,28 +191,23 @@
getProcedureBodyStatement(), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null),
PrecompiledType.INSERT);
} else if (getProcedureBodyStatement().getKind() == Statement.Kind.QUERY) {
- Query s = (Query) getProcedureBodyStatement();
- addLets((SelectExpression) s.getBody());
SqlppRewriterFactory fact = new SqlppRewriterFactory();
dependencies.get(1).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(),
((Query) getProcedureBodyStatement()).getBody(), metadataProvider).get(1));
- Pair<JobSpecification, PrecompiledType> pair = new Pair<>(
- compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) getProcedureBodyStatement()),
+ Pair<JobSpecification, PrecompiledType> pair = new Pair<>(BADJobService.compileQueryJob(statementExecutor,
+ metadataProvider, hcc, (Query) getProcedureBodyStatement()),
PrecompiledType.QUERY);
dependencies.get(0).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(),
((Query) getProcedureBodyStatement()).getBody(), metadataProvider).get(0));
- metadataProvider.getLocks().unlock();
return pair;
} else if (getProcedureBodyStatement().getKind() == Statement.Kind.DELETE) {
SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
getProcedureBodyStatement().accept(visitor, null);
DeleteStatement delete = (DeleteStatement) getProcedureBodyStatement();
- addLets((SelectExpression) delete.getQuery().getBody());
SqlppRewriterFactory fact = new SqlppRewriterFactory();
dependencies = FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(), delete.getQuery().getBody(),
metadataProvider);
-
Pair<JobSpecification, PrecompiledType> pair =
new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
getProcedureBodyStatement(), hcc, true), PrecompiledType.DELETE);
@@ -276,24 +255,16 @@
if (alreadyActive) {
throw new AsterixException("Procedure " + signature.getName() + " is already running");
}
- MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
- metadataProvider.getDefaultDataverse());
- tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
metadataProvider.setResultSetId(new ResultSetId(resultSetId++));
final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
final IHyracksDataset hdc = requestParameters.getHyracksDataset();
final Stats stats = requestParameters.getStats();
boolean resultsAsync = resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
metadataProvider.setResultAsyncMode(resultsAsync);
- tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
- tempMdProvider.setResultAsyncMode(resultsAsync);
- tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
- tempMdProvider.setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
- tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
- tempMdProvider.setMaxResultReads(requestParameters.getResultProperties().getMaxReads());
+ metadataProvider.setMaxResultReads(1);
//Create Procedure Internal Job
Pair<JobSpecification, PrecompiledType> procedureJobSpec =
- createProcedureJob(statementExecutor, tempMdProvider, hcc, hdc, stats);
+ createProcedureJob(statementExecutor, metadataProvider, hcc, hdc, stats);
// Now we subscribe
if (listener == null) {
@@ -301,7 +272,8 @@
"BadListener");
activeEventHandler.registerListener(listener);
}
- setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), hdc,
+ setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, listener, metadataProvider.getResultSetId(),
+ hdc,
stats);
procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 635f2ce..025b9e6 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -24,18 +24,14 @@
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
-import org.apache.asterix.active.DeployedJobService;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.ExtensionStatement;
-import org.apache.asterix.api.http.server.ResultUtil;
import org.apache.asterix.app.active.ActiveNotificationHandler;
-import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.BADJobService;
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
-import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -54,12 +50,10 @@
import org.apache.asterix.translator.ConstantHelper;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.DeployedJobSpecId;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
public class ExecuteProcedureStatement extends ExtensionStatement {
@@ -111,10 +105,9 @@
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
- Procedure procedure = null;
+ Procedure procedure;
MetadataTransactionContext mdTxnCtx = null;
- JobId jobId;
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
txnActive = true;
@@ -125,31 +118,14 @@
Map<byte[], byte[]> contextRuntimeVarMap = createParameterMap(procedure);
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
if (procedure.getDuration().equals("")) {
+ BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, contextRuntimeVarMap, entityId,
+ metadataProvider.getTxnIdFactory(), appCtx, listener, (QueryTranslator) statementExecutor);
- //Add the Asterix Transaction Id to the map
- long newTxId = metadataProvider.getTxnIdFactory().create().getId();
- contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
- String.valueOf(newTxId).getBytes());
- jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);
-
- boolean wait = Boolean.parseBoolean(metadataProvider.getConfig().get(
- ExecuteProcedureStatement.WAIT_FOR_COMPLETION));
- if (wait || listener.getType() == PrecompiledType.QUERY) {
- hcc.waitForCompletion(jobId);
- }
-
- if (listener.getType() == PrecompiledType.QUERY) {
- ResultReader resultReader =
- new ResultReader(listener.getResultDataset(), jobId, listener.getResultId());
-
- ResultUtil.printResults(appCtx, resultReader,
- ((QueryTranslator) statementExecutor).getSessionOutput(), new Stats(), null);
- }
} else {
- ScheduledExecutorService ses = DeployedJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
- ChannelJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId,
- metadataProvider.getTxnIdFactory());
+ ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
+ BADJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId,
+ metadataProvider.getTxnIdFactory(), listener);
listener.storeDistributedInfo(deployedJobSpecId, ses, listener.getResultDataset(),
listener.getResultId());
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index 526e091..1e5e627 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -50,18 +50,20 @@
public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
public static final int CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX = 6;
+ public static final int CHANNEL_ARECORD_BODY_FIELD_INDEX = 7;
public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
// RecordTypeName
BADConstants.RECORD_TYPENAME_CHANNEL,
// FieldNames
new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration,
- BADConstants.FIELD_NAME_DEPENDENCIES },
+ BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY },
// FieldTypes
new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING,
new AOrderedListType(new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null),
- null) },
+ null),
+ BuiltinType.ASTRING },
//IsOpen?
true);
//------------------------------------------ Broker ----------------------------------------//
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
index 5f7dad0..ed9346c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -37,6 +37,7 @@
private final String subscriptionsDatasetName;
private final String resultsDatasetName;
private final String duration;
+ private final String channelBody;
private final FunctionSignature function;
private final List<String> functionAsPath;
/*
@@ -49,12 +50,13 @@
private final List<List<List<String>>> dependencies;
public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
- FunctionSignature function, String duration, List<List<List<String>>> dependencies) {
+ FunctionSignature function, String duration, List<List<List<String>>> dependencies, String channelBody) {
this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
this.function = function;
this.duration = duration;
this.resultsDatasetName = resultsDataset;
this.subscriptionsDatasetName = subscriptionsDataset;
+ this.channelBody = channelBody;
if (this.function.getNamespace() == null) {
this.function.setNamespace(dataverseName);
}
@@ -94,6 +96,10 @@
return duration;
}
+ public String getChannelBody() {
+ return channelBody;
+ }
+
public List<String> getFunctionAsPath() {
return functionAsPath;
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index 14db134..175280e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -123,11 +123,15 @@
}
+ String channelBody =
+ ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX))
+ .getStringValue();
+
FunctionSignature signature = new FunctionSignature(functionSignature.get(0), functionSignature.get(1),
Integer.parseInt(functionSignature.get(2)));
channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration,
- dependencies);
+ dependencies, channelBody);
return channel;
}
@@ -217,6 +221,12 @@
dependenciesListBuilder.write(fieldValue.getDataOutput(), true);
recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX, fieldValue);
+ // write field 7
+ fieldValue.reset();
+ aString.setValue(channel.getChannelBody());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX, fieldValue);
+
// write record
recordBuilder.write(tupleBuilder.getDataOutput(), true);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
index 070c148..78f7c95 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.bad.metadata;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveEvent.Kind;
import org.apache.asterix.active.ActivityState;
@@ -25,39 +27,28 @@
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.DeployedJobSpecId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-
public class DeployedJobSpecEventListener implements IActiveEntityEventsListener {
private static final Logger LOGGER = Logger.getLogger(DeployedJobSpecEventListener.class);
-
public enum PrecompiledType {
CHANNEL,
+ PUSH_CHANNEL,
QUERY,
INSERT,
DELETE
}
- enum RequestState {
- INIT,
- STARTED,
- FINISHED
- }
-
private DeployedJobSpecId deployedJobSpecId;
private ScheduledExecutorService executorService = null;
private final PrecompiledType type;
@@ -67,14 +58,11 @@
// members
protected volatile ActivityState state;
- protected JobId jobId;
- protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>();
protected final ICcApplicationContext appCtx;
protected final EntityId entityId;
protected final ActiveEvent statsUpdatedEvent;
protected long statsTimestamp;
protected String stats;
- protected RequestState statsRequestState;
protected final String runtimeName;
protected final AlgebricksAbsolutePartitionConstraint locations;
private int runningInstance;
@@ -83,18 +71,15 @@
AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
this.appCtx = appCtx;
this.entityId = entityId;
- this.state = ActivityState.STOPPED;
+ setState(ActivityState.STOPPED);
this.statsTimestamp = -1;
- this.statsRequestState = RequestState.INIT;
this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null);
this.stats = "{\"Stats\":\"N/A\"}";
this.runtimeName = runtimeName;
this.locations = locations;
- state = ActivityState.STOPPED;
this.type = type;
}
-
public IHyracksDataset getResultDataset() {
return hdc;
}
@@ -122,10 +107,6 @@
return false;
}
- public JobId getJobId() {
- return jobId;
- }
-
@Override
public String getStats() {
return stats;
@@ -136,40 +117,6 @@
return statsTimestamp;
}
- public String formatStats(List<String> responses) {
- StringBuilder strBuilder = new StringBuilder();
- strBuilder.append("{\"Stats\": [").append(responses.get(0));
- for (int i = 1; i < responses.size(); i++) {
- strBuilder.append(", ").append(responses.get(i));
- }
- strBuilder.append("]}");
- return strBuilder.toString();
- }
-
- protected synchronized void notifySubscribers(ActiveEvent event) {
- notifyAll();
- Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
- while (it.hasNext()) {
- IActiveEntityEventSubscriber subscriber = it.next();
- if (subscriber.isDone()) {
- it.remove();
- } else {
- try {
- subscriber.notify(event);
- } catch (HyracksDataException e) {
- LOGGER.log(Level.WARN, "Failed to notify subscriber", e);
- }
- if (subscriber.isDone()) {
- it.remove();
- }
- }
- }
- }
-
- public AlgebricksAbsolutePartitionConstraint getLocations() {
- return locations;
- }
-
public PrecompiledType getType() {
return type;
}
@@ -214,12 +161,18 @@
// no op
}
+ protected synchronized void setState(ActivityState newState) {
+ LOGGER.info("State of " + getEntityId() + "is being set to " + newState + " from " + state);
+ this.state = newState;
+ notifyAll();
+ }
+
private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Channel Job started for " + entityId);
}
runningInstance++;
- state = ActivityState.RUNNING;
+ setState(ActivityState.RUNNING);
}
private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
@@ -228,10 +181,34 @@
}
runningInstance--;
if (runningInstance == 0) {
- state = ActivityState.STOPPED;
+ setState(ActivityState.STOPPED);
}
}
+ public synchronized void waitWhileAtState(ActivityState undesiredState) throws InterruptedException {
+ while (state == undesiredState) {
+ this.wait();
+ }
+ }
+
+ public synchronized void suspend() throws HyracksDataException, InterruptedException {
+ LOGGER.info("Suspending entity " + entityId);
+ LOGGER.info("Waiting for ongoing activities of " + entityId);
+ waitWhileAtState(ActivityState.RUNNING);
+ LOGGER.info("Proceeding with suspension of " + entityId + ". Current state is " + state);
+ setState(ActivityState.SUSPENDED);
+ LOGGER.info("Successfully Suspended " + entityId);
+ }
+
+ public synchronized void resume() throws HyracksDataException {
+ LOGGER.info("Resuming entity " + entityId);
+ if (state != ActivityState.SUSPENDED) {
+ throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_RESUME_FROM_STATE, entityId, state);
+ }
+ setState(ActivityState.STOPPED);
+ LOGGER.info("Successfully resumed " + entityId);
+ }
+
@Override
public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException {
// no op
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
index 5712539..50d506b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
@@ -28,10 +28,7 @@
public class Procedure implements IExtensionMetadataEntity {
private static final long serialVersionUID = 1L;
- public static final String LANGUAGE_JAVA = "JAVA";
-
public static final String RETURNTYPE_VOID = "VOID";
- public static final String NOT_APPLICABLE = "N/A";
private final EntityId procedureId;
private final int arity;
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 2d7ba75..4c83dc5 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -161,7 +161,7 @@
}
("period" period = FunctionCallExpr())?
{
- return new CreateProcedureStatement(signature, paramList, paramIds, functionBody, functionBodyExpr, period);
+ return new CreateProcedureStatement(signature, paramList, paramIds, functionBody, period);
}
}
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java
new file mode 100644
index 0000000..1cd49e3
--- /dev/null
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BADListenerTest {
+
+ private static DeployedJobSpecEventListener djsel;
+
+ private class suspend extends Thread {
+ @Override
+ public void run() {
+ try {
+ djsel.suspend();
+ Thread.sleep(5000);
+ djsel.resume();
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ private class run extends Thread {
+ @Override
+ public void run() {
+ try {
+ djsel.notify(new ActiveEvent(null, ActiveEvent.Kind.JOB_STARTED, null, null));
+ djsel.notify(new ActiveEvent(null, ActiveEvent.Kind.JOB_STARTED, null, null));
+ djsel.notify(new ActiveEvent(null, ActiveEvent.Kind.JOB_FINISHED, null, null));
+ Thread.sleep(5000);
+ djsel.notify(new ActiveEvent(null, ActiveEvent.Kind.JOB_FINISHED, null, null));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ @BeforeClass
+ public static void init() {
+ djsel = new DeployedJobSpecEventListener(null,
+ new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, "test", "test"),
+ DeployedJobSpecEventListener.PrecompiledType.CHANNEL, null, "BadListener");
+ }
+
+ @Test
+ public void DistributedTest() throws Exception {
+ new suspend().run();
+ djsel.waitWhileAtState(ActivityState.SUSPENDED);
+ new run().run();
+ djsel.suspend();
+ }
+
+ @AfterClass
+ public static void deinit() throws Exception {
+
+ }
+}
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
new file mode 100644
index 0000000..819d052
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date : Apr 2018
+* Author : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type UserLocation as {
+ location: circle,
+ userName: string
+};
+
+
+create type EmergencyReport as {
+ reportId: uuid,
+ Etype: string,
+ location: circle
+};
+
+
+create type EmergencyShelter as {
+ shelterName: string,
+ location: point
+};
+
+create dataset UserLocations(UserLocation)
+primary key userName;
+create dataset Shelters(EmergencyShelter)
+primary key shelterName;
+create dataset Reports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index u_location on UserLocations(location) type RTREE;
+
+
+create function RecentEmergenciesNearUser(userName) {
+ (
+ select report, shelters from
+ ( select value r from Reports r)report,
+ UserLocations u
+ let shelters = (select s.location from Shelters s where spatial_intersect(s.location,u.location))
+ where u.userName = userName
+ and spatial_intersect(report.location,u.location)
+ )
+};
+
+create repetitive channel EmergencyChannel using RecentEmergenciesNearUser@1 period duration("PT10S");
+
+create broker brokerA at "http://www.notifyA.com";
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
new file mode 100644
index 0000000..0a38e41
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date : Apr 2018
+* Author : channels Jacobs
+*/
+
+use channels;
+
+insert into EmergencyChannelSubscriptions(
+[
+{"param0" : "w2294u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4321u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3398u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w2488u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3666u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4489u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p78u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p544u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p711u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2828u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4796u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4082u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4923u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w2324u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1339u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p520u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1092u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4979u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1487u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4330u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3682u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p117u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w1741u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w2434u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3833u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1373u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p89u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4003u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c910u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4961u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4475u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w1960u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p438u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1362u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p588u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c902u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4684u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1609u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1510u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3851u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1418u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2559u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w1815u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4924u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3320u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p663u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4571u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p781u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c919u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1121u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p814u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4006u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2822u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4953u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3486u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3107u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2836u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w2003u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3256u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4762u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4900u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p357u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3630u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3166u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4687u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p817u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4433u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3426u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p582u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3388u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4823u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1664u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4051u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c857u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1412u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2521u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3114u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p404u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p111u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3006u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2903u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2823u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4153u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2589u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1459u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p766u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p593u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p168u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4253u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4177u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p387u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2571u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1513u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p618u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2735u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4859u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w1848u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3306u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2558u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p180u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" }
+]
+);
+
+insert into Shelters (
+[
+{"shelterName" : "s5064" , "location" : point("2437.34,1330.59") },
+{"shelterName" : "s5180" , "location" : point("2479.66,939.05") },
+{"shelterName" : "s5025" , "location" : point("3343.08,1139.29") },
+{"shelterName" : "s5218" , "location" : point("1361.85,795.99") },
+{"shelterName" : "s5199" , "location" : point("3619.88,1245.75") },
+{"shelterName" : "s5167" , "location" : point("2836.86,237.96") },
+{"shelterName" : "s5068" , "location" : point("2580.27,881.71") },
+{"shelterName" : "s5026" , "location" : point("2040.75,816.3") },
+{"shelterName" : "s5080" , "location" : point("2785.14,1090.68") },
+{"shelterName" : "s5190" , "location" : point("1296.09,1256.87") },
+{"shelterName" : "s5126" , "location" : point("1121.37,1478.3") },
+{"shelterName" : "s5075" , "location" : point("2679.51,488.23") },
+{"shelterName" : "s5187" , "location" : point("2544.14,2464.15") },
+{"shelterName" : "s5061" , "location" : point("1674.35,1265.46") },
+{"shelterName" : "s5057" , "location" : point("1677.96,1151.83") },
+{"shelterName" : "s5149" , "location" : point("2236.78,1068.51") },
+{"shelterName" : "s5058" , "location" : point("2919.37,2055.98") },
+{"shelterName" : "s5144" , "location" : point("731.17,1504.74") },
+{"shelterName" : "s5099" , "location" : point("2594.22,1078.11") },
+{"shelterName" : "s5030" , "location" : point("3678.46,978.38") },
+{"shelterName" : "s5050" , "location" : point("2395.61,1332.9") },
+{"shelterName" : "s5209" , "location" : point("3093.56,1833.18") },
+{"shelterName" : "s5052" , "location" : point("3052.36,1076.89") },
+{"shelterName" : "s5208" , "location" : point("1513.18,796.41") },
+{"shelterName" : "s5027" , "location" : point("671.97,2004.72") },
+{"shelterName" : "s5093" , "location" : point("2297.68,1933.79") },
+{"shelterName" : "s5202" , "location" : point("2854.73,1153.14") },
+{"shelterName" : "s5106" , "location" : point("1971.88,737.51") },
+{"shelterName" : "s5192" , "location" : point("2638.68,1688.7") },
+{"shelterName" : "s5128" , "location" : point("1970.48,2569.09") },
+{"shelterName" : "s5098" , "location" : point("2596.02,1499.58") },
+{"shelterName" : "s5038" , "location" : point("2297.07,995.53") },
+{"shelterName" : "s5176" , "location" : point("1570.34,1701.0") },
+{"shelterName" : "s5066" , "location" : point("820.41,2092.07") },
+{"shelterName" : "s5108" , "location" : point("459.09,395.34") },
+{"shelterName" : "s5162" , "location" : point("2543.99,1822.57") },
+{"shelterName" : "s5195" , "location" : point("3190.14,979.74") },
+{"shelterName" : "s5212" , "location" : point("2478.36,921.15") },
+{"shelterName" : "s5217" , "location" : point("2411.71,807.77") },
+{"shelterName" : "s5049" , "location" : point("447.77,1527.37") },
+{"shelterName" : "s5163" , "location" : point("2356.91,1221.63") },
+{"shelterName" : "s5048" , "location" : point("1495.47,1597.15") },
+{"shelterName" : "s5154" , "location" : point("1503.31,884.66") },
+{"shelterName" : "s5116" , "location" : point("1359.98,314.68") },
+{"shelterName" : "s5143" , "location" : point("3000.77,1080.81") },
+{"shelterName" : "s5073" , "location" : point("2404.16,1516.85") },
+{"shelterName" : "s5032" , "location" : point("2370.75,1879.99") },
+{"shelterName" : "s5152" , "location" : point("1091.19,706.09") },
+{"shelterName" : "s5044" , "location" : point("2395.55,1378.92") },
+{"shelterName" : "s5081" , "location" : point("2057.88,838.33") },
+{"shelterName" : "s5130" , "location" : point("1511.27,1082.85") },
+{"shelterName" : "s5142" , "location" : point("2634.22,1631.85") },
+{"shelterName" : "s5071" , "location" : point("1799.13,2130.04") },
+{"shelterName" : "s5051" , "location" : point("2414.52,807.3") },
+{"shelterName" : "s5147" , "location" : point("1960.71,976.11") },
+{"shelterName" : "s5021" , "location" : point("2659.55,1957.04") },
+{"shelterName" : "s5139" , "location" : point("1863.26,1216.29") },
+{"shelterName" : "s5062" , "location" : point("937.48,1608.88") },
+{"shelterName" : "s5168" , "location" : point("2245.27,1012.57") },
+{"shelterName" : "s5196" , "location" : point("2341.23,1883.0") },
+{"shelterName" : "s5119" , "location" : point("1702.71,2557.47") },
+{"shelterName" : "s5150" , "location" : point("1613.07,1804.66") },
+{"shelterName" : "s5171" , "location" : point("2789.6,931.73") },
+{"shelterName" : "s5102" , "location" : point("3483.61,1156.18") },
+{"shelterName" : "s5091" , "location" : point("1081.51,1596.95") },
+{"shelterName" : "s5132" , "location" : point("2099.11,1086.28") },
+{"shelterName" : "s5104" , "location" : point("2896.78,461.73") },
+{"shelterName" : "s5022" , "location" : point("1920.77,1657.89") },
+{"shelterName" : "s5219" , "location" : point("2913.07,954.89") },
+{"shelterName" : "s5088" , "location" : point("1831.54,2241.22") },
+{"shelterName" : "s5166" , "location" : point("1087.15,2048.13") },
+{"shelterName" : "s5203" , "location" : point("2543.5,1815.38") },
+{"shelterName" : "s5136" , "location" : point("2503.29,1270.07") },
+{"shelterName" : "s5194" , "location" : point("1595.1,1634.0") },
+{"shelterName" : "s5060" , "location" : point("2230.74,818.1") },
+{"shelterName" : "s5127" , "location" : point("2567.07,1104.86") },
+{"shelterName" : "s5092" , "location" : point("1732.18,1170.23") },
+{"shelterName" : "s5124" , "location" : point("2456.58,983.76") },
+{"shelterName" : "s5201" , "location" : point("1875.2,1300.76") },
+{"shelterName" : "s5029" , "location" : point("2581.92,690.14") },
+{"shelterName" : "s5146" , "location" : point("2437.06,2491.18") },
+{"shelterName" : "s5074" , "location" : point("1761.92,2035.44") },
+{"shelterName" : "s5173" , "location" : point("1000.01,1488.16") },
+{"shelterName" : "s5039" , "location" : point("2604.75,630.95") },
+{"shelterName" : "s5020" , "location" : point("1920.62,670.07") },
+{"shelterName" : "s5120" , "location" : point("1562.27,1045.02") },
+{"shelterName" : "s5083" , "location" : point("964.22,1606.66") },
+{"shelterName" : "s5122" , "location" : point("2253.13,1556.55") },
+{"shelterName" : "s5103" , "location" : point("2023.99,2505.02") },
+{"shelterName" : "s5155" , "location" : point("2996.58,390.66") },
+{"shelterName" : "s5076" , "location" : point("1025.57,515.66") },
+{"shelterName" : "s5086" , "location" : point("2384.13,886.5") },
+{"shelterName" : "s5053" , "location" : point("1173.98,2173.29") },
+{"shelterName" : "s5216" , "location" : point("2865.64,1182.0") },
+{"shelterName" : "s5065" , "location" : point("2633.42,574.61") },
+{"shelterName" : "s5055" , "location" : point("1236.87,876.11") },
+{"shelterName" : "s5215" , "location" : point("2272.8,1115.83") },
+{"shelterName" : "s5210" , "location" : point("1314.22,729.82") },
+{"shelterName" : "s5200" , "location" : point("1776.8,1176.29") },
+{"shelterName" : "s5165" , "location" : point("3485.44,980.34") },
+{"shelterName" : "s5042" , "location" : point("1812.4,1252.84") }
+]
+);
+
+insert into UserLocations (
+[
+{"userName" : "w2294u1" , "location" : circle("2683.3,480.84 100.0")},
+{"userName" : "t4321u1" , "location" : circle("1990.77,754.24 100.0")},
+{"userName" : "t3398u1" , "location" : circle("2791.2,962.92 100.0")},
+{"userName" : "w2488u1" , "location" : circle("2040.19,767.35 100.0")},
+{"userName" : "t3666u1" , "location" : circle("3968.68,1308.04 100.0")},
+{"userName" : "t4489u1" , "location" : circle("1713.82,252.6 100.0")},
+{"userName" : "p78u1" , "location" : circle("2588.34,735.72 100.0")},
+{"userName" : "p544u1" , "location" : circle("1993.69,1465.29 100.0")},
+{"userName" : "p711u1" , "location" : circle("3221.64,1062.22 100.0")},
+{"userName" : "t2828u1" , "location" : circle("2534.49,978.95 100.0")},
+{"userName" : "t4796u1" , "location" : circle("1752.58,323.55 100.0")},
+{"userName" : "t4082u1" , "location" : circle("3571.17,1213.78 100.0")},
+{"userName" : "t4923u1" , "location" : circle("1878.19,587.26 100.0")},
+{"userName" : "w2324u1" , "location" : circle("2851.94,1429.02 100.0")},
+{"userName" : "c1339u1" , "location" : circle("2881.61,956.95 100.0")},
+{"userName" : "p520u1" , "location" : circle("2927.9,986.41 100.0")},
+{"userName" : "c1092u1" , "location" : circle("1670.02,1019.43 100.0")},
+{"userName" : "t4979u1" , "location" : circle("1856.46,543.47 100.0")},
+{"userName" : "c1487u1" , "location" : circle("1757.21,745.68 100.0")},
+{"userName" : "t4330u1" , "location" : circle("1596.34,59.37 100.0")},
+{"userName" : "t3682u1" , "location" : circle("3557.22,1204.63 100.0")},
+{"userName" : "p117u1" , "location" : circle("1717.46,1166.14 100.0")},
+{"userName" : "w1741u1" , "location" : circle("2517.33,980.01 100.0")},
+{"userName" : "w2434u1" , "location" : circle("527.29,1520.52 100.0")},
+{"userName" : "t3833u1" , "location" : circle("3521.17,1180.97 100.0")},
+{"userName" : "c1373u1" , "location" : circle("1733.98,742.36 100.0")},
+{"userName" : "p89u1" , "location" : circle("3768.73,1013.03 100.0")},
+{"userName" : "t4003u1" , "location" : circle("2945.36,952.79 100.0")},
+{"userName" : "c910u1" , "location" : circle("1124.86,1275.91 100.0")},
+{"userName" : "t4961u1" , "location" : circle("1575.25,0.0 100.0")},
+{"userName" : "t4475u1" , "location" : circle("1607.59,79.94 100.0")},
+{"userName" : "w1960u1" , "location" : circle("1768.23,2263.45 100.0")},
+{"userName" : "p438u1" , "location" : circle("898.13,238.92 100.0")},
+{"userName" : "c1362u1" , "location" : circle("1438.64,400.67 100.0")},
+{"userName" : "p588u1" , "location" : circle("3206.01,1052.3 100.0")},
+{"userName" : "c902u1" , "location" : circle("1615.16,1251.39 100.0")},
+{"userName" : "t4684u1" , "location" : circle("1876.6,584.06 100.0")},
+{"userName" : "c1609u1" , "location" : circle("2320.02,725.01 100.0")},
+{"userName" : "c1510u1" , "location" : circle("2256.6,816.78 100.0")},
+{"userName" : "t3851u1" , "location" : circle("3807.01,1304.1 100.0")},
+{"userName" : "c1418u1" , "location" : circle("2273.25,815.93 100.0")},
+{"userName" : "t2559u1" , "location" : circle("2297.49,1887.46 100.0")},
+{"userName" : "w1815u1" , "location" : circle("1539.89,343.08 100.0")},
+{"userName" : "t4924u1" , "location" : circle("1609.65,83.72 100.0")},
+{"userName" : "t3320u1" , "location" : circle("2322.45,1277.89 100.0")},
+{"userName" : "p663u1" , "location" : circle("2291.38,254.83 100.0")},
+{"userName" : "t4571u1" , "location" : circle("2253.78,1090.84 100.0")},
+{"userName" : "p781u1" , "location" : circle("1154.4,712.8 100.0")},
+{"userName" : "c919u1" , "location" : circle("1721.46,485.13 100.0")},
+{"userName" : "c1121u1" , "location" : circle("4171.58,1083.41 100.0")},
+{"userName" : "p814u1" , "location" : circle("2176.13,990.55 100.0")},
+{"userName" : "t4006u1" , "location" : circle("3977.9,1303.22 100.0")},
+{"userName" : "t2822u1" , "location" : circle("3087.51,1849.01 100.0")},
+{"userName" : "t4953u1" , "location" : circle("1923.91,676.56 100.0")},
+{"userName" : "t3486u1" , "location" : circle("3832.72,1320.87 100.0")},
+{"userName" : "t3107u1" , "location" : circle("2994.64,2023.92 100.0")},
+{"userName" : "t2836u1" , "location" : circle("2307.27,2096.07 100.0")},
+{"userName" : "w2003u1" , "location" : circle("1976.08,2279.1 100.0")},
+{"userName" : "t3256u1" , "location" : circle("2161.32,1700.73 100.0")},
+{"userName" : "t4762u1" , "location" : circle("1916.88,662.71 100.0")},
+{"userName" : "t4900u1" , "location" : circle("2253.78,1090.84 100.0")},
+{"userName" : "p357u1" , "location" : circle("1699.66,1976.29 100.0")},
+{"userName" : "t3630u1" , "location" : circle("3778.3,1285.37 100.0")},
+{"userName" : "t3166u1" , "location" : circle("2743.85,965.95 100.0")},
+{"userName" : "t4687u1" , "location" : circle("1798.2,426.06 100.0")},
+{"userName" : "p817u1" , "location" : circle("1446.92,909.48 100.0")},
+{"userName" : "t4433u1" , "location" : circle("1805.69,441.15 100.0")},
+{"userName" : "t3426u1" , "location" : circle("3055.08,945.53 100.0")},
+{"userName" : "p582u1" , "location" : circle("1523.47,739.18 100.0")},
+{"userName" : "t3388u1" , "location" : circle("2919.63,954.46 100.0")},
+{"userName" : "t4823u1" , "location" : circle("2174.09,987.9 100.0")},
+{"userName" : "c1664u1" , "location" : circle("1283.46,1099.95 100.0")},
+{"userName" : "t4051u1" , "location" : circle("4047.21,1215.22 100.0")},
+{"userName" : "c857u1" , "location" : circle("1270.33,664.46 100.0")},
+{"userName" : "c1412u1" , "location" : circle("1279.63,1731.15 100.0")},
+{"userName" : "t2521u1" , "location" : circle("2850.02,1040.64 100.0")},
+{"userName" : "t3114u1" , "location" : circle("2294.53,995.82 100.0")},
+{"userName" : "p404u1" , "location" : circle("3789.35,1022.15 100.0")},
+{"userName" : "p111u1" , "location" : circle("2054.96,904.61 100.0")},
+{"userName" : "t3006u1" , "location" : circle("3095.66,1781.54 100.0")},
+{"userName" : "t2903u1" , "location" : circle("2449.83,984.17 100.0")},
+{"userName" : "t2823u1" , "location" : circle("2116.91,1638.54 100.0")},
+{"userName" : "t4153u1" , "location" : circle("3977.9,1303.22 100.0")},
+{"userName" : "t2589u1" , "location" : circle("2075.58,1455.78 100.0")},
+{"userName" : "c1459u1" , "location" : circle("2149.15,1104.3 100.0")},
+{"userName" : "p766u1" , "location" : circle("3144.94,1042.37 100.0")},
+{"userName" : "p593u1" , "location" : circle("3460.96,1141.22 100.0")},
+{"userName" : "p168u1" , "location" : circle("1719.0,2688.66 100.0")},
+{"userName" : "t4253u1" , "location" : circle("1575.25,0.0 100.0")},
+{"userName" : "t4177u1" , "location" : circle("2185.18,1002.25 100.0")},
+{"userName" : "p387u1" , "location" : circle("2351.74,811.8 100.0")},
+{"userName" : "t2571u1" , "location" : circle("2307.49,1915.48 100.0")},
+{"userName" : "c1513u1" , "location" : circle("3432.64,1067.27 100.0")},
+{"userName" : "p618u1" , "location" : circle("1682.87,1962.44 100.0")},
+{"userName" : "t2735u1" , "location" : circle("2999.73,1635.76 100.0")},
+{"userName" : "t4859u1" , "location" : circle("1763.96,348.87 100.0")},
+{"userName" : "w1848u1" , "location" : circle("2675.62,2150.98 100.0")},
+{"userName" : "t3306u1" , "location" : circle("2274.48,1312.75 100.0")},
+{"userName" : "t2558u1" , "location" : circle("2254.43,2174.64 100.0")},
+{"userName" : "p180u1" , "location" : circle("1253.66,1771.06 100.0")}
+]
+);
+
+insert into Reports
+(
+[
+{"Etype" : "flood" , "location" : circle("846.5,2589.56 1000.0")},
+{"Etype" : "crash" , "location" : circle("953.48,2504.12 100.0")},
+{"Etype" : "flood" , "location" : circle("2313.19,1641.15 1000.0")},
+{"Etype" : "fire" , "location" : circle("3014.66,2332.34 500.0")},
+{"Etype" : "flood" , "location" : circle("1188.75,2307.52 1000.0")},
+{"Etype" : "fire" , "location" : circle("3418.08,1090.2 500.0")},
+{"Etype" : "fire" , "location" : circle("1364.9,1434.81 500.0")},
+{"Etype" : "storm" , "location" : circle("1164.65,2088.5 2000.0")},
+{"Etype" : "flood" , "location" : circle("3582.04,974.89 1000.0")},
+{"Etype" : "flood" , "location" : circle("1016.15,846.6 1000.0")},
+{"Etype" : "flood" , "location" : circle("1416.4,1483.13 1000.0")},
+{"Etype" : "flood" , "location" : circle("2777.23,963.83 1000.0")},
+{"Etype" : "fire" , "location" : circle("3082.74,1746.62 500.0")},
+{"Etype" : "storm" , "location" : circle("1186.15,2283.96 2000.0")},
+{"Etype" : "crash" , "location" : circle("1218.17,1591.68 100.0")},
+{"Etype" : "fire" , "location" : circle("1141.24,1474.73 500.0")},
+{"Etype" : "flood" , "location" : circle("1105.73,1875.44 1000.0")},
+{"Etype" : "flood" , "location" : circle("1805.37,2346.03 1000.0")},
+{"Etype" : "flood" , "location" : circle("1535.83,1546.66 1000.0")},
+{"Etype" : "fire" , "location" : circle("4187.75,1064.17 500.0")}
+]
+);
+
+insert into Reports
+(
+[
+{"Etype" : "flood" , "location" : circle("846.5,2589.56 1000.0")},
+{"Etype" : "crash" , "location" : circle("953.48,2504.12 100.0")},
+{"Etype" : "flood" , "location" : circle("2313.19,1641.15 1000.0")},
+{"Etype" : "fire" , "location" : circle("3014.66,2332.34 500.0")},
+{"Etype" : "flood" , "location" : circle("1188.75,2307.52 1000.0")},
+{"Etype" : "fire" , "location" : circle("3418.08,1090.2 500.0")},
+{"Etype" : "fire" , "location" : circle("1364.9,1434.81 500.0")},
+{"Etype" : "storm" , "location" : circle("1164.65,2088.5 2000.0")},
+{"Etype" : "flood" , "location" : circle("3582.04,974.89 1000.0")},
+{"Etype" : "flood" , "location" : circle("1016.15,846.6 1000.0")},
+{"Etype" : "flood" , "location" : circle("1416.4,1483.13 1000.0")},
+{"Etype" : "flood" , "location" : circle("2777.23,963.83 1000.0")},
+{"Etype" : "fire" , "location" : circle("3082.74,1746.62 500.0")},
+{"Etype" : "storm" , "location" : circle("1186.15,2283.96 2000.0")},
+{"Etype" : "crash" , "location" : circle("1218.17,1591.68 100.0")},
+{"Etype" : "fire" , "location" : circle("1141.24,1474.73 500.0")},
+{"Etype" : "flood" , "location" : circle("1105.73,1875.44 1000.0")},
+{"Etype" : "flood" , "location" : circle("1805.37,2346.03 1000.0")},
+{"Etype" : "flood" , "location" : circle("1535.83,1546.66 1000.0")},
+{"Etype" : "fire" , "location" : circle("4187.75,1064.17 500.0")}
+]
+);
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.3.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.3.update.sqlpp
new file mode 100644
index 0000000..d0f65e5
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.3.update.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date : Apr 2018
+* Author : Steven Jacobs
+*/
+
+use channels;
+
+create index delivery on EmergencyChannelResults(deliveryTime);
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
new file mode 100644
index 0000000..5f764b3
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date : Apr 2018
+* Author : Steven Jacobs
+*/
+
+15000
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.query.sqlpp
new file mode 100644
index 0000000..dd6e1ca
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date : Apr 2018
+* Author : Steven Jacobs
+*/
+
+use channels;
+
+select value array_count(
+(select * from EmergencyChannelResults where deliveryTime > datetime("2017-05-02T17:52:59.570Z")));
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.6.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.6.ddl.sqlpp
new file mode 100644
index 0000000..54075e5
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.6.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date : Apr 2018
+* Author : Steven Jacobs
+*/
+
+use channels;
+
+drop channel EmergencyChannel;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_index/drop_index.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_index/drop_index.1.ddl.sqlpp
new file mode 100644
index 0000000..ca01dd7
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_index/drop_index.1.ddl.sqlpp
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Drop Function Dataset Index
+* Expected Res : Error
+* Date : Jan 2018
+* Author : Steven Jacobs
+*/
+
+drop dataverse two if exists;
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type UserLocation as {
+ location: circle,
+ userName: string,
+ timeStamp: datetime
+};
+
+
+create type UserLocationFeedType as {
+ location: circle,
+ userName: string
+};
+
+create type EmergencyReport as {
+ reportId: uuid,
+ Etype: string,
+ location: circle,
+ timeStamp: datetime
+};
+
+create type EmergencyReportFeedType as {
+ Etype: string,
+ location: circle
+};
+
+
+create type EmergencyShelter as {
+ shelterName: string,
+ location: point
+};
+
+create dataset UserLocations(UserLocation)
+primary key userName;
+create dataset Shelters(EmergencyShelter)
+primary key shelterName;
+create dataset Reports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index location_time on UserLocations(timeStamp);
+create index u_location on UserLocations(location) type RTREE;
+create index s_location on Shelters(location) type RTREE;
+create index report_time on Reports(timeStamp);
+
+create function RecentEmergenciesNearUser(userName) {
+ (
+ select report, shelters from
+ ( select value r from Reports r where r.timeStamp >
+ current_datetime() - day_time_duration("PT10S"))report,
+ UserLocations u
+ let shelters = (select s.location from Shelters s where spatial_intersect(s.location,u.location))
+ where u.userName = userName
+ and spatial_intersect(report.location,u.location)
+ )
+};
+
+create repetitive channel EmergencyChannel using RecentEmergenciesNearUser@1 period duration("PT10S");
+
+use channels;
+drop index Shelters.s_location;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/add_index/add_index.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/add_index/add_index.1.adm
new file mode 100644
index 0000000..2e9bdd9
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/add_index/add_index.1.adm
@@ -0,0 +1 @@
+1074
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
index bee9157..225d83f 100644
--- a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
@@ -1 +1 @@
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel", "SubscriptionsDatasetName": "nearbyTweetChannelSubscriptions", "ResultsDatasetName": "nearbyTweetChannelResults", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannelResults" ], [ "channels", "nearbyTweetChannelSubscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ] }
\ No newline at end of file
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel", "SubscriptionsDatasetName": "nearbyTweetChannelSubscriptions", "ResultsDatasetName": "nearbyTweetChannelResults", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannelResults" ], [ "channels", "nearbyTweetChannelSubscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with \"false\";\ninsert into channels.nearbyTweetChannelResults as a (\nwith channelExecutionTime as current_datetime() \nselect result, channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() as deliveryTime\nfrom channels.nearbyTweetChannelSubscriptions sub,\nMetadata.Broker b, \nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) returning a;" }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
index 1c492ac..8d8899d 100644
--- a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
@@ -1,2 +1,2 @@
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", "SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", "ResultsDatasetName": "nearbyTweetChannel1Results", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannel1Results" ], [ "channels", "nearbyTweetChannel1Subscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ] }
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", "SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", "ResultsDatasetName": "nearbyTweetChannel3Results", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannel3Results" ], [ "channels", "nearbyTweetChannel3Subscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ] }
\ No newline at end of file
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", "SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", "ResultsDatasetName": "nearbyTweetChannel1Results", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannel1Results" ], [ "channels", "nearbyTweetChannel1Subscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with \"false\";\ninsert into channels.nearbyTweetChannel1Results as a (\nwith channelExecutionTime as current_datetime() \nselect result, channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() as deliveryTime\nfrom channels.nearbyTweetChannel1Subscriptions sub,\nMetadata.Broker b, \nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) returning a;" }
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", "SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", "ResultsDatasetName": "nearbyTweetChannel3Results", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannel3Results" ], [ "channels", "nearbyTweetChannel3Subscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with \"false\";\ninsert into channels.nearbyTweetChannel3Results as a (\nwith channelExecutionTime as current_datetime() \nselect result, channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() as deliveryTime\nfrom channels.nearbyTweetChannel3Subscriptions sub,\nMetadata.Broker b, \nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) returning a;" }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm
index 4308c83..c41aec1 100644
--- a/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm
@@ -1,6 +1,6 @@
-{ "DataverseName": "two", "ProcedureName": "addMe", "Arity": "0", "Params": [ ], "ReturnType": "VOID", "Definition": "insert into channels.UserLocations([\n {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n )", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ ] ] }
-{ "DataverseName": "two", "ProcedureName": "deleteSome", "Arity": "2", "Params": [ "$r", "$otherRoom" ], "ReturnType": "VOID", "Definition": "delete from channels.UserLocations\nwhere roomNumber = r\nor roomNumber = otherRoom\nand channels.really_contains(roomNumber,\"l\")", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "channels", "really_contains", "2" ] ] ] }
-{ "DataverseName": "two", "ProcedureName": "localAddMe", "Arity": "0", "Params": [ ], "ReturnType": "VOID", "Definition": "insert into UserLocations([\n {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n )", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ ] ] }
-{ "DataverseName": "two", "ProcedureName": "localDeleteSome", "Arity": "2", "Params": [ "$r", "$otherRoom" ], "ReturnType": "VOID", "Definition": "delete from UserLocations\nwhere roomNumber = r\nor roomNumber = otherRoom\nand really_contains(roomNumber,\"l\")", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "really_contains", "2" ] ] ] }
-{ "DataverseName": "two", "ProcedureName": "localSelectSome", "Arity": "2", "Params": [ "$r", "$otherRoom" ], "ReturnType": "VOID", "Definition": "select roomNumber from UserLocations\nwhere roomNumber = r\nor roomNumber = otherRoom\nand really_contains(roomNumber,\"l\")\norder by id", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "really_contains", "2" ] ] ] }
-{ "DataverseName": "two", "ProcedureName": "selectSome", "Arity": "2", "Params": [ "$r", "$otherRoom" ], "ReturnType": "VOID", "Definition": "select roomNumber from channels.UserLocations\nwhere roomNumber = r\nor roomNumber = otherRoom\nand channels.really_contains(roomNumber,\"l\")\norder by id", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "channels", "really_contains", "2" ] ] ] }
\ No newline at end of file
+{ "DataverseName": "two", "ProcedureName": "addMe", "Arity": "0", "Params": [ ], "ReturnType": "VOID", "Definition": "use two;\ninsert into channels.UserLocations([\n {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n );", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ ] ] }
+{ "DataverseName": "two", "ProcedureName": "deleteSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use two;\ndelete from channels.UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand channels.really_contains(roomNumber,\"l\");", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "channels", "really_contains", "2" ], [ "two", "get_job_param", "1" ], [ "two", "get_job_param", "1" ] ] ] }
+{ "DataverseName": "two", "ProcedureName": "localAddMe", "Arity": "0", "Params": [ ], "ReturnType": "VOID", "Definition": "use two;\ninsert into UserLocations([\n {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n );", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ ] ] }
+{ "DataverseName": "two", "ProcedureName": "localDeleteSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use two;\ndelete from UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand really_contains(roomNumber,\"l\");", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "really_contains", "2" ], [ "two", "get_job_param", "1" ] ] ] }
+{ "DataverseName": "two", "ProcedureName": "localSelectSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use two;\nselect roomNumber from UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand really_contains(roomNumber,\"l\")\norder by id;", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "really_contains", "2" ], [ "two", "get_job_param", "1" ] ] ] }
+{ "DataverseName": "two", "ProcedureName": "selectSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use two;\nselect roomNumber from channels.UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand channels.really_contains(roomNumber,\"l\")\norder by id;", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "channels", "really_contains", "2" ], [ "two", "get_job_param", "1" ], [ "two", "get_job_param", "1" ] ] ] }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/testsuite.xml b/asterix-bad/src/test/resources/runtimets/testsuite.xml
index 3c72a14..4640af1 100644
--- a/asterix-bad/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-bad/src/test/resources/runtimets/testsuite.xml
@@ -135,6 +135,17 @@
</compilation-unit>
</test-case>
<test-case FilePath="channel">
+ <compilation-unit name="drop_index">
+ <output-dir compare="Text">drop_index</output-dir>
+ <expected-error>Cannot drop index. channels.EmergencyChannel(Channel) depends on it!</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="channel">
+ <compilation-unit name="add_index">
+ <output-dir compare="Text">add_index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="channel">
<compilation-unit name="drop_results">
<output-dir compare="Text">drop_results</output-dir>
<expected-error>Cannot alter dataset two.nearbyTweetChannelResults. two.nearbyTweetChannel(Channel) depends on it!</expected-error>